from typing import Any, List class exchange: def close_positions(self, context: Any, position: Any): """ 关闭所有持仓 """ raise NotImplementedError("This method should be implemented by subclasses") def buy_security(self, context: Any, target_list: List[str]) -> None: """ 买入操作:对目标股票执行买入,下单资金均摊分配 """ raise NotImplementedError("This method should be implemented by subclasses") def open_position(self, security: str, value: float) -> bool: """ 开仓操作:尝试买入指定股票 参数: security: 股票代码 value: 分配给该股票的资金 返回: 若下单成功(部分或全部成交)返回 True,否则返回 False """ raise NotImplementedError("This method should be implemented by subclasses") class jq_channel(exchange): def __init__(self): pass def order_target_value_(self, stock: str, value: float) -> Any: """ 封装 order_target_value 函数进行下单,同时记录中文日志和异常信息 参数: security: 股票代码 value: 下单目标金额 返回: 下单后生成的订单对象;若失败返回 None """ try: order = order_target_value(stock, value) return order except Exception as e: return None def close_positions(self, context: Any, position: Any): """ 平仓操作:尽可能将指定股票仓位全部卖出 参数: position: 持仓对象 返回: 若下单后订单全部成交返回 True,否则返回 False """ security = position.security order = self.order_target_value_(security, 0) if order is None: # log.error(f"股票 {security} 下单失败,可能是API调用错误") return False if order.status == OrderStatus.held and order.filled == order.amount: # log.info(f"股票 {security} 卖出订单完全成交") return True else: # log.warning(f"股票 {security} 卖出订单部分成交或未成交,状态: {order.status}, 已成交: {order.filled}/{order.amount}") return False def buy_security(self, context: Any, target_list: List[str]) -> None: """ 买入操作:对目标股票执行买入,下单资金均摊分配 参数: context: 聚宽平台传入的交易上下文对象 target_list: 目标股票代码列表 """ position_count = len(context.portfolio.positions) target_num = len(target_list) if target_num > position_count: try: value = context.portfolio.cash / (target_num - position_count) except ZeroDivisionError as e: # log.error(f"资金分摊时除零错误: {e}") return for stock in target_list: if context.portfolio.positions[stock].total_amount == 0: if self.open_position(stock, value): # log.info(f"已买入股票 {stock},分配资金 {value:.2f}") # self.not_buy_again.append(stock) if len(context.portfolio.positions) == target_num: break def open_position(self, security: str, value: float) -> bool: """ 开仓操作:尝试买入指定股票 参数: security: 股票代码 value: 分配给该股票的资金 返回: 若下单成功(部分或全部成交)返回 True,否则返回 False """ order = self.order_target_value_(security, value) if order is not None and order.filled > 0: return True return False