easyquant/task/check_high_volume_func_task.py

65 lines
2.4 KiB
Python
Raw Permalink Normal View History

2025-07-08 23:00:32 +08:00
from typing import Any
2025-07-03 23:39:31 +08:00
from kit.kit import DataHelper
2025-07-08 23:00:32 +08:00
from task.base_task import base_task
2025-07-03 23:39:31 +08:00
class check_high_volume_func_task(base_task):
"""
检查持仓股票当日成交量是否异常放量
如果当日成交量大于过去 HV_duration 天内最大成交量的 HV_ratio 则视为异常执行卖出操作
参数:
context: 聚宽平台传入的交易上下文对象
"""
def __init__(self, strategy, sub_task=None):
super().__init__(strategy, sub_task)
2025-07-08 23:00:32 +08:00
def config(self, context: Any):
# HV_control 成交量开关
self.HV_control = self.get_config('HV_control')
# HV_duration 成交量持续时间(天)
self.HV_duration = self.get_config('HV_duration')
# HV_ratio 成交量放大倍数
self.HV_ratio = self.get_config('HV_ratio')
self.positions = context.portfolio.positions
2025-07-03 23:39:31 +08:00
def run(self, context):
current_data = DataHelper.get_current_data()
for stock in list(context.portfolio.positions.keys()):
2025-07-08 23:00:32 +08:00
self.log.info(f"检查股票 {stock} 的成交量")
2025-07-03 23:39:31 +08:00
if current_data[stock].paused:
2025-07-08 23:00:32 +08:00
self.log.warning(f"股票 {stock} 已暂停交易,跳过检查")
2025-07-03 23:39:31 +08:00
continue
if current_data[stock].last_price == current_data[stock].high_limit:
2025-07-08 23:00:32 +08:00
self.log.warning(f"股票 {stock} 当前价格已触及涨停,跳过检查")
2025-07-03 23:39:31 +08:00
continue
if context.portfolio.positions[stock].closeable_amount == 0:
2025-07-08 23:00:32 +08:00
self.log.warning(f"股票 {stock} 没有可卖出数量,跳过检查")
2025-07-03 23:39:31 +08:00
continue
df_volume = get_bars(
stock,
count=self.HV_duration,
unit='1d',
fields=['volume'],
include_now=True,
df=True
)
if df_volume is not None and not df_volume.empty:
if df_volume['volume'].iloc[-1] > self.HV_ratio * df_volume['volume'].max():
self.log.info(f"检测到股票 {stock} 出现异常放量,执行卖出操作。")
# 通过持仓监控器注册卖出请求,而不是直接卖出
2025-07-08 23:00:32 +08:00
self.strategy.state.set_sell_request(stock, self.positions[stock], 'highvolume')
2025-07-03 23:39:31 +08:00
def handle(self, context):
pass
def end(self, context):
pass