diff --git a/task/check_high_volume_func_task.py b/task/check_high_volume_func_task.py index 45caed2..0658374 100644 --- a/task/check_high_volume_func_task.py +++ b/task/check_high_volume_func_task.py @@ -1,6 +1,7 @@ -from base.base_task import base_task +from typing import Any + from kit.kit import DataHelper -from task.process_pending_sells_func_task import process_pending_sells_func_task +from task.base_task import base_task class check_high_volume_func_task(base_task): @@ -15,24 +16,31 @@ class check_high_volume_func_task(base_task): def __init__(self, strategy, sub_task=None): super().__init__(strategy, sub_task) - def init(self, context): - self.name = "check_high_volume_func_task" - self.remark = "检查高成交量任务" - self.memo = "检查是否有异常成交量的股票" + def config(self, context: Any): + + # HV_control 成交量开关 + self.HV_control = self.get_config('HV_control') + + # HV_duration 成交量持续时间(天) + self.HV_duration = self.get_config('HV_duration') + + # HV_ratio 成交量放大倍数 + self.HV_ratio = self.get_config('HV_ratio') + self.positions = context.portfolio.positions - def begin(self, context): - self.HV_control = bool(self.config['HV_control']) - self.HV_duration = int(self.config['HV_duration']) - self.HV_ratio = float(self.config['HV_ratio']) def run(self, context): current_data = DataHelper.get_current_data() for stock in list(context.portfolio.positions.keys()): + self.log.info(f"检查股票 {stock} 的成交量") if current_data[stock].paused: + self.log.warning(f"股票 {stock} 已暂停交易,跳过检查") continue if current_data[stock].last_price == current_data[stock].high_limit: + self.log.warning(f"股票 {stock} 当前价格已触及涨停,跳过检查") continue if context.portfolio.positions[stock].closeable_amount == 0: + self.log.warning(f"股票 {stock} 没有可卖出数量,跳过检查") continue df_volume = get_bars( stock, @@ -46,11 +54,8 @@ class check_high_volume_func_task(base_task): if df_volume['volume'].iloc[-1] > self.HV_ratio * df_volume['volume'].max(): self.log.info(f"检测到股票 {stock} 出现异常放量,执行卖出操作。") # 通过持仓监控器注册卖出请求,而不是直接卖出 - self.position_monitor.register_sell_request(stock, 'high_volume') + self.strategy.state.set_sell_request(stock, self.positions[stock], 'highvolume') - # 处理所有待卖出请求 - task = process_pending_sells_func_task(self.strategy, self.name) - task.process(context) def handle(self, context): pass diff --git a/task/check_limit_up_func_task.py b/task/check_limit_up_func_task.py index e0df297..1508fd6 100644 --- a/task/check_limit_up_func_task.py +++ b/task/check_limit_up_func_task.py @@ -6,6 +6,11 @@ from task.base_task import base_task class check_limit_up_func_task(base_task): """ + 暂时不用 + 暂时不用 + 暂时不用 + 暂时不用 + 暂时不用 检查昨日处于涨停状态的股票在当前是否破板。 如破板(当前价格低于涨停价),则立即卖出该股票,并记录卖出原因为 "limitup"。 参数: @@ -15,15 +20,9 @@ class check_limit_up_func_task(base_task): def __init__(self, strategy, sub_task=None): super().__init__(strategy, sub_task) - def init(self, context: Any): - self.name = "check_limit_up_task" - self.remark = "检查涨停任务" - self.memo = "检查是否有涨停破板的股票" - - def begin(self, context: Any): + def config(self, context: Any): self.yesterday_HL_list = self.strategy.state.get_yesterday_high_list() - def run(self, context: Any): now_time = context.current_dt # 获取当前时间 if self.yesterday_HL_list: @@ -46,10 +45,6 @@ class check_limit_up_func_task(base_task): else: self.log.info(f"股票 {stock} 仍维持涨停状态。") - # 处理待卖出股票 - # if self.reason_to_sell[0] == 'limitup': - # sells_task = process_pending_sells_func_task(self.strategy, self.name) - # sells_task.process(context) def handle(self, context: Any): pass diff --git a/task/check_remain_amount_func_task.py b/task/check_remain_amount_func_task.py index f268183..5b2ecc7 100644 --- a/task/check_remain_amount_func_task.py +++ b/task/check_remain_amount_func_task.py @@ -7,6 +7,12 @@ from task.process_pending_buys_func_task import process_pending_buys_func_task class check_remain_amount_func_task(base_task): """ + + 暂不启用 + 暂不启用 + 暂不启用 + 暂不启用 + 暂不启用 检查账户资金与持仓数量: 如果因涨停破板卖出导致持仓不足,则从目标股票中筛选未买入股票,进行补仓操作。 参数: diff --git a/task/trade_afternoon_task.py b/task/trade_afternoon_task.py index fc7ae2f..bc24e71 100644 --- a/task/trade_afternoon_task.py +++ b/task/trade_afternoon_task.py @@ -23,24 +23,20 @@ class trade_afternoon_task(base_task): - def begin(self, context): - self.no_trading_today_signal = self.config['no_trading_today_signal'] - self.HV_control = bool(self.config['HV_control']) # 是否启用成交量监控 + def config(self, context): + self.today_trade_switch = self.get_config('today_trade_switch') + self.HV_control = self.get_config('HV_control') # 是否启用成交量监控 def run(self, context): - if not self.no_trading_today_signal: - check_limit_up_task = check_limit_up_func_task(self.strategy, sub_task=self.sub_task) - check_limit_up_task.process(context) + if not self.today_trade_switch: + # check_limit_up_task = check_limit_up_func_task(self.strategy, sub_task=self.sub_task) + # check_limit_up_task.process(context) if self.HV_control: check_high_volume_task = check_high_volume_func_task(self.strategy) check_high_volume_task.process(context) - # 先处理待卖出股票 - process_pending_sells_task = process_pending_sells_task(self.strategy) - process_pending_sells_task.process(context) - # 再检查是否需要补仓 check_remain_amount_task = check_remain_amount_func_task(self.strategy, sub_task=self.sub_task) check_remain_amount_task.process(context) diff --git a/test.py b/test.py index 2063a08..7aa7460 100644 --- a/test.py +++ b/test.py @@ -9,7 +9,6 @@ from typing import Any, Optional, List, Dict import pandas as pd - class network_config: def __init__(self): @@ -204,7 +203,7 @@ class network_config: response.raise_for_status() @staticmethod - def insert_state(strategy_id: str,state_str:str,trade_date:str): + def insert_state(strategy_id: str, state_str: str, trade_date: str): """ :param strategy_id: @@ -294,14 +293,12 @@ class strategy_config: id = self.configkey_id.get(key) self.connection.update_config(id, value) self.config[key]['new_value'] = value + def get_config(self, key: str): func = conf.config_type(key) return func(key) - - - class hold_data: def __init__(self, data: dict): self.stock = None @@ -764,7 +761,6 @@ class DataHelper: return [stock for stock in stock_list if stock not in not_buy_again] - class Logger: """ 简单的日志操作类,支持控制台和文件输出 @@ -1099,8 +1095,6 @@ class base_task: return func(self.configs[key]['new_value'], data_params) - - class check_state_before_task(base_task): """ 重置state信息 @@ -1160,9 +1154,6 @@ class check_market_env_task(base_task): def __init__(self, strategy: trade_strategy): super().__init__(strategy) - - - def config(self, context: Any): self.current_date = context.current_dt.date() @@ -1177,7 +1168,6 @@ class check_market_env_task(base_task): self.today_trade_switch = self.get_config('today_trade_switch') # 是否为不交易信号 - def run(self, context: Any): # if self.filter_month_clear_switch and self.in_special_month: @@ -1193,15 +1183,13 @@ class check_market_env_task(base_task): # 如果是特殊月份但没有清仓条件,则继续交易 self.today_trade_switch = True - def handle(self, context: Any): # 保存状态 old_today_trade_switch = self.get_config("today_trade_switch") if old_today_trade_switch != self.today_trade_switch: - self.strategy_config.set_config("today_trade_switch", self.today_trade_switch) # 临时变量 + self.strategy_config.set_config("today_trade_switch", self.today_trade_switch) # 临时变量 self.log.info(f"交易环境更新: today_trade_switch 状态更新为 {self.today_trade_switch}") - def end(self, context: Any): self.log.info("-----------------") @@ -1214,6 +1202,7 @@ class check_market_env_task(base_task): # 今日清仓 + class check_positions_stop_loss_task(base_task): def __init__(self, strategy: trade_strategy): @@ -1302,6 +1291,7 @@ class check_positions_stop_loss_task(base_task): def sell_request(self, stock, data, reason): self.temp_sells_list[f"{stock}_{reason}"] = data + class buy_stocks_func_task(base_task): """ 每周调仓策略: @@ -1321,7 +1311,9 @@ class buy_stocks_func_task(base_task): # 获取当前持仓列表 self.hold_list = self.strategy.state.get_hold_list() self.index_stocks = self.get_config('index_stocks') + # 调仓的目标股票列表排序 self.target_list = DataHelper.get_stock_list(context, self.index_stocks, self.stock_num) + # 限制目标股票列表长度为 stock_num self.temp_target_list = self.target_list[:self.stock_num] # 昨日涨停股票列表 self.yesterday_HL_list: List[str] = self.strategy.state.get_yesterday_high_list() @@ -1357,12 +1349,10 @@ class buy_stocks_func_task(base_task): if buy_targets: for stock in buy_targets: pos = context.portfolio.positions.get(stock) - # self.strategy.state.set_buy_request(stock, pos, 'buy') self.buys_request(stock, pos, 'buy') - self.log.info(f"调仓决策:将 {len(buy_targets)} 只股票加入待买入队列: {buy_targets}") + self.log.info(f"调仓决策:将 {stock} 加入待买入队列") def handle(self, context: Any): - self.strategy.set_target_list(self.temp_target_list) pass def end(self, context: Any): @@ -1376,6 +1366,152 @@ class buy_stocks_func_task(base_task): self.temp_buys_list[f"{stock}_{reason}"] = data +class process_pending_sells_task(base_task): + """ + 处理待卖出股票队列 + """ + + def __init__(self, strategy: trade_strategy): + super().__init__(strategy) + + def config(self, context: Any): + self.sell_request_list = self.strategy.state.get_sell_requests() + + def run(self, context: Any): + # 如果没有待卖出股票,直接返回 + if len(self.sell_request_list) == 0: + self.log.info("没有待卖出股票,跳过处理") + return + + self.log.info("开始处理待卖出股票队列") + current_data = DataHelper.get_current_data() + + for stock in self.sell_request_list: + self.log.info(f"处理待卖出股票: {stock}") + if stock not in context.portfolio.positions: + # 股票已不在持仓中,可能已通过其他方式卖出 + self.log.warning(f" 股票 {stock} 不在当前持仓中,跳过处理") + continue + + position = context.portfolio.positions[stock] + if position.closeable_amount <= 0: + # 没有可卖出数量 + self.log.warning(f" 股票 {stock} 没有可卖出数量,跳过处理") + continue + + # 检查是否可以交易 + if current_data[stock].paused: + self.log.warning(f" 股票 {stock} 已暂停交易,跳过卖出") + continue + + if current_data[stock].low_limit >= current_data[stock].last_price: + self.log.warning(f" 股票 {stock} 已触及跌停,跳过卖出") + continue + + # 尝试卖出 + # self.position_monitor.increment_attempt(stock) + success = self.strategy.close_position(context, position) + + if success: + # self.position_monitor.mark_as_successful(stock) + self.log.info(f"成功卖出股票 {stock}") + else: + self.log.warning(f"卖出股票 {stock} 失败") + + def handle(self, context: Any): + pass + + def end(self, context: Any): + pass + + +class process_pending_buy_task(base_task): + """ + 处理待买入股票队列 + """ + + def __init__(self, strategy: trade_strategy): + super().__init__(strategy) + + def config(self, context: Any): + self.today_trade_switch = self.get_config('today_trade_switch') + self.stock_num = self.get_config('stock_num') + self.buy_requests_list = self.strategy.state.get_buy_requests() + self.hold_list = [position.security for position in list(context.portfolio.positions.values())] + self.buy_symbol_money_up_limit = self.get_config('buy_symbol_money_up_limit') + self.buy_symbol_money_total = self.get_config('buy_symbol_money_total') + + def run(self, context: Any): + # 如果在特殊月份(1月/4月)或没有待买入股票,直接返回 + if self.today_trade_switch or not self.buy_requests_list: + self.log.info("今日非交易日或没有待买入股票,跳过处理") + return + + self.log.info("开始处理待买入股票队列") + current_data = DataHelper.get_current_data() + position_count = len(self.hold_list) + target_num = int(self.stock_num) + + # 如果持仓已满,不再买入 + if position_count >= target_num: + self.log.info("当前持仓已满,清空待买入队列") + self.strategy.state.clear_buy_requests() + return + + # 计算可买入的股票数量和分配资金 + buy_count = min(len(self.buy_requests_list), target_num - position_count) + if buy_count <= 0: + self.log.info("没有可买入的股票,或持仓已满") + return + + # 计算每只股票可分配的资金 + if context.portfolio.cash < self.buy_symbol_money_total: + self.log.warning(f"可用资金 {context.portfolio.cash:.2f} 元不足,无法满足总资金要求 {self.buy_symbol_money_total} 元") + return + + try: + value = context.portfolio.cash / buy_count + except ZeroDivisionError: + self.log.error("资金分摊时除零错误") + return + + # 确保每只股票有足够的买入金额 + if value < self.buy_symbol_money_up_limit: # 假设最小买入金额为5000元 + self.log.warning(f"每只股票分配资金不足: {value:.2f}元,取消本次买入") + return + + # 逐个买入股票 + for idx, stock in enumerate(self.buy_requests_list[:buy_count]): + # 跳过已持仓的股票 + if stock in self.hold_list: + self.log.warning(f"股票 {stock} 已在持仓中,跳过买入") + continue + + # 检查是否可交易 + if current_data[stock].paused: + self.log.warning(f"股票 {stock} 处于停牌状态,跳过买入") + continue + + if current_data[stock].high_limit <= current_data[stock].last_price: + self.log.warning(f"股票 {stock} 已涨停,跳过买入") + continue + + # 执行买入 + if self.strategy.open_position(stock, value): + self.log.info(f"成功买入股票 {stock},分配资金 {value:.2f}") + # 删除购买列表 + + # 如果持仓已满,停止买入 + if len(context.portfolio.positions) >= target_num: + break + + def handle(self, context: Any): + pass + + def end(self, context: Any): + pass + + strategy = trade_strategy("测试策略", 10000000, network_config(), log, jq_channel()) @@ -1383,17 +1519,32 @@ def check_state_before_func(context: Any): task = check_state_before_task(strategy) task.process(context) + def check_market_env_func(context: Any): task = check_market_env_task(strategy) task.process(context) + def check_positions_stop_loss_func(context: Any): task = check_positions_stop_loss_task(strategy) task.process(context) + def buy_stocks_func(context: Any): task = buy_stocks_func_task(strategy) task.process(context) + + +def process_pending_sells_func(context: Any): + task = process_pending_sells_task(strategy) + task.process(context) + + +def process_pending_buys_func(context: Any): + task = process_pending_buy_task(strategy) + task.process(context) + + def initialize(context: Any) -> None: # # 初始化策略参数 strategy.init_strategy() # 策略初始化函数 @@ -1402,17 +1553,13 @@ def initialize(context: Any) -> None: run_daily(check_state_before_func, time='9:01') # 开盘前先检查持仓状态 run_daily(check_market_env_func, time='9:05') run_daily(check_positions_stop_loss_func, time='9:10') - run_daily(buy_stocks_func, time='9:15') # 每周调仓 - # - # - # run_daily(process_pending_sells_func, time='10:15') # 处理待卖出股票 - # run_weekly(buy_stocks_func, 2, time='10:30') # 周二进行调仓 - # run_daily(process_pending_buys_func, time='10:20') # 处理待买入股票 - # run_daily(process_pending_sells_func, time='13:05') # 午盘开盘后也处理一次卖出 - # run_daily(process_pending_buys_func, time='13:15') # 午盘开盘后也处理一次买入 + run_weekly(buy_stocks_func, 2, time='10:15') # 每周调仓 + run_daily(process_pending_sells_func, time='10:15') # 处理待卖出股票 + run_daily(process_pending_buys_func, time='10:20') # 处理待买入股票 # run_daily(trade_afternoon_func, time='14:30') - # run_daily(process_pending_sells_func, time='14:40') # 收盘前再处理一次卖出 - # run_daily(process_pending_buys_func, time='14:45') # 收盘前再处理一次买入 + + run_daily(process_pending_sells_func, time='14:40') # 收盘前再处理一次卖出 + run_daily(process_pending_buys_func, time='14:45') # 收盘前再处理一次买入 # run_daily(close_account_func, time='14:50') # run_weekly(print_position_info_func, 5, time='15:10') # 周五收盘后打印持仓信息 pass