from typing import Any from kit.kit import DataHelper from task.base_task import base_task class check_high_volume_func_task(base_task): """ 检查持仓股票当日成交量是否异常放量: 如果当日成交量大于过去 HV_duration 天内最大成交量的 HV_ratio 倍,则视为异常,执行卖出操作。 参数: context: 聚宽平台传入的交易上下文对象 """ def __init__(self, strategy, sub_task=None): super().__init__(strategy, sub_task) def config(self, context: Any): # HV_control 成交量开关 self.HV_control = self.get_config('HV_control') # HV_duration 成交量持续时间(天) self.HV_duration = self.get_config('HV_duration') # HV_ratio 成交量放大倍数 self.HV_ratio = self.get_config('HV_ratio') self.positions = context.portfolio.positions def run(self, context): current_data = DataHelper.get_current_data() for stock in list(context.portfolio.positions.keys()): self.log.info(f"检查股票 {stock} 的成交量") if current_data[stock].paused: self.log.warning(f"股票 {stock} 已暂停交易,跳过检查") continue if current_data[stock].last_price == current_data[stock].high_limit: self.log.warning(f"股票 {stock} 当前价格已触及涨停,跳过检查") continue if context.portfolio.positions[stock].closeable_amount == 0: self.log.warning(f"股票 {stock} 没有可卖出数量,跳过检查") continue df_volume = get_bars( stock, count=self.HV_duration, unit='1d', fields=['volume'], include_now=True, df=True ) if df_volume is not None and not df_volume.empty: if df_volume['volume'].iloc[-1] > self.HV_ratio * df_volume['volume'].max(): self.log.info(f"检测到股票 {stock} 出现异常放量,执行卖出操作。") # 通过持仓监控器注册卖出请求,而不是直接卖出 self.strategy.state.set_sell_request(stock, self.positions[stock], 'highvolume') def handle(self, context): pass def end(self, context): pass