This commit is contained in:
summer 2025-07-06 17:42:06 +08:00
parent 835f3ddb45
commit 1693a404f1
7 changed files with 275 additions and 23 deletions

View File

@ -1,3 +1,5 @@
from typing import Any
from config.strategy_state import state from config.strategy_state import state
from kit.conf import conf from kit.conf import conf
from kit.logger import Logger from kit.logger import Logger
@ -60,7 +62,7 @@ class strategy_config:
""" """
return self.config return self.config
def set_config(self, key: str, value: str) -> None: def set_config(self, key: str, value: Any) -> None:
""" """
通过字典语法设置配置项并自动触发数据库更新 通过字典语法设置配置项并自动触发数据库更新
@ -70,7 +72,7 @@ class strategy_config:
""" """
id = self.configkey_id.get(key) id = self.configkey_id.get(key)
self.connection.update_config(id, value) self.connection.update_config(id, value)
self.config[key]['new_value'] = value
def get_config(self, key: str): def get_config(self, key: str):
func = conf.config_type(key) func = conf.config_type(key)
return func(key) return func(key)

View File

@ -17,9 +17,8 @@ class conf:
return bool(value) return bool(value)
@staticmethod @staticmethod
def to_list(value: str, params, separator: str = '_'): def to_list(value: str, separator: str = '_'):
params_list = params.split(separator, -1) return value.split(separator)
return value.split(params_list)
@staticmethod @staticmethod
def config_type(value: str): def config_type(value: str):
@ -33,7 +32,9 @@ class conf:
return data[value] return data[value]
ddd = conf.config_type()
func = ddd["int"] ddd = conf.config_type("list")
print(func("123")) # 输出: 123 print(ddd("1_2_3", "_")) # 输出: 123
# "1,2,3".split(",", -1)

View File

@ -41,6 +41,6 @@ class base_task:
def get_config(self, key: str): def get_config(self, key: str):
configs = self.configs.get(key) configs = self.configs.get(key)
data_type = configs['data_type'] data_type = configs['data_type']
data_params = configs['data_params'] data_params = configs['data_param']
func = conf.config_type(data_type) func = conf.config_type(data_type)
return func(self.configs[key]['new_value'], data_params) return func(self.configs[key]['new_value'], data_params)

View File

@ -41,7 +41,7 @@ class buy_stocks_func_task(base_task):
# 重置当天已买入记录 # 重置当天已买入记录
# self.position_manager.reset_not_buy_again() # self.position_manager.reset_not_buy_again()
# 取目标持仓数以内的股票作为调仓目标 # 取目标持仓数以内的股票作为调仓目标
self.log.info(f"每周调仓目标股票: {self.temp_target_list}") # self.log.info(f"每周调仓目标股票: {self.temp_target_list}")
# 遍历当前持仓,若股票不在目标列表,则执行卖出操作 # 遍历当前持仓,若股票不在目标列表,则执行卖出操作
for stock in self.hold_list: for stock in self.hold_list:

View File

@ -51,7 +51,7 @@ class check_market_env_task(base_task):
def handle(self, context: Any): def handle(self, context: Any):
# 保存状态 # 保存状态
old_today_trade_switch = self.strategy_config.get_config("today_trade_switch") old_today_trade_switch = self.get_config("today_trade_switch")
if old_today_trade_switch != self.today_trade_switch: if old_today_trade_switch != self.today_trade_switch:
self.strategy_config.set_config("today_trade_switch", self.today_trade_switch) # 临时变量 self.strategy_config.set_config("today_trade_switch", self.today_trade_switch) # 临时变量
self.log.info(f"交易环境更新: today_trade_switch 状态更新为 {self.today_trade_switch}") self.log.info(f"交易环境更新: today_trade_switch 状态更新为 {self.today_trade_switch}")

View File

@ -66,15 +66,28 @@ class check_positions_stop_loss_task(base_task):
self.sell_request(stock, pos, 'market_stop_loss') self.sell_request(stock, pos, 'market_stop_loss')
def handle(self, context: Any): def handle(self, context: Any):
# 将出售的数据传递给state # 保存状态
for key, value in self.temp_sells_list.items():
stock, reason = key.split('_')
self.strategy.state.set_sell_request(stock, value, reason)
pass pass
def end(self, context: Any): def end(self, context: Any):
self.log.info("-----------------") self.log.info("-----------------")
self.log.info("止损策略开关:{}".format("启用" if self.stoploss_switch else "不启用")) self.log.info("止损策略开关:{}".format("启用" if self.stoploss_switch else "不启用"))
self.log.info("今日交易开关:{}") self.log.info("今日交易开关:{}".format("交易" if self.today_trade_switch else "不交易"))
self.log.info("止损策略方式:{}") self.log.info("止损策略方式:{}".format("个股止损" if self.stoploss_strategy == 1 else "大盘止损" if self.stoploss_strategy == 2 else "联合止损" if
self.stoploss_strategy == 3 else "异常"))
self.log.info("当前卖出请求") self.log.info("当前卖出请求")
for key, value in self.temp_sells_list.items():
self.log.info(f" {key}: {value}")
# 总卖出请求
sells_list = self.strategy.state.get_sell_requests()
if len(sells_list) > 0:
self.log.info("总卖出请求:")
for stock, data in sells_list.items():
self.log.info(f" {stock}: {data}")
pass pass
def sell_request(self, stock, data, reason): def sell_request(self, stock, data, reason):

254
test.py
View File

@ -1,7 +1,7 @@
import logging import logging
import os import os
from datetime import timedelta from datetime import timedelta
from datetime import datetime import datetime
import requests import requests
from typing import Any, Optional, List, Dict from typing import Any, Optional, List, Dict
@ -293,7 +293,7 @@ class strategy_config:
""" """
id = self.configkey_id.get(key) id = self.configkey_id.get(key)
self.connection.update_config(id, value) self.connection.update_config(id, value)
self.config[key]['new_value'] = value
def get_config(self, key: str): def get_config(self, key: str):
func = conf.config_type(key) func = conf.config_type(key)
return func(key) return func(key)
@ -459,9 +459,8 @@ class conf:
return bool(value) return bool(value)
@staticmethod @staticmethod
def to_list(value: str, params, separator: str = '_'): def to_list(value: str, separator: str = '_'):
params_list = params.split(separator, -1) return value.split(separator)
return value.split(params_list)
@staticmethod @staticmethod
def config_type(value: str): def config_type(value: str):
@ -1095,7 +1094,7 @@ class base_task:
def get_config(self, key: str): def get_config(self, key: str):
configs = self.configs.get(key) configs = self.configs.get(key)
data_type = configs['data_type'] data_type = configs['data_type']
data_params = configs['data_params'] data_params = configs['data_param']
func = conf.config_type(data_type) func = conf.config_type(data_type)
return func(self.configs[key]['new_value'], data_params) return func(self.configs[key]['new_value'], data_params)
@ -1150,6 +1149,233 @@ class check_state_before_task(base_task):
self.strategy.state.set_low_stock(stock['code'], stock) self.strategy.state.set_low_stock(stock['code'], stock)
class check_market_env_task(base_task):
"""
交易环境确认
1判断是否为特殊月份1/4
2如果是特殊月份且通过了清仓检查则清仓所有持仓
3如果不是特殊月份且没有交易信号则重置交易信号
"""
def __init__(self, strategy: trade_strategy):
super().__init__(strategy)
def config(self, context: Any):
self.current_date = context.current_dt.date()
self.current_month = self.current_date.month
self.current_day = self.current_date.day
# 判断是否执行空仓策略
self.filter_month_clear_switch = self.get_config('filter_month_clear_switch')
self.filer_month_data = self.get_config('filter_month_data')
# 判断是否为特殊月 = True时 等于 执行1月/4月 全部清仓
self.in_special_month = str(self.current_month) in self.filer_month_data
self.today_trade_switch = self.get_config('today_trade_switch') # 是否为不交易信号
def run(self, context: Any):
#
if self.filter_month_clear_switch and self.in_special_month:
# 进入特殊月份,应清仓
for stock in self.strategy.state.data['position_list'].keys():
self.strategy.state.set_sell_request(stock, self.strategy.state.data['position_list'][stock], f"enter_{self.current_month}_month")
# 清仓后就不交易
self.today_trade_switch = False
elif not self.filter_month_clear_switch:
# 不在特殊月 或 不执行空仓策略 可以交易
self.today_trade_switch = True
else:
# 如果是特殊月份但没有清仓条件,则继续交易
self.today_trade_switch = True
def handle(self, context: Any):
# 保存状态
old_today_trade_switch = self.get_config("today_trade_switch")
if old_today_trade_switch != self.today_trade_switch:
self.strategy_config.set_config("today_trade_switch", self.today_trade_switch) # 临时变量
self.log.info(f"交易环境更新: today_trade_switch 状态更新为 {self.today_trade_switch}")
def end(self, context: Any):
self.log.info("-----------------")
self.log.info("一.今日环境确认:")
self.log.info(" 当前日期: {}".format(self.current_date))
self.log.info(" 空仓策略: {}".format("开启" if self.filter_month_clear_switch else "关闭"))
self.log.info(" 今日交易: {}".format("交易" if self.today_trade_switch else "清仓"))
self.log.info(" 特殊月份: {}".format(",".join(str(item) for item in self.filer_month_data)))
self.log.info(" 当前月份: {}".format(self.current_month))
# 今日清仓
class check_positions_stop_loss_task(base_task):
def __init__(self, strategy: trade_strategy):
super().__init__(strategy)
def config(self, context: Any):
self.stoploss_switch = self.get_config('stoploss_switch') # 是否启用止损策略
self.hold_list = self.strategy.state.data['position_list'] # 持仓列表
self.today_trade_switch = self.get_config('today_trade_switch') # 今日交易开关
self.stoploss_strategy = self.get_config('stoploss_strategy') # 止损策略
self.stoploss_stock_rate_data = self.get_config('stoploss_stock_rate_data') # 个股止损阀值
self.stoploss_market_rate_data = self.get_config('stoploss_market_rate_data') # 市场止损阀值
# self.reason_to_sell = self.strategy.state.get_sell_reason() # 卖出原因
self.index_stocks = self.get_config('index_stocks') # 指数代码
self.temp_sells_list = {}
def run(self, context: Any):
if len(self.hold_list) == 0:
self.log.debug("当前无持仓,跳过止损检查。")
return
if self.stoploss_switch and self.today_trade_switch:
if self.stoploss_strategy == 1 or self.stoploss_strategy == 3:
# 个股止盈或止损判断
for stock in list(context.portfolio.positions.keys()):
pos = context.portfolio.positions[stock]
if pos.price >= pos.avg_cost * 2:
# 通过持仓监控器注册卖出请求,而不是直接卖出
# self.strategy.state.set_sell_request(stock, pos, 'take_profit')
self.sell_request(stock, pos, 'take_profit')
self.log.debug(f"股票 {stock} 实现100%盈利,执行止盈卖出。")
elif pos.price < pos.avg_cost * self.stoploss_stock_rate_data:
# 通过持仓监控器注册卖出请求,而不是直接卖出
self.strategy.state.set_sell_request(stock, pos, 'stop_loss')
self.sell_request(stock, pos, 'take_profit')
self.log.debug(f"股票 {stock} 触及止损阈值,执行卖出。")
# self.strategy.state.set_sell_reason("stoploss")
if self.stoploss_strategy == 2 or self.stoploss_strategy == 3:
# 大盘止损判断,若整体市场跌幅过大则平仓所有股票
stock_list = DataHelper.get_index_stocks(self.index_stocks)
df = DataHelper.get_price_safe(
stock_list,
end_date=context.previous_date,
frequency='daily',
fields=['close', 'open'],
count=1,
panel=False
)
if df is not None and not df.empty:
down_ratio = (df['close'] / df['open']).mean()
if down_ratio <= self.stoploss_market_rate_data:
# self.strategy.state.set_sell_reason("stoploss")
self.log.debug(f"市场检测到跌幅(平均跌幅 {down_ratio:.2%}),卖出所有持仓。")
for stock in list(context.portfolio.positions.keys()):
pos = context.portfolio.positions[stock]
# 通过持仓监控器注册卖出请求,而不是直接卖出
self.sell_request(stock, pos, 'market_stop_loss')
def handle(self, context: Any):
# 保存状态
for key, value in self.temp_sells_list.items():
stock, reason = key.split('_')
self.strategy.state.set_sell_request(stock, value, reason)
pass
def end(self, context: Any):
self.log.info("-----------------")
self.log.info("止损策略开关:{}".format("启用" if self.stoploss_switch else "不启用"))
self.log.info("今日交易开关:{}".format("交易" if self.today_trade_switch else "不交易"))
self.log.info("止损策略方式:{}".format("个股止损" if self.stoploss_strategy == 1 else "大盘止损" if self.stoploss_strategy == 2 else "联合止损" if
self.stoploss_strategy == 3 else "异常"))
self.log.info("当前卖出请求")
for key, value in self.temp_sells_list.items():
self.log.info(f" {key}: {value}")
# 总卖出请求
sells_list = self.strategy.state.get_sell_requests()
if len(sells_list) > 0:
self.log.info("总卖出请求:")
for stock, data in sells_list.items():
self.log.info(f" {stock}: {data}")
pass
def sell_request(self, stock, data, reason):
self.temp_sells_list[f"{stock}_{reason}"] = data
class buy_stocks_func_task(base_task):
"""
每周调仓策略
如果非空仓日先选股得到目标股票列表再卖出当前持仓中不在目标列表且昨日未涨停的股票
最后买入目标股票同时记录当天买入情况避免重复下单
参数:
context: 聚宽平台传入的交易上下文对象
"""
def __init__(self, strategy: trade_strategy):
super().__init__(strategy)
def config(self, context: Any):
# 每次调仓的目标股票数量
self.stock_num = self.get_config('stock_num')
# 获取当前持仓列表
self.hold_list = self.strategy.state.get_hold_list()
self.index_stocks = self.get_config('index_stocks')
self.target_list = DataHelper.get_stock_list(context, self.index_stocks, self.stock_num)
self.temp_target_list = self.target_list[:self.stock_num]
# 昨日涨停股票列表
self.yesterday_HL_list: List[str] = self.strategy.state.get_yesterday_high_list()
self.today_trade_switch = self.get_config('today_trade_switch')
self.temp_sells_list = {}
self.temp_buys_list = {}
def run(self, context: Any):
if not self.today_trade_switch:
self.log.info("今日非交易日,跳过调仓操作。")
return
# 重置当天已买入记录
# self.position_manager.reset_not_buy_again()
# 取目标持仓数以内的股票作为调仓目标
self.log.info(f"每周调仓目标股票: {self.temp_target_list}")
# 遍历当前持仓,若股票不在目标列表,则执行卖出操作
for stock in self.hold_list:
if stock not in self.temp_target_list:
pos = context.portfolio.positions.get(stock)
self.log.info(f"调仓决策:卖出股票 {stock}")
# 通过持仓监控器注册卖出请求,而不是直接卖出
self.sell_request(stock, pos, 'rebalance')
# self.strategy.state.set_sell_request(stock, pos, 'rebalance')
self.sell_request(stock, pos, 'rebalance')
else:
self.log.info(f"调仓决策:继续持有股票 {stock}")
# 对目标股票执行买入操作前,先将其注册到待买入队列
buy_targets = [stock for stock in self.temp_target_list if stock not in self.hold_list]
if buy_targets:
for stock in buy_targets:
pos = context.portfolio.positions.get(stock)
# self.strategy.state.set_buy_request(stock, pos, 'buy')
self.buys_request(stock, pos, 'buy')
self.log.info(f"调仓决策:将 {len(buy_targets)} 只股票加入待买入队列: {buy_targets}")
def handle(self, context: Any):
self.strategy.set_target_list(self.temp_target_list)
pass
def end(self, context: Any):
pass
# 这里可以添加任何必要的清理或总结操作
def sell_request(self, stock, data, reason):
self.temp_sells_list[f"{stock}_{reason}"] = data
def buys_request(self, stock, data, reason):
self.temp_buys_list[f"{stock}_{reason}"] = data
strategy = trade_strategy("测试策略", 10000000, network_config(), log, jq_channel()) strategy = trade_strategy("测试策略", 10000000, network_config(), log, jq_channel())
@ -1157,17 +1383,27 @@ def check_state_before_func(context: Any):
task = check_state_before_task(strategy) task = check_state_before_task(strategy)
task.process(context) task.process(context)
def check_market_env_func(context: Any):
task = check_market_env_task(strategy)
task.process(context)
def check_positions_stop_loss_func(context: Any):
task = check_positions_stop_loss_task(strategy)
task.process(context)
def buy_stocks_func(context: Any):
task = buy_stocks_func_task(strategy)
task.process(context)
def initialize(context: Any) -> None: def initialize(context: Any) -> None:
# # 初始化策略参数 # # 初始化策略参数
strategy.init_strategy() # 策略初始化函数 strategy.init_strategy() # 策略初始化函数
# # 注册调度任务 # # 注册调度任务
run_daily(check_state_before_func, time='9:01') # 开盘前先检查持仓状态 run_daily(check_state_before_func, time='9:01') # 开盘前先检查持仓状态
# run_daily(check_holdings_yesterday_func, time='9:00') run_daily(check_market_env_func, time='9:05')
# run_daily(prepare_stock_list_func, time='9:05') run_daily(check_positions_stop_loss_func, time='9:10')
run_daily(buy_stocks_func, time='9:15') # 每周调仓
# #
# run_daily(sell_stocks_func, time='10:00') # 每日检查止损条件
# #
# run_daily(process_pending_sells_func, time='10:15') # 处理待卖出股票 # run_daily(process_pending_sells_func, time='10:15') # 处理待卖出股票
# run_weekly(buy_stocks_func, 2, time='10:30') # 周二进行调仓 # run_weekly(buy_stocks_func, 2, time='10:30') # 周二进行调仓