修改版本

This commit is contained in:
summer 2025-07-08 23:00:32 +08:00
parent 196f91d622
commit 270b342bfd
5 changed files with 212 additions and 63 deletions

View File

@ -1,6 +1,7 @@
from base.base_task import base_task
from typing import Any
from kit.kit import DataHelper
from task.process_pending_sells_func_task import process_pending_sells_func_task
from task.base_task import base_task
class check_high_volume_func_task(base_task):
@ -15,24 +16,31 @@ class check_high_volume_func_task(base_task):
def __init__(self, strategy, sub_task=None):
super().__init__(strategy, sub_task)
def init(self, context):
self.name = "check_high_volume_func_task"
self.remark = "检查高成交量任务"
self.memo = "检查是否有异常成交量的股票"
def config(self, context: Any):
# HV_control 成交量开关
self.HV_control = self.get_config('HV_control')
# HV_duration 成交量持续时间(天)
self.HV_duration = self.get_config('HV_duration')
# HV_ratio 成交量放大倍数
self.HV_ratio = self.get_config('HV_ratio')
self.positions = context.portfolio.positions
def begin(self, context):
self.HV_control = bool(self.config['HV_control'])
self.HV_duration = int(self.config['HV_duration'])
self.HV_ratio = float(self.config['HV_ratio'])
def run(self, context):
current_data = DataHelper.get_current_data()
for stock in list(context.portfolio.positions.keys()):
self.log.info(f"检查股票 {stock} 的成交量")
if current_data[stock].paused:
self.log.warning(f"股票 {stock} 已暂停交易,跳过检查")
continue
if current_data[stock].last_price == current_data[stock].high_limit:
self.log.warning(f"股票 {stock} 当前价格已触及涨停,跳过检查")
continue
if context.portfolio.positions[stock].closeable_amount == 0:
self.log.warning(f"股票 {stock} 没有可卖出数量,跳过检查")
continue
df_volume = get_bars(
stock,
@ -46,11 +54,8 @@ class check_high_volume_func_task(base_task):
if df_volume['volume'].iloc[-1] > self.HV_ratio * df_volume['volume'].max():
self.log.info(f"检测到股票 {stock} 出现异常放量,执行卖出操作。")
# 通过持仓监控器注册卖出请求,而不是直接卖出
self.position_monitor.register_sell_request(stock, 'high_volume')
self.strategy.state.set_sell_request(stock, self.positions[stock], 'highvolume')
# 处理所有待卖出请求
task = process_pending_sells_func_task(self.strategy, self.name)
task.process(context)
def handle(self, context):
pass

View File

@ -6,6 +6,11 @@ from task.base_task import base_task
class check_limit_up_func_task(base_task):
"""
暂时不用
暂时不用
暂时不用
暂时不用
暂时不用
检查昨日处于涨停状态的股票在当前是否破板
如破板当前价格低于涨停价则立即卖出该股票并记录卖出原因为 "limitup"
参数:
@ -15,15 +20,9 @@ class check_limit_up_func_task(base_task):
def __init__(self, strategy, sub_task=None):
super().__init__(strategy, sub_task)
def init(self, context: Any):
self.name = "check_limit_up_task"
self.remark = "检查涨停任务"
self.memo = "检查是否有涨停破板的股票"
def begin(self, context: Any):
def config(self, context: Any):
self.yesterday_HL_list = self.strategy.state.get_yesterday_high_list()
def run(self, context: Any):
now_time = context.current_dt # 获取当前时间
if self.yesterday_HL_list:
@ -46,10 +45,6 @@ class check_limit_up_func_task(base_task):
else:
self.log.info(f"股票 {stock} 仍维持涨停状态。")
# 处理待卖出股票
# if self.reason_to_sell[0] == 'limitup':
# sells_task = process_pending_sells_func_task(self.strategy, self.name)
# sells_task.process(context)
def handle(self, context: Any):
pass

View File

@ -7,6 +7,12 @@ from task.process_pending_buys_func_task import process_pending_buys_func_task
class check_remain_amount_func_task(base_task):
"""
暂不启用
暂不启用
暂不启用
暂不启用
暂不启用
检查账户资金与持仓数量
如果因涨停破板卖出导致持仓不足则从目标股票中筛选未买入股票进行补仓操作
参数:

View File

@ -23,24 +23,20 @@ class trade_afternoon_task(base_task):
def begin(self, context):
self.no_trading_today_signal = self.config['no_trading_today_signal']
self.HV_control = bool(self.config['HV_control']) # 是否启用成交量监控
def config(self, context):
self.today_trade_switch = self.get_config('today_trade_switch')
self.HV_control = self.get_config('HV_control') # 是否启用成交量监控
def run(self, context):
if not self.no_trading_today_signal:
check_limit_up_task = check_limit_up_func_task(self.strategy, sub_task=self.sub_task)
check_limit_up_task.process(context)
if not self.today_trade_switch:
# check_limit_up_task = check_limit_up_func_task(self.strategy, sub_task=self.sub_task)
# check_limit_up_task.process(context)
if self.HV_control:
check_high_volume_task = check_high_volume_func_task(self.strategy)
check_high_volume_task.process(context)
# 先处理待卖出股票
process_pending_sells_task = process_pending_sells_task(self.strategy)
process_pending_sells_task.process(context)
# 再检查是否需要补仓
check_remain_amount_task = check_remain_amount_func_task(self.strategy, sub_task=self.sub_task)
check_remain_amount_task.process(context)

203
test.py
View File

@ -9,7 +9,6 @@ from typing import Any, Optional, List, Dict
import pandas as pd
class network_config:
def __init__(self):
@ -204,7 +203,7 @@ class network_config:
response.raise_for_status()
@staticmethod
def insert_state(strategy_id: str,state_str:str,trade_date:str):
def insert_state(strategy_id: str, state_str: str, trade_date: str):
"""
:param strategy_id:
@ -294,14 +293,12 @@ class strategy_config:
id = self.configkey_id.get(key)
self.connection.update_config(id, value)
self.config[key]['new_value'] = value
def get_config(self, key: str):
func = conf.config_type(key)
return func(key)
class hold_data:
def __init__(self, data: dict):
self.stock = None
@ -764,7 +761,6 @@ class DataHelper:
return [stock for stock in stock_list if stock not in not_buy_again]
class Logger:
"""
简单的日志操作类支持控制台和文件输出
@ -1099,8 +1095,6 @@ class base_task:
return func(self.configs[key]['new_value'], data_params)
class check_state_before_task(base_task):
"""
重置state信息
@ -1160,9 +1154,6 @@ class check_market_env_task(base_task):
def __init__(self, strategy: trade_strategy):
super().__init__(strategy)
def config(self, context: Any):
self.current_date = context.current_dt.date()
@ -1177,7 +1168,6 @@ class check_market_env_task(base_task):
self.today_trade_switch = self.get_config('today_trade_switch') # 是否为不交易信号
def run(self, context: Any):
#
if self.filter_month_clear_switch and self.in_special_month:
@ -1193,15 +1183,13 @@ class check_market_env_task(base_task):
# 如果是特殊月份但没有清仓条件,则继续交易
self.today_trade_switch = True
def handle(self, context: Any):
# 保存状态
old_today_trade_switch = self.get_config("today_trade_switch")
if old_today_trade_switch != self.today_trade_switch:
self.strategy_config.set_config("today_trade_switch", self.today_trade_switch) # 临时变量
self.strategy_config.set_config("today_trade_switch", self.today_trade_switch) # 临时变量
self.log.info(f"交易环境更新: today_trade_switch 状态更新为 {self.today_trade_switch}")
def end(self, context: Any):
self.log.info("-----------------")
@ -1214,6 +1202,7 @@ class check_market_env_task(base_task):
# 今日清仓
class check_positions_stop_loss_task(base_task):
def __init__(self, strategy: trade_strategy):
@ -1302,6 +1291,7 @@ class check_positions_stop_loss_task(base_task):
def sell_request(self, stock, data, reason):
self.temp_sells_list[f"{stock}_{reason}"] = data
class buy_stocks_func_task(base_task):
"""
每周调仓策略
@ -1321,7 +1311,9 @@ class buy_stocks_func_task(base_task):
# 获取当前持仓列表
self.hold_list = self.strategy.state.get_hold_list()
self.index_stocks = self.get_config('index_stocks')
# 调仓的目标股票列表排序
self.target_list = DataHelper.get_stock_list(context, self.index_stocks, self.stock_num)
# 限制目标股票列表长度为 stock_num
self.temp_target_list = self.target_list[:self.stock_num]
# 昨日涨停股票列表
self.yesterday_HL_list: List[str] = self.strategy.state.get_yesterday_high_list()
@ -1357,12 +1349,10 @@ class buy_stocks_func_task(base_task):
if buy_targets:
for stock in buy_targets:
pos = context.portfolio.positions.get(stock)
# self.strategy.state.set_buy_request(stock, pos, 'buy')
self.buys_request(stock, pos, 'buy')
self.log.info(f"调仓决策:将 {len(buy_targets)} 只股票加入待买入队列: {buy_targets}")
self.log.info(f"调仓决策:将 {stock} 加入待买入队列")
def handle(self, context: Any):
self.strategy.set_target_list(self.temp_target_list)
pass
def end(self, context: Any):
@ -1376,6 +1366,152 @@ class buy_stocks_func_task(base_task):
self.temp_buys_list[f"{stock}_{reason}"] = data
class process_pending_sells_task(base_task):
"""
处理待卖出股票队列
"""
def __init__(self, strategy: trade_strategy):
super().__init__(strategy)
def config(self, context: Any):
self.sell_request_list = self.strategy.state.get_sell_requests()
def run(self, context: Any):
# 如果没有待卖出股票,直接返回
if len(self.sell_request_list) == 0:
self.log.info("没有待卖出股票,跳过处理")
return
self.log.info("开始处理待卖出股票队列")
current_data = DataHelper.get_current_data()
for stock in self.sell_request_list:
self.log.info(f"处理待卖出股票: {stock}")
if stock not in context.portfolio.positions:
# 股票已不在持仓中,可能已通过其他方式卖出
self.log.warning(f" 股票 {stock} 不在当前持仓中,跳过处理")
continue
position = context.portfolio.positions[stock]
if position.closeable_amount <= 0:
# 没有可卖出数量
self.log.warning(f" 股票 {stock} 没有可卖出数量,跳过处理")
continue
# 检查是否可以交易
if current_data[stock].paused:
self.log.warning(f" 股票 {stock} 已暂停交易,跳过卖出")
continue
if current_data[stock].low_limit >= current_data[stock].last_price:
self.log.warning(f" 股票 {stock} 已触及跌停,跳过卖出")
continue
# 尝试卖出
# self.position_monitor.increment_attempt(stock)
success = self.strategy.close_position(context, position)
if success:
# self.position_monitor.mark_as_successful(stock)
self.log.info(f"成功卖出股票 {stock}")
else:
self.log.warning(f"卖出股票 {stock} 失败")
def handle(self, context: Any):
pass
def end(self, context: Any):
pass
class process_pending_buy_task(base_task):
"""
处理待买入股票队列
"""
def __init__(self, strategy: trade_strategy):
super().__init__(strategy)
def config(self, context: Any):
self.today_trade_switch = self.get_config('today_trade_switch')
self.stock_num = self.get_config('stock_num')
self.buy_requests_list = self.strategy.state.get_buy_requests()
self.hold_list = [position.security for position in list(context.portfolio.positions.values())]
self.buy_symbol_money_up_limit = self.get_config('buy_symbol_money_up_limit')
self.buy_symbol_money_total = self.get_config('buy_symbol_money_total')
def run(self, context: Any):
# 如果在特殊月份1月/4月或没有待买入股票直接返回
if self.today_trade_switch or not self.buy_requests_list:
self.log.info("今日非交易日或没有待买入股票,跳过处理")
return
self.log.info("开始处理待买入股票队列")
current_data = DataHelper.get_current_data()
position_count = len(self.hold_list)
target_num = int(self.stock_num)
# 如果持仓已满,不再买入
if position_count >= target_num:
self.log.info("当前持仓已满,清空待买入队列")
self.strategy.state.clear_buy_requests()
return
# 计算可买入的股票数量和分配资金
buy_count = min(len(self.buy_requests_list), target_num - position_count)
if buy_count <= 0:
self.log.info("没有可买入的股票,或持仓已满")
return
# 计算每只股票可分配的资金
if context.portfolio.cash < self.buy_symbol_money_total:
self.log.warning(f"可用资金 {context.portfolio.cash:.2f} 元不足,无法满足总资金要求 {self.buy_symbol_money_total}")
return
try:
value = context.portfolio.cash / buy_count
except ZeroDivisionError:
self.log.error("资金分摊时除零错误")
return
# 确保每只股票有足够的买入金额
if value < self.buy_symbol_money_up_limit: # 假设最小买入金额为5000元
self.log.warning(f"每只股票分配资金不足: {value:.2f}元,取消本次买入")
return
# 逐个买入股票
for idx, stock in enumerate(self.buy_requests_list[:buy_count]):
# 跳过已持仓的股票
if stock in self.hold_list:
self.log.warning(f"股票 {stock} 已在持仓中,跳过买入")
continue
# 检查是否可交易
if current_data[stock].paused:
self.log.warning(f"股票 {stock} 处于停牌状态,跳过买入")
continue
if current_data[stock].high_limit <= current_data[stock].last_price:
self.log.warning(f"股票 {stock} 已涨停,跳过买入")
continue
# 执行买入
if self.strategy.open_position(stock, value):
self.log.info(f"成功买入股票 {stock},分配资金 {value:.2f}")
# 删除购买列表
# 如果持仓已满,停止买入
if len(context.portfolio.positions) >= target_num:
break
def handle(self, context: Any):
pass
def end(self, context: Any):
pass
strategy = trade_strategy("测试策略", 10000000, network_config(), log, jq_channel())
@ -1383,17 +1519,32 @@ def check_state_before_func(context: Any):
task = check_state_before_task(strategy)
task.process(context)
def check_market_env_func(context: Any):
task = check_market_env_task(strategy)
task.process(context)
def check_positions_stop_loss_func(context: Any):
task = check_positions_stop_loss_task(strategy)
task.process(context)
def buy_stocks_func(context: Any):
task = buy_stocks_func_task(strategy)
task.process(context)
def process_pending_sells_func(context: Any):
task = process_pending_sells_task(strategy)
task.process(context)
def process_pending_buys_func(context: Any):
task = process_pending_buy_task(strategy)
task.process(context)
def initialize(context: Any) -> None:
# # 初始化策略参数
strategy.init_strategy() # 策略初始化函数
@ -1402,17 +1553,13 @@ def initialize(context: Any) -> None:
run_daily(check_state_before_func, time='9:01') # 开盘前先检查持仓状态
run_daily(check_market_env_func, time='9:05')
run_daily(check_positions_stop_loss_func, time='9:10')
run_daily(buy_stocks_func, time='9:15') # 每周调仓
#
#
# run_daily(process_pending_sells_func, time='10:15') # 处理待卖出股票
# run_weekly(buy_stocks_func, 2, time='10:30') # 周二进行调仓
# run_daily(process_pending_buys_func, time='10:20') # 处理待买入股票
# run_daily(process_pending_sells_func, time='13:05') # 午盘开盘后也处理一次卖出
# run_daily(process_pending_buys_func, time='13:15') # 午盘开盘后也处理一次买入
run_weekly(buy_stocks_func, 2, time='10:15') # 每周调仓
run_daily(process_pending_sells_func, time='10:15') # 处理待卖出股票
run_daily(process_pending_buys_func, time='10:20') # 处理待买入股票
# run_daily(trade_afternoon_func, time='14:30')
# run_daily(process_pending_sells_func, time='14:40') # 收盘前再处理一次卖出
# run_daily(process_pending_buys_func, time='14:45') # 收盘前再处理一次买入
run_daily(process_pending_sells_func, time='14:40') # 收盘前再处理一次卖出
run_daily(process_pending_buys_func, time='14:45') # 收盘前再处理一次买入
# run_daily(close_account_func, time='14:50')
# run_weekly(print_position_info_func, 5, time='15:10') # 周五收盘后打印持仓信息
pass