diff --git a/config/strategy_config.py b/config/strategy_config.py index d0434e3..311f4ab 100644 --- a/config/strategy_config.py +++ b/config/strategy_config.py @@ -1,3 +1,5 @@ +from typing import Any + from config.strategy_state import state from kit.conf import conf from kit.logger import Logger @@ -60,7 +62,7 @@ class strategy_config: """ return self.config - def set_config(self, key: str, value: str) -> None: + def set_config(self, key: str, value: Any) -> None: """ 通过字典语法设置配置项,并自动触发数据库更新 @@ -70,7 +72,7 @@ class strategy_config: """ id = self.configkey_id.get(key) self.connection.update_config(id, value) - + self.config[key]['new_value'] = value def get_config(self, key: str): func = conf.config_type(key) return func(key) diff --git a/kit/conf.py b/kit/conf.py index a2951bd..0ae4902 100644 --- a/kit/conf.py +++ b/kit/conf.py @@ -17,9 +17,8 @@ class conf: return bool(value) @staticmethod - def to_list(value: str, params, separator: str = '_'): - params_list = params.split(separator, -1) - return value.split(params_list) + def to_list(value: str, separator: str = '_'): + return value.split(separator) @staticmethod def config_type(value: str): @@ -33,7 +32,9 @@ class conf: return data[value] -ddd = conf.config_type() -func = ddd["int"] -print(func("123")) # 输出: 123 +ddd = conf.config_type("list") +print(ddd("1_2_3", "_")) # 输出: 123 + + +# "1,2,3".split(",", -1) \ No newline at end of file diff --git a/task/base_task.py b/task/base_task.py index a81c4af..e45e399 100644 --- a/task/base_task.py +++ b/task/base_task.py @@ -41,6 +41,6 @@ class base_task: def get_config(self, key: str): configs = self.configs.get(key) data_type = configs['data_type'] - data_params = configs['data_params'] + data_params = configs['data_param'] func = conf.config_type(data_type) return func(self.configs[key]['new_value'], data_params) diff --git a/task/buy_stocks_func_task.py b/task/buy_stocks_func_task.py index 8303401..896ef4e 100644 --- a/task/buy_stocks_func_task.py +++ b/task/buy_stocks_func_task.py @@ -41,7 +41,7 @@ class buy_stocks_func_task(base_task): # 重置当天已买入记录 # self.position_manager.reset_not_buy_again() # 取目标持仓数以内的股票作为调仓目标 - self.log.info(f"每周调仓目标股票: {self.temp_target_list}") + # self.log.info(f"每周调仓目标股票: {self.temp_target_list}") # 遍历当前持仓,若股票不在目标列表,则执行卖出操作 for stock in self.hold_list: diff --git a/task/check_market_env_task.py b/task/check_market_env_task.py index e563f42..e59bae4 100644 --- a/task/check_market_env_task.py +++ b/task/check_market_env_task.py @@ -51,7 +51,7 @@ class check_market_env_task(base_task): def handle(self, context: Any): # 保存状态 - old_today_trade_switch = self.strategy_config.get_config("today_trade_switch") + old_today_trade_switch = self.get_config("today_trade_switch") if old_today_trade_switch != self.today_trade_switch: self.strategy_config.set_config("today_trade_switch", self.today_trade_switch) # 临时变量 self.log.info(f"交易环境更新: today_trade_switch 状态更新为 {self.today_trade_switch}") diff --git a/task/check_positions_stop_loss_task.py b/task/check_positions_stop_loss_task.py index 869d47c..26eb16e 100644 --- a/task/check_positions_stop_loss_task.py +++ b/task/check_positions_stop_loss_task.py @@ -66,15 +66,28 @@ class check_positions_stop_loss_task(base_task): self.sell_request(stock, pos, 'market_stop_loss') def handle(self, context: Any): - # 将出售的数据传递给state + # 保存状态 + for key, value in self.temp_sells_list.items(): + stock, reason = key.split('_') + self.strategy.state.set_sell_request(stock, value, reason) pass def end(self, context: Any): self.log.info("-----------------") self.log.info("止损策略开关:{}".format("启用" if self.stoploss_switch else "不启用")) - self.log.info("今日交易开关:{}") - self.log.info("止损策略方式:{}") + self.log.info("今日交易开关:{}".format("交易" if self.today_trade_switch else "不交易")) + self.log.info("止损策略方式:{}".format("个股止损" if self.stoploss_strategy == 1 else "大盘止损" if self.stoploss_strategy == 2 else "联合止损" if + self.stoploss_strategy == 3 else "异常")) self.log.info("当前卖出请求") + for key, value in self.temp_sells_list.items(): + self.log.info(f" {key}: {value}") + + # 总卖出请求 + sells_list = self.strategy.state.get_sell_requests() + if len(sells_list) > 0: + self.log.info("总卖出请求:") + for stock, data in sells_list.items(): + self.log.info(f" {stock}: {data}") pass def sell_request(self, stock, data, reason): diff --git a/test.py b/test.py index 60e2eca..14f29eb 100644 --- a/test.py +++ b/test.py @@ -1,7 +1,7 @@ import logging import os from datetime import timedelta -from datetime import datetime +import datetime import requests from typing import Any, Optional, List, Dict @@ -293,7 +293,7 @@ class strategy_config: """ id = self.configkey_id.get(key) self.connection.update_config(id, value) - + self.config[key]['new_value'] = value def get_config(self, key: str): func = conf.config_type(key) return func(key) @@ -459,9 +459,8 @@ class conf: return bool(value) @staticmethod - def to_list(value: str, params, separator: str = '_'): - params_list = params.split(separator, -1) - return value.split(params_list) + def to_list(value: str, separator: str = '_'): + return value.split(separator) @staticmethod def config_type(value: str): @@ -1095,7 +1094,7 @@ class base_task: def get_config(self, key: str): configs = self.configs.get(key) data_type = configs['data_type'] - data_params = configs['data_params'] + data_params = configs['data_param'] func = conf.config_type(data_type) return func(self.configs[key]['new_value'], data_params) @@ -1150,6 +1149,233 @@ class check_state_before_task(base_task): self.strategy.state.set_low_stock(stock['code'], stock) +class check_market_env_task(base_task): + """ + 交易环境确认 + 1、判断是否为特殊月份(1月/4月) + 2、如果是特殊月份且通过了清仓检查,则清仓所有持仓 + 3、如果不是特殊月份且没有交易信号,则重置交易信号 + """ + + def __init__(self, strategy: trade_strategy): + super().__init__(strategy) + + + + + def config(self, context: Any): + + self.current_date = context.current_dt.date() + self.current_month = self.current_date.month + self.current_day = self.current_date.day + + # 判断是否执行空仓策略 + self.filter_month_clear_switch = self.get_config('filter_month_clear_switch') + self.filer_month_data = self.get_config('filter_month_data') + # 判断是否为特殊月 = True时 等于 执行1月/4月 全部清仓 + self.in_special_month = str(self.current_month) in self.filer_month_data + + self.today_trade_switch = self.get_config('today_trade_switch') # 是否为不交易信号 + + + def run(self, context: Any): + # + if self.filter_month_clear_switch and self.in_special_month: + # 进入特殊月份,应清仓 + for stock in self.strategy.state.data['position_list'].keys(): + self.strategy.state.set_sell_request(stock, self.strategy.state.data['position_list'][stock], f"enter_{self.current_month}_month") + # 清仓后就不交易 + self.today_trade_switch = False + elif not self.filter_month_clear_switch: + # 不在特殊月 或 不执行空仓策略 可以交易 + self.today_trade_switch = True + else: + # 如果是特殊月份但没有清仓条件,则继续交易 + self.today_trade_switch = True + + + def handle(self, context: Any): + # 保存状态 + old_today_trade_switch = self.get_config("today_trade_switch") + if old_today_trade_switch != self.today_trade_switch: + self.strategy_config.set_config("today_trade_switch", self.today_trade_switch) # 临时变量 + self.log.info(f"交易环境更新: today_trade_switch 状态更新为 {self.today_trade_switch}") + + + def end(self, context: Any): + + self.log.info("-----------------") + self.log.info("一.今日环境确认:") + self.log.info(" 当前日期: {}".format(self.current_date)) + self.log.info(" 空仓策略: {}".format("开启" if self.filter_month_clear_switch else "关闭")) + self.log.info(" 今日交易: {}".format("交易" if self.today_trade_switch else "清仓")) + self.log.info(" 特殊月份: {}".format(",".join(str(item) for item in self.filer_month_data))) + self.log.info(" 当前月份: {}".format(self.current_month)) + + # 今日清仓 + +class check_positions_stop_loss_task(base_task): + + def __init__(self, strategy: trade_strategy): + super().__init__(strategy) + + def config(self, context: Any): + + self.stoploss_switch = self.get_config('stoploss_switch') # 是否启用止损策略 + self.hold_list = self.strategy.state.data['position_list'] # 持仓列表 + self.today_trade_switch = self.get_config('today_trade_switch') # 今日交易开关 + self.stoploss_strategy = self.get_config('stoploss_strategy') # 止损策略 + self.stoploss_stock_rate_data = self.get_config('stoploss_stock_rate_data') # 个股止损阀值 + self.stoploss_market_rate_data = self.get_config('stoploss_market_rate_data') # 市场止损阀值 + # self.reason_to_sell = self.strategy.state.get_sell_reason() # 卖出原因 + self.index_stocks = self.get_config('index_stocks') # 指数代码 + self.temp_sells_list = {} + + def run(self, context: Any): + if len(self.hold_list) == 0: + self.log.debug("当前无持仓,跳过止损检查。") + return + + if self.stoploss_switch and self.today_trade_switch: + if self.stoploss_strategy == 1 or self.stoploss_strategy == 3: + # 个股止盈或止损判断 + for stock in list(context.portfolio.positions.keys()): + pos = context.portfolio.positions[stock] + if pos.price >= pos.avg_cost * 2: + # 通过持仓监控器注册卖出请求,而不是直接卖出 + # self.strategy.state.set_sell_request(stock, pos, 'take_profit') + self.sell_request(stock, pos, 'take_profit') + self.log.debug(f"股票 {stock} 实现100%盈利,执行止盈卖出。") + elif pos.price < pos.avg_cost * self.stoploss_stock_rate_data: + # 通过持仓监控器注册卖出请求,而不是直接卖出 + self.strategy.state.set_sell_request(stock, pos, 'stop_loss') + self.sell_request(stock, pos, 'take_profit') + self.log.debug(f"股票 {stock} 触及止损阈值,执行卖出。") + # self.strategy.state.set_sell_reason("stoploss") + + if self.stoploss_strategy == 2 or self.stoploss_strategy == 3: + # 大盘止损判断,若整体市场跌幅过大则平仓所有股票 + stock_list = DataHelper.get_index_stocks(self.index_stocks) + df = DataHelper.get_price_safe( + stock_list, + end_date=context.previous_date, + frequency='daily', + fields=['close', 'open'], + count=1, + panel=False + ) + if df is not None and not df.empty: + down_ratio = (df['close'] / df['open']).mean() + if down_ratio <= self.stoploss_market_rate_data: + # self.strategy.state.set_sell_reason("stoploss") + self.log.debug(f"市场检测到跌幅(平均跌幅 {down_ratio:.2%}),卖出所有持仓。") + for stock in list(context.portfolio.positions.keys()): + pos = context.portfolio.positions[stock] + # 通过持仓监控器注册卖出请求,而不是直接卖出 + self.sell_request(stock, pos, 'market_stop_loss') + + def handle(self, context: Any): + # 保存状态 + for key, value in self.temp_sells_list.items(): + stock, reason = key.split('_') + self.strategy.state.set_sell_request(stock, value, reason) + pass + + def end(self, context: Any): + self.log.info("-----------------") + self.log.info("止损策略开关:{}".format("启用" if self.stoploss_switch else "不启用")) + self.log.info("今日交易开关:{}".format("交易" if self.today_trade_switch else "不交易")) + self.log.info("止损策略方式:{}".format("个股止损" if self.stoploss_strategy == 1 else "大盘止损" if self.stoploss_strategy == 2 else "联合止损" if + self.stoploss_strategy == 3 else "异常")) + self.log.info("当前卖出请求") + for key, value in self.temp_sells_list.items(): + self.log.info(f" {key}: {value}") + + # 总卖出请求 + sells_list = self.strategy.state.get_sell_requests() + if len(sells_list) > 0: + self.log.info("总卖出请求:") + for stock, data in sells_list.items(): + self.log.info(f" {stock}: {data}") + pass + + def sell_request(self, stock, data, reason): + self.temp_sells_list[f"{stock}_{reason}"] = data + +class buy_stocks_func_task(base_task): + """ + 每周调仓策略: + 如果非空仓日,先选股得到目标股票列表,再卖出当前持仓中不在目标列表且昨日未涨停的股票, + 最后买入目标股票,同时记录当天买入情况避免重复下单。 + 参数: + context: 聚宽平台传入的交易上下文对象 + """ + + def __init__(self, strategy: trade_strategy): + super().__init__(strategy) + + def config(self, context: Any): + + # 每次调仓的目标股票数量 + self.stock_num = self.get_config('stock_num') + # 获取当前持仓列表 + self.hold_list = self.strategy.state.get_hold_list() + self.index_stocks = self.get_config('index_stocks') + self.target_list = DataHelper.get_stock_list(context, self.index_stocks, self.stock_num) + self.temp_target_list = self.target_list[:self.stock_num] + # 昨日涨停股票列表 + self.yesterday_HL_list: List[str] = self.strategy.state.get_yesterday_high_list() + self.today_trade_switch = self.get_config('today_trade_switch') + self.temp_sells_list = {} + self.temp_buys_list = {} + + def run(self, context: Any): + + if not self.today_trade_switch: + self.log.info("今日非交易日,跳过调仓操作。") + return + + # 重置当天已买入记录 + # self.position_manager.reset_not_buy_again() + # 取目标持仓数以内的股票作为调仓目标 + self.log.info(f"每周调仓目标股票: {self.temp_target_list}") + + # 遍历当前持仓,若股票不在目标列表,则执行卖出操作 + for stock in self.hold_list: + if stock not in self.temp_target_list: + pos = context.portfolio.positions.get(stock) + self.log.info(f"调仓决策:卖出股票 {stock}") + # 通过持仓监控器注册卖出请求,而不是直接卖出 + self.sell_request(stock, pos, 'rebalance') + # self.strategy.state.set_sell_request(stock, pos, 'rebalance') + self.sell_request(stock, pos, 'rebalance') + else: + self.log.info(f"调仓决策:继续持有股票 {stock}") + + # 对目标股票执行买入操作前,先将其注册到待买入队列 + buy_targets = [stock for stock in self.temp_target_list if stock not in self.hold_list] + if buy_targets: + for stock in buy_targets: + pos = context.portfolio.positions.get(stock) + # self.strategy.state.set_buy_request(stock, pos, 'buy') + self.buys_request(stock, pos, 'buy') + self.log.info(f"调仓决策:将 {len(buy_targets)} 只股票加入待买入队列: {buy_targets}") + + def handle(self, context: Any): + self.strategy.set_target_list(self.temp_target_list) + pass + + def end(self, context: Any): + pass + # 这里可以添加任何必要的清理或总结操作 + + def sell_request(self, stock, data, reason): + self.temp_sells_list[f"{stock}_{reason}"] = data + + def buys_request(self, stock, data, reason): + self.temp_buys_list[f"{stock}_{reason}"] = data + + strategy = trade_strategy("测试策略", 10000000, network_config(), log, jq_channel()) @@ -1157,17 +1383,27 @@ def check_state_before_func(context: Any): task = check_state_before_task(strategy) task.process(context) +def check_market_env_func(context: Any): + task = check_market_env_task(strategy) + task.process(context) +def check_positions_stop_loss_func(context: Any): + task = check_positions_stop_loss_task(strategy) + task.process(context) + +def buy_stocks_func(context: Any): + task = buy_stocks_func_task(strategy) + task.process(context) def initialize(context: Any) -> None: # # 初始化策略参数 strategy.init_strategy() # 策略初始化函数 # # 注册调度任务 run_daily(check_state_before_func, time='9:01') # 开盘前先检查持仓状态 - # run_daily(check_holdings_yesterday_func, time='9:00') - # run_daily(prepare_stock_list_func, time='9:05') + run_daily(check_market_env_func, time='9:05') + run_daily(check_positions_stop_loss_func, time='9:10') + run_daily(buy_stocks_func, time='9:15') # 每周调仓 # - # run_daily(sell_stocks_func, time='10:00') # 每日检查止损条件 # # run_daily(process_pending_sells_func, time='10:15') # 处理待卖出股票 # run_weekly(buy_stocks_func, 2, time='10:30') # 周二进行调仓