from typing import Any from kit.kit import DataHelper from task.base_task import base_task class check_limit_up_func_task(base_task): """ 暂时不用 暂时不用 暂时不用 暂时不用 暂时不用 检查昨日处于涨停状态的股票在当前是否破板。 如破板(当前价格低于涨停价),则立即卖出该股票,并记录卖出原因为 "limitup"。 参数: context: 聚宽平台传入的交易上下文对象 """ def __init__(self, strategy, sub_task=None): super().__init__(strategy, sub_task) def config(self, context: Any): self.yesterday_HL_list = self.strategy.state.get_yesterday_high_list() def run(self, context: Any): now_time = context.current_dt # 获取当前时间 if self.yesterday_HL_list: for stock in self.yesterday_HL_list: # 使用1分钟周期数据判断当前价格和涨停价是否符合条件 current_data = DataHelper.get_price_safe( stock, end_date=now_time, frequency='1m', fields=['close', 'high_limit'], count=1, panel=False, fill_paused=True ) if current_data is not None and not current_data.empty: if current_data.iloc[0]['close'] < current_data.iloc[0]['high_limit']: self.log.info(f"股票 {stock} 涨停破板,触发卖出操作。") # 通过持仓监控器注册卖出请求,而不是直接卖出 self.strategy.state.set_sell_request(stock, current_data.iloc[0], 'limitup') else: self.log.info(f"股票 {stock} 仍维持涨停状态。") def handle(self, context: Any): pass def end(self, context: Any): pass