from typing import Any from kit.kit import DataHelper from strategy import trade_strategy from task.base_task import base_task class check_positions_stop_loss_task(base_task): def __init__(self, strategy: trade_strategy): super().__init__(strategy) def config(self, context: Any): self.stoploss_switch = self.get_config('stoploss_switch') # 是否启用止损策略 self.hold_list = self.strategy.state.data['position_list'] # 持仓列表 self.today_trade_switch = self.get_config('today_trade_switch') # 今日交易开关 self.stoploss_strategy = self.get_config('stoploss_strategy') # 止损策略 self.stoploss_stock_rate_data = self.get_config('stoploss_stock_rate_data') # 个股止损阀值 self.stoploss_market_rate_data = self.get_config('stoploss_market_rate_data') # 市场止损阀值 # self.reason_to_sell = self.strategy.state.get_sell_reason() # 卖出原因 self.index_stocks = self.get_config('index_stocks') # 指数代码 self.temp_sells_list = {} def run(self, context: Any): if len(self.hold_list) == 0: self.log.debug("当前无持仓,跳过止损检查。") return if self.stoploss_switch and self.today_trade_switch: if self.stoploss_strategy == 1 or self.stoploss_strategy == 3: # 个股止盈或止损判断 for stock in list(context.portfolio.positions.keys()): pos = context.portfolio.positions[stock] if pos.price >= pos.avg_cost * 2: # 通过持仓监控器注册卖出请求,而不是直接卖出 # self.strategy.state.set_sell_request(stock, pos, 'take_profit') self.sell_request(stock, pos, 'take_profit') self.log.debug(f"股票 {stock} 实现100%盈利,执行止盈卖出。") elif pos.price < pos.avg_cost * self.stoploss_stock_rate_data: # 通过持仓监控器注册卖出请求,而不是直接卖出 self.strategy.state.set_sell_request(stock, pos, 'stop_loss') self.sell_request(stock, pos, 'take_profit') self.log.debug(f"股票 {stock} 触及止损阈值,执行卖出。") # self.strategy.state.set_sell_reason("stoploss") if self.stoploss_strategy == 2 or self.stoploss_strategy == 3: # 大盘止损判断,若整体市场跌幅过大则平仓所有股票 stock_list = DataHelper.get_index_stocks(self.index_stocks) df = DataHelper.get_price_safe( stock_list, end_date=context.previous_date, frequency='daily', fields=['close', 'open'], count=1, panel=False ) if df is not None and not df.empty: down_ratio = (df['close'] / df['open']).mean() if down_ratio <= self.stoploss_market_rate_data: # self.strategy.state.set_sell_reason("stoploss") self.log.debug(f"市场检测到跌幅(平均跌幅 {down_ratio:.2%}),卖出所有持仓。") for stock in list(context.portfolio.positions.keys()): pos = context.portfolio.positions[stock] # 通过持仓监控器注册卖出请求,而不是直接卖出 self.sell_request(stock, pos, 'market_stop_loss') def handle(self, context: Any): # 保存状态 for key, value in self.temp_sells_list.items(): stock, reason = key.split('_') self.strategy.state.set_sell_request(stock, value, reason) pass def end(self, context: Any): self.log.info("-----------------") self.log.info("止损策略开关:{}".format("启用" if self.stoploss_switch else "不启用")) self.log.info("今日交易开关:{}".format("交易" if self.today_trade_switch else "不交易")) self.log.info("止损策略方式:{}".format("个股止损" if self.stoploss_strategy == 1 else "大盘止损" if self.stoploss_strategy == 2 else "联合止损" if self.stoploss_strategy == 3 else "异常")) self.log.info("当前卖出请求") for key, value in self.temp_sells_list.items(): self.log.info(f" {key}: {value}") # 总卖出请求 sells_list = self.strategy.state.get_sell_requests() if len(sells_list) > 0: self.log.info("总卖出请求:") for stock, data in sells_list.items(): self.log.info(f" {stock}: {data}") pass def sell_request(self, stock, data, reason): self.temp_sells_list[f"{stock}_{reason}"] = data