commit 51ca0877d942724573c9f45b2047020b10c143a6 Author: summer Date: Thu Jul 3 23:39:31 2025 +0800 init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..723ef36 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea \ No newline at end of file diff --git a/config/__init__.py b/config/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/config/logs/app.log b/config/logs/app.log new file mode 100644 index 0000000..36cc899 --- /dev/null +++ b/config/logs/app.log @@ -0,0 +1,92 @@ +2025-06-26 01:28:57 [INFO] 初始化策略配置 +2025-06-26 01:28:59 [INFO] 注册策略序列:10000112 +2025-06-26 01:28:59 [INFO] 确认配置参数: +2025-06-26 01:28:59 [INFO] 确认配置参数完毕, 如有错误请立刻停止执行 +2025-06-26 01:33:36 [INFO] 初始化策略配置 +2025-06-26 01:33:38 [INFO] 注册策略序列:10000114 +2025-06-26 01:33:38 [INFO] 确认配置参数: +2025-06-26 01:33:38 [INFO] 参数名: stock_num, 值: 9, 备注: 买入股票的数量 +2025-06-26 01:33:38 [INFO] 参数名: pass_month, 值: 1,4, 备注: 特殊月过滤, 1,4等于1月和4月 +2025-06-26 01:33:38 [INFO] 参数名: key3, 值: new_value, 备注: None +2025-06-26 01:33:38 [INFO] 参数名: key4, 值: d, 备注: None +2025-06-26 01:33:38 [INFO] 参数名: example_key, 值: new_value, 备注: None +2025-06-26 01:33:38 [INFO] 参数名: ke3, 值: new_value, 备注: None +2025-06-26 01:33:38 [INFO] 参数名: run_stoploss, 值: True, 备注: None +2025-06-26 01:33:39 [INFO] 参数名: stoploss_strategy, 值: 3, 备注: None +2025-06-26 01:33:39 [INFO] 参数名: stoploss_limit, 值: 0.88, 备注: None +2025-06-26 01:33:39 [INFO] 参数名: stoploss_market, 值: 0.94, 备注: None +2025-06-26 01:33:39 [INFO] 参数名: index_stocks, 值: 399101.XSHE, 备注: None +2025-06-26 01:33:39 [INFO] 参数名: no_trading_today_signal, 值: True, 备注: None +2025-06-26 01:33:39 [INFO] 参数名: HV_control, 值: False, 备注: None +2025-06-26 01:33:39 [INFO] 参数名: HV_duration, 值: 120, 备注: None +2025-06-26 01:33:39 [INFO] 参数名: HV_ratio, 值: 0.9, 备注: None +2025-06-26 01:33:39 [INFO] 参数名: benchmark_stock, 值: 000001.XSHG, 备注: None +2025-06-26 01:33:39 [INFO] 参数名: pass_april, 值: True, 备注: None +2025-06-26 01:33:39 [INFO] 参数名: in_no_trading_period, 值: False, 备注: None +2025-06-26 01:33:39 [INFO] 确认配置参数完毕, 如有错误请立刻停止执行 +2025-06-26 01:37:44 [INFO] 初始化策略配置 +2025-06-26 01:37:44 [INFO] 注册策略序列:10000115 +2025-06-26 01:37:44 [INFO] 确认配置参数: +2025-06-26 01:37:44 [INFO] 参数名: stock_num, 值: 9, 备注: 买入股票的数量 +2025-06-26 01:37:44 [INFO] 参数名: pass_month, 值: 1,4, 备注: 特殊月过滤, 1,4等于1月和4月 +2025-06-26 01:37:44 [INFO] 参数名: key3, 值: new_value, 备注: None +2025-06-26 01:37:44 [INFO] 参数名: key4, 值: d, 备注: None +2025-06-26 01:37:44 [INFO] 参数名: example_key, 值: new_value, 备注: None +2025-06-26 01:37:45 [INFO] 参数名: ke3, 值: new_value, 备注: None +2025-06-26 01:37:45 [INFO] 参数名: run_stoploss, 值: True, 备注: None +2025-06-26 01:37:45 [INFO] 参数名: stoploss_strategy, 值: 3, 备注: None +2025-06-26 01:37:45 [INFO] 参数名: stoploss_limit, 值: 0.88, 备注: None +2025-06-26 01:37:45 [INFO] 参数名: stoploss_market, 值: 0.94, 备注: None +2025-06-26 01:37:45 [INFO] 参数名: index_stocks, 值: 399101.XSHE, 备注: None +2025-06-26 01:37:45 [INFO] 参数名: no_trading_today_signal, 值: True, 备注: None +2025-06-26 01:37:45 [INFO] 参数名: HV_control, 值: False, 备注: None +2025-06-26 01:37:45 [INFO] 参数名: HV_duration, 值: 120, 备注: None +2025-06-26 01:37:45 [INFO] 参数名: HV_ratio, 值: 0.9, 备注: None +2025-06-26 01:37:45 [INFO] 参数名: benchmark_stock, 值: 000001.XSHG, 备注: None +2025-06-26 01:37:46 [INFO] 参数名: pass_april, 值: True, 备注: None +2025-06-26 01:37:46 [INFO] 参数名: in_no_trading_period, 值: False, 备注: None +2025-06-26 01:37:46 [INFO] 确认配置参数完毕, 如有错误请立刻停止执行 +2025-06-26 01:38:26 [INFO] 初始化策略配置 +2025-06-26 01:38:26 [INFO] 注册策略序列:10000116 +2025-06-26 01:38:27 [INFO] 确认配置参数: +2025-06-26 01:38:27 [INFO] 参数名: stock_num, 值: 9, 备注: 买入股票的数量 +2025-06-26 01:38:27 [INFO] 参数名: pass_month, 值: 1,4, 备注: 特殊月过滤, 1,4等于1月和4月 +2025-06-26 01:38:27 [INFO] 参数名: key3, 值: new_value, 备注: None +2025-06-26 01:38:27 [INFO] 参数名: key4, 值: d, 备注: None +2025-06-26 01:38:27 [INFO] 参数名: example_key, 值: new_value, 备注: None +2025-06-26 01:38:27 [INFO] 参数名: ke3, 值: new_value, 备注: None +2025-06-26 01:38:27 [INFO] 参数名: run_stoploss, 值: True, 备注: None +2025-06-26 01:38:27 [INFO] 参数名: stoploss_strategy, 值: 3, 备注: None +2025-06-26 01:38:27 [INFO] 参数名: stoploss_limit, 值: 0.88, 备注: None +2025-06-26 01:38:27 [INFO] 参数名: stoploss_market, 值: 0.94, 备注: None +2025-06-26 01:38:27 [INFO] 参数名: index_stocks, 值: 399101.XSHE, 备注: None +2025-06-26 01:38:27 [INFO] 参数名: no_trading_today_signal, 值: True, 备注: None +2025-06-26 01:38:27 [INFO] 参数名: HV_control, 值: False, 备注: None +2025-06-26 01:38:28 [INFO] 参数名: HV_duration, 值: 120, 备注: None +2025-06-26 01:38:28 [INFO] 参数名: HV_ratio, 值: 0.9, 备注: None +2025-06-26 01:38:28 [INFO] 参数名: benchmark_stock, 值: 000001.XSHG, 备注: None +2025-06-26 01:38:28 [INFO] 参数名: pass_april, 值: True, 备注: None +2025-06-26 01:38:28 [INFO] 参数名: in_no_trading_period, 值: False, 备注: None +2025-06-26 01:38:28 [INFO] 确认配置参数完毕, 如有错误请立刻停止执行 +2025-06-26 21:39:32 [INFO] 初始化策略配置 +2025-06-26 21:39:32 [INFO] 注册策略序列:10000117 +2025-06-26 21:42:11 [INFO] 确认配置参数: +2025-06-26 21:42:52 [INFO] 参数名: stock_num, 值: 9, 备注: 买入股票的数量 +2025-06-26 21:42:52 [INFO] 参数名: pass_month, 值: 1,4, 备注: 特殊月过滤, 1,4等于1月和4月 +2025-06-26 21:42:52 [INFO] 参数名: key3, 值: new_value, 备注: None +2025-06-26 21:42:52 [INFO] 参数名: key4, 值: d, 备注: None +2025-06-26 21:42:52 [INFO] 参数名: example_key, 值: new_value, 备注: None +2025-06-26 21:42:52 [INFO] 参数名: ke3, 值: new_value, 备注: None +2025-06-26 21:42:53 [INFO] 参数名: run_stoploss, 值: True, 备注: None +2025-06-26 21:42:53 [INFO] 参数名: stoploss_strategy, 值: 3, 备注: None +2025-06-26 21:42:53 [INFO] 参数名: stoploss_limit, 值: 0.88, 备注: None +2025-06-26 21:42:53 [INFO] 参数名: stoploss_market, 值: 0.94, 备注: None +2025-06-26 21:42:53 [INFO] 参数名: index_stocks, 值: 399101.XSHE, 备注: None +2025-06-26 21:42:53 [INFO] 参数名: no_trading_today_signal, 值: True, 备注: None +2025-06-26 21:42:53 [INFO] 参数名: HV_control, 值: False, 备注: None +2025-06-26 21:42:53 [INFO] 参数名: HV_duration, 值: 120, 备注: None +2025-06-26 21:42:53 [INFO] 参数名: HV_ratio, 值: 0.9, 备注: None +2025-06-26 21:42:53 [INFO] 参数名: benchmark_stock, 值: 000001.XSHG, 备注: None +2025-06-26 21:42:53 [INFO] 参数名: pass_april, 值: True, 备注: None +2025-06-26 21:42:53 [INFO] 参数名: in_no_trading_period, 值: False, 备注: None +2025-06-26 21:42:53 [INFO] 确认配置参数完毕, 如有错误请立刻停止执行 diff --git a/config/network_config.py b/config/network_config.py new file mode 100644 index 0000000..7f7ee8d --- /dev/null +++ b/config/network_config.py @@ -0,0 +1,226 @@ +import datetime + +import requests + + +class network_config: + + def __init__(self): + """ + 初始化网络配置 + """ + self.BASE_URL = "http://101.34.79.70:8081" # 替换为你的 NocoDB 地址 + API_TOKEN = "mgIUTo_BATQnVPE3czsHLOLLgcQmnVv1qs7gJTIO" # 在用户设置中创建 + self.headers = { + "xc-token": API_TOKEN + } + + self.project_table = { + "strategy_config": "mw1zi3dr5sy03em", # 查看项目URL获取 + "strategy_base": "mtvgo6pbrlmbgjc", + "strategy_config_template": "m4nty4o5t6sxzts", + "strategy_logs": "m8ebfypvbfpk38p", + "today_state": "mosii8hqry8xx9b" + } + + pass + + def register_strategy(self, strategy_name: str, template_id: str = ""): + """ + 注册策略 + :param strategy_name: 策略名称 + :return: None + """ + PROJECT_ID = self.project_table['strategy_base'] + url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records" + data = { + + "text": strategy_name, + "memo": template_id, + "create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 使用当前时间 + "modify_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 使用当前时间 + + } + response = requests.post(url, headers=self.headers, data=data) + response.raise_for_status() # 检查错误 + result = response.json() + template_list = self.get_template_list(template_id) + for template in template_list: + self.insert_config(result["id"], template) + + return result["id"] + + def get_strategy_id(self, strategy_name: str): + """ + 获取策略ID + :param strategy_name: 策略名称 + :return: 策略ID + """ + PROJECT_ID = self.project_table['strategy_base'] + response = requests.get(f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records", headers=self.headers) + response.raise_for_status() + data = response.json() + records = data["list"] + return next((str(record["id"]) for record in records if record.get("text") == strategy_name), None) + + def get_template_list(self, strategy_id: str) -> any: + """ + 获取网络配置参数 + :param type: + :param strategy_id: + :param key: 配置参数的键 + :return: 配置参数的值 + """ + PROJECT_ID = self.project_table['strategy_config_template'] + url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records" + params = dict(limit=100, offset=0, where=f"(template_id,eq,{strategy_id})") + try: + return requests.get(url, headers=self.headers, params=params).json()['list'] + except Exception as e: + return [] + + def get_data_list(self, strategy_id: str, type: str) -> any: + """ + 获取网络配置参数 + :param type: + :param strategy_id: + :param key: 配置参数的键 + :return: 配置参数的值 + """ + # 这里可以添加获取配置逻辑 + PROJECT_ID = self.project_table['strategy_config'] + url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records" + params = dict(limit=100, offset=0, where=f"(strategy_id,eq,{strategy_id})") + try: + result = [] + records = requests.get(url, headers=self.headers, params=params).json()['list'] + for record in records: + if record.get("strategy_id") == str(strategy_id) and record.get("type") == type: + result.append(record) + return result + except Exception as e: + return [] + + def update_config(self, id: str, new_value: str): + """ + 更新网络配置参数 + + :return: + """ + PROJECT_ID = self.project_table['strategy_config'] + url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records" + new_value = { + "id": id, + "new_value": new_value, + "create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + } + update_response = requests.patch(url, headers=self.headers, json=new_value) + update_response.raise_for_status() + return update_response.json() + + def insert_config(self, strategy_id: str, template: dict): + """ + 插入网络配置参数 + """ + PROJECT_ID = self.project_table['strategy_config'] + url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records" + data = { + "strategy_id": strategy_id, + "type": "config", + "name": template['name'], + "data_type": template['data_type'], + "data_param": template['data_param'], + "memo": template['memo'], + "new_value": template['value'], + "create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + } + response = requests.post(url, headers=self.headers, data=data) + response.raise_for_status() # 检查错误 + + # def update_check(self, id: str, old_value: str, new_value: str): + # """ + # 更新网络配置参数 + # + # :return: + # """ + # PROJECT_ID = self.project_table['strategy_config'] + # url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records/" + # new_value = { + # "id": id, + # "old_value": old_value, + # "new_value": new_value, + # "create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + # } + # update_response = requests.patch(url, headers=self.headers, json=new_value) + # update_response.raise_for_status() + # return update_response.json() + # + # def insert_check(self, strategy_id: str, key: str, value: str): + # """ + # 插入网络配置参数 + # """ + # PROJECT_ID = self.project_table['strategy_config'] + # url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records" + # data = { + # "strategy_id": strategy_id, + # "type": "check", + # "name": key, + # "new_value": value, + # "create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 使用当前时间 + # + # } + # response = requests.post(url, headers=self.headers, data=data) + # response.raise_for_status() # 检查错误 + + @staticmethod + def insert_logs(strategy_id: str, log_type: str, log_content: str): + """ + 插入日志记录 + :param strategy_id: 策略ID + :param log_type: 日志类型 + :param log_content: 日志内容 + :return: None + """ + PROJECT_ID = "m8ebfypvbfpk38p" + url = f"http://101.34.79.70:8081/api/v2/tables/{PROJECT_ID}/records" + data = { + "strategy_id": strategy_id, + "msg_type": log_type, + "message": log_content, + "create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 使用当前时间 + } + API_TOKEN = "mgIUTo_BATQnVPE3czsHLOLLgcQmnVv1qs7gJTIO" # 在用户设置中创建 + headers = { + "xc-token": API_TOKEN + } + response = requests.post(url, headers=headers, data=data) + response.raise_for_status() + + @staticmethod + def insert_state(strategy_id: str,state_str:str,trade_date:str): + """ + + :param strategy_id: + :param state_str: + :param trade_date: + :return: + """ + PROJECT_ID = "mosii8hqry8xx9b" + url = f"http://101.34.79.70:8081/api/v2/tables/{PROJECT_ID}/records" + data = { + "strategy_id": strategy_id, + "trade_date": trade_date, # 使用当前时间 + "state": state_str + } + API_TOKEN = "mgIUTo_BATQnVPE3czsHLOLLgcQmnVv1qs7gJTIO" # 在用户设置中创建 + headers = { + "xc-token": API_TOKEN + } + response = requests.post(url, headers=headers, data=data) + response.raise_for_status() + + + +# aaa = network_config() +# ddd = aaa.get_data_list("10000113","config") +# print(ddd) diff --git a/config/strategy_config.py b/config/strategy_config.py new file mode 100644 index 0000000..d0434e3 --- /dev/null +++ b/config/strategy_config.py @@ -0,0 +1,79 @@ +from config.strategy_state import state +from kit.conf import conf +from kit.logger import Logger +from config.network_config import network_config + + +class strategy_config: + + def __init__(self, + strategy_name: str, + strategy_template_id: str, + config_save: network_config, + log): + """ + 基础必要初始化策略配置 + """ + # 策略名 + self.strategy_name = strategy_name + # 配置模版id + self.template_id = strategy_template_id + # 配置变更保存方式 + self.connection = config_save + # 日志记录器 + self.logger = Logger(log) + + """ + 实例化strategy_config类时,初始化必要的属性 + """ + # 策略ID + self.strategy_id = None + # 配置对应主键 + self.configkey_id = {} + # 配置信息 + self.config = {} + + # 核心缓存 + self.strategy_state = state() + + # 初始化配置文件 + self.init_from_db() + + def init_from_db(self): + self.strategy_id = self.connection.register_strategy(self.strategy_name, self.template_id) + + self.logger.info("初始化策略配置", self.strategy_id) + self.logger.info(' 注册策略序列:' + str(self.strategy_id), self.strategy_id) + config_list = self.connection.get_data_list(self.strategy_id, 'config') + self.config = {item['name']: item for item in config_list} + self.configkey_id = {item['name']: item['id'] for item in config_list} + + self.logger.info('确认配置参数:', self.strategy_id) + for config in config_list: + self.logger.info(f" 参数名: {config['name']}, 值: {config['new_value']}, 备注: {config['memo']}", self.strategy_id) + self.logger.info('确认配置参数完毕, 如有错误请立刻停止执行', self.strategy_id) + + def get_all_configs(self) -> dict: + """ + 获取所有策略配置参数 + :return: 包含所有配置参数的字典 + """ + return self.config + + def set_config(self, key: str, value: str) -> None: + """ + 通过字典语法设置配置项,并自动触发数据库更新 + + Args: + key (str): 配置项名称 + value (Any): 配置项的新值 + """ + id = self.configkey_id.get(key) + self.connection.update_config(id, value) + + def get_config(self, key: str): + func = conf.config_type(key) + return func(key) + +# if __name__ == "__main__": +# config_instance = strategy_config('aaa', '10000000', network_config(), None) diff --git a/config/strategy_state.py b/config/strategy_state.py new file mode 100644 index 0000000..498ad28 --- /dev/null +++ b/config/strategy_state.py @@ -0,0 +1,138 @@ +from datetime import datetime + + +class hold_data: + def __init__(self, data: dict): + self.stock = None + self.price = None + self.amount = None + self.cost = None + self.profit = None + self.profit_rate = None + + +class state: + def __init__(self): + self.data = {} + self.init() + + # 初始化 + def init(self): + self.data = { + "create_date": "", + "position_list": {}, + "sell_request_list": {}, + "buy_request_list": {}, + "high_stock_list": {}, + "low_stock_list": {}, + "reason_to_sell": "", + "history_buy_list": {}, + "history_sell_list": {}, + } + + def get_buy_requests(self): + return self.data["buy_request_list"] + + def clear_buy_requests(self): + """ + 清空买入请求列表 + """ + self.data["buy_request_list"] = {} + + + def reset(self): + self.init() + + def set_state_datetime(self, key: str, value: datetime) -> None: + """ + 设置状态值 + :param key: 状态键 + :param value: 状态值 + """ + self.data[key] = value + + def set_hold_stock(self, stock: str, data: dict) -> None: + """ + 设置持仓股票数据 + :param stock: 股票代码 + :param data: hold_data 对象 + """ + hd = hold_data(data) + if not isinstance(hd, hold_data): + raise ValueError("data must be an instance of hold_data") + self.data["position_list"][stock] = hd + + def set_high_stock(self, stock: str, data: dict) -> None: + """ + 设置高位股票数据 + :param stock: 股票代码 + :param data: hold_data 对象 + """ + hd = hold_data(data) + if not isinstance(hd, hold_data): + raise ValueError("data must be an instance of hold_data") + self.data["high_stock_list"][stock] = hd + + def set_low_stock(self, stock: str, data: dict) -> None: + """ + 设置低位股票数据 + :param stock: 股票代码 + :param data: hold_data 对象 + """ + hd = hold_data(data) + if not isinstance(hd, hold_data): + raise ValueError("data must be an instance of hold_data") + self.data["low_stock_list"][stock] = hd + + def set_sell_request(self, stock: str, data: dict, reason: str) -> None: + """ + 设置卖出请求数据 + :param stock: 股票代码 + :param data: hold_data 对象 + """ + hd = hold_data(data) + if not isinstance(hd, hold_data): + raise ValueError("data must be an instance of hold_data") + self.data["sell_request_list"][stock + "_" + reason] = hd + + def set_buy_request(self, stock: str, data: dict, reason: str) -> None: + """ + 设置买入请求数据 + :param stock: 股票代码 + :param data: hold_data 对象 + """ + hd = hold_data(data) + if not isinstance(hd, hold_data): + raise ValueError("data must be an instance of hold_data") + self.data["buy_request_list"][stock + "_" + reason] = hd + + def clear_buys_requests(self): + """ + 清空买入请求列表 + """ + self.data["buy_request_list"] = {} + + def get_sell_reason(self): + return self.data["reason_to_sell"] + + def set_sell_reason(self, reason: str): + """ + 设置卖出原因 + :param reason: 卖出原因 + """ + self.data["reason_to_sell"] = reason + + + def get_sell_requests(self): + """ + 获取卖出请求列表 + :return: 卖出请求列表 + """ + return self.data["sell_request_list"] + + def get_yesterday_high_list(self): + """ + 获取昨日涨停股票列表 + :return: 昨日涨停股票列表 + """ + return self.data["high_stock_list"] \ No newline at end of file diff --git a/jq_channel.py b/jq_channel.py new file mode 100644 index 0000000..12099db --- /dev/null +++ b/jq_channel.py @@ -0,0 +1,108 @@ +from typing import Any, List + + +class exchange: + + def close_positions(self, context: Any, position: Any): + """ + 关闭所有持仓 + """ + raise NotImplementedError("This method should be implemented by subclasses") + + def buy_security(self, context: Any, target_list: List[str]) -> None: + """ + 买入操作:对目标股票执行买入,下单资金均摊分配 + """ + raise NotImplementedError("This method should be implemented by subclasses") + + def open_position(self, security: str, value: float) -> bool: + """ + 开仓操作:尝试买入指定股票 + 参数: + security: 股票代码 + value: 分配给该股票的资金 + 返回: + 若下单成功(部分或全部成交)返回 True,否则返回 False + """ + raise NotImplementedError("This method should be implemented by subclasses") + + +class jq_channel(exchange): + + def __init__(self): + pass + + def order_target_value_(self, stock: str, value: float) -> Any: + """ + 封装 order_target_value 函数进行下单,同时记录中文日志和异常信息 + 参数: + security: 股票代码 + value: 下单目标金额 + 返回: + 下单后生成的订单对象;若失败返回 None + """ + try: + order = order_target_value(stock, value) + return order + except Exception as e: + return None + + def close_positions(self, context: Any, position: Any): + """ + 平仓操作:尽可能将指定股票仓位全部卖出 + 参数: + position: 持仓对象 + 返回: + 若下单后订单全部成交返回 True,否则返回 False + """ + security = position.security + order = self.order_target_value_(security, 0) + + if order is None: + # log.error(f"股票 {security} 下单失败,可能是API调用错误") + return False + + if order.status == OrderStatus.held and order.filled == order.amount: + # log.info(f"股票 {security} 卖出订单完全成交") + return True + else: + # log.warning(f"股票 {security} 卖出订单部分成交或未成交,状态: {order.status}, 已成交: {order.filled}/{order.amount}") + return False + + def buy_security(self, context: Any, target_list: List[str]) -> None: + """ + 买入操作:对目标股票执行买入,下单资金均摊分配 + + 参数: + context: 聚宽平台传入的交易上下文对象 + target_list: 目标股票代码列表 + """ + position_count = len(context.portfolio.positions) + target_num = len(target_list) + if target_num > position_count: + try: + value = context.portfolio.cash / (target_num - position_count) + except ZeroDivisionError as e: + # log.error(f"资金分摊时除零错误: {e}") + return + for stock in target_list: + if context.portfolio.positions[stock].total_amount == 0: + if self.open_position(stock, value): + # log.info(f"已买入股票 {stock},分配资金 {value:.2f}") + # self.not_buy_again.append(stock) + if len(context.portfolio.positions) == target_num: + break + + def open_position(self, security: str, value: float) -> bool: + """ + 开仓操作:尝试买入指定股票 + 参数: + security: 股票代码 + value: 分配给该股票的资金 + 返回: + 若下单成功(部分或全部成交)返回 True,否则返回 False + """ + order = self.order_target_value_(security, value) + if order is not None and order.filled > 0: + return True + return False diff --git a/kit/__init__.py b/kit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/kit/conf.py b/kit/conf.py new file mode 100644 index 0000000..a2951bd --- /dev/null +++ b/kit/conf.py @@ -0,0 +1,39 @@ +class conf: + + @staticmethod + def to_int(value: str, params, separator: str = '_'): + return int(value) + + @staticmethod + def to_float(value: str, params, separator: str = '_'): + return float(value) + + @staticmethod + def to_str(value: str, params, separator: str = '_'): + return str(value) + + @staticmethod + def to_bool(value: str, params, separator: str = '_'): + return bool(value) + + @staticmethod + def to_list(value: str, params, separator: str = '_'): + params_list = params.split(separator, -1) + return value.split(params_list) + + @staticmethod + def config_type(value: str): + data = { + "int": conf.to_int, + "float": conf.to_float, + "str": conf.to_str, + "bool": conf.to_bool, + "list": conf.to_list, + } + + return data[value] + +ddd = conf.config_type() + +func = ddd["int"] +print(func("123")) # 输出: 123 diff --git a/kit/kit.py b/kit/kit.py new file mode 100644 index 0000000..f973a1f --- /dev/null +++ b/kit/kit.py @@ -0,0 +1,292 @@ +from datetime import timedelta +from typing import Any, Optional, List, Dict + +import pandas as pd + +class DataHelper: + """ + 数据操作辅助类 + 封装了数据接口的调用,包括 get_price 与 history 函数, + 并在内部捕获异常、输出中文错误日志,避免重复编写 try/except 代码。 + """ + + @staticmethod + def get_price_safe( + security: Any, + end_date: Any, + frequency: str, + fields: List[str], + count: int, + panel: bool = False, + skip_paused: bool = True, + fq: Optional[str] = None, + fill_paused: bool = False + ) -> Optional[pd.DataFrame]: + """ + 安全调用 get_price 数据接口 + 参数: + security: 单只股票代码或股票代码列表 + end_date: 数据截止日期 + frequency: 数据频率,如 "daily" 或 "1m" + fields: 需要获取的数据字段列表(例如 ['open', 'close']) + count: 请求数据的记录数 + panel: 是否返回面板数据(默认为 False) + skip_paused: 是否跳过停牌股票(默认为 True) + fq: 复权方式(例如"pre"或"post",默认 None) + fill_paused: 是否填充停牌数据(默认为 False) + 返回: + 返回包含数据的 DataFrame,如果出错则返回 None + """ + try: + # 调用聚宽提供的 get_price 获取数据 + df = get_price( + security, + end_date=end_date, + frequency=frequency, + fields=fields, + count=count, + panel=panel, + skip_paused=skip_paused, + fq=fq, + fill_paused=fill_paused + ) + return df + except Exception as e: + # 输出中文错误日志,并返回 None + # logger.error(f"获取 {security} 的价格数据时出错: {e}") + return None + + @staticmethod + def get_current_data(): + """ + 获取当前市场数据 + 返回: + 当前市场数据的字典,包含股票代码到数据对象的映射 + """ + try: + # 调用聚宽提供的 get_current_data 获取当前数据 + return get_current_data() + except Exception as e: + # Logger.error(f"获取当前市场数据时出错: {e}") + return {} + + @staticmethod + def get_history_safe( + security: Any, + unit: str, + field: str, + count: int, + ) -> Optional[Dict[str, List[float]]]: + """ + 安全调用 history 数据接口,批量获取历史数据 + + 参数: + security: 单只或多只股票代码 + unit: 数据单位,例如 "1m" 表示1分钟数据 + field: 请求数据字段名称,如 "close"(收盘价) + count: 请求历史数据记录数 + + 返回: + 返回一个字典,映射股票代码到对应的数据列表;出错则返回 None + """ + try: + # 调用聚宽的 history 函数获取数据 + data = history(count, unit=unit, field=field, security_list=security) + return data + except Exception as e: + return None + + @staticmethod + def get_index_stocks(index_code: str) -> List[str]: + """ + 获取指定指数的成分股列表 + + 参数: + index_code: 指数代码,例如 "000300.XSHG" 表示沪深300指数 + date: 可选参数,指定查询日期,默认为 None,表示查询最新成分股 + + 返回: + 成分股代码列表 + """ + try: + # 调用聚宽的 get_index_stocks 函数获取成分股 + return get_index_stocks(index_code) + except Exception as e: + return [] + + @staticmethod + def get_security_info(security: str) -> Optional[Any]: + """ + 获取指定股票的安全信息 + + 参数: + security: 股票代码,例如 "000001.XSHE" + + 返回: + 股票的安全信息对象,如果出错则返回 None + """ + try: + # 调用聚宽的 get_security_info 函数获取股票信息 + return get_security_info(security) + except Exception as e: + return None + + @staticmethod + def get_stock_list(context: Any, index_stocks: str, stock_num: int) -> List[str]: + """ + 选股模块: + 1. 从指定股票池(如 399101.XSHE 指数成分股)中获取初步股票列表; + 2. 应用多个过滤器筛选股票(次新股、科创股、ST、停牌、涨跌停等); + 3. 基于基本面数据(EPS、市值)排序后返回候选股票列表。 + 参数: + context: 聚宽平台传入的交易上下文对象 + 返回: + 筛选后的候选股票代码列表 + """ + # 从指定指数中获取初步股票列表 + initial_list: List[str] = DataHelper.get_index_stocks(index_stocks) + + # 依次应用过滤器,筛去不符合条件的股票 + initial_list = DataHelper.filter_new_stock(context, initial_list) # 过滤次新股 + initial_list = DataHelper.filter_kcbj_stock(initial_list) # 过滤科创/北交股票 + initial_list = DataHelper.filter_st_stock(initial_list) # 过滤ST或风险股票 + initial_list = DataHelper.filter_paused_stock(initial_list) # 过滤停牌股票 + initial_list = DataHelper.filter_limitup_stock(context, initial_list) # 过滤当日涨停(未持仓时)的股票 + initial_list = DataHelper.filter_limitdown_stock(context, initial_list) # 过滤当日跌停(未持仓时)的股票 + + # 利用基本面查询获取股票代码和EPS数据,并按照市值升序排序 + q = query(valuation.code, indicator.eps) \ + .filter(valuation.code.in_(initial_list)) \ + .order_by(valuation.market_cap.asc()) + df = get_fundamentals(q) + stock_list: List[str] = list(df.code) + stock_list = stock_list[:50] # 限制数据规模,防止一次处理数据过大 + # 取前2倍目标持仓股票数作为候选池 + final_list: List[str] = stock_list[:2 * int(stock_num)] + + # 查询并输出候选股票的财务信息(如财报日期、营业收入、EPS) + if final_list: + info_query = query( + valuation.code, + income.pubDate, + income.statDate, + income.operating_revenue, + indicator.eps + ).filter(valuation.code.in_(final_list)) + df_info = get_fundamentals(info_query) + # for _, row in df_info.iterrows(): + # log.info( + # f"股票 {row['code']}:报告日期 {row.get('pubDate', 'N/A')},统计日期 {row.get('statDate', 'N/A')},营业收入 {row.get('operating_revenue', 'N/A')},EPS {row.get('eps', 'N/A')}") + return final_list + + @staticmethod + def filter_paused_stock(stock_list: List[str]) -> List[str]: + """ + 过滤停牌的股票 + 参数: + stock_list: 待过滤的股票代码列表 + 返回: + 未停牌的股票代码列表 + """ + current_data = DataHelper.get_current_data() + return [stock for stock in stock_list if not current_data[stock].paused] + + @staticmethod + def filter_st_stock(stock_list: List[str]) -> List[str]: + """ + 过滤带有 ST 或其他风险标识的股票 + 参数: + stock_list: 待过滤的股票代码列表 + 返回: + 无 ST 或风险标识的股票代码列表 + """ + current_data = DataHelper.get_current_data() + return [stock for stock in stock_list if (not current_data[stock].is_st) and + ('ST' not in current_data[stock].name) and + ('*' not in current_data[stock].name) and + ('退' not in current_data[stock].name)] + + @staticmethod + def filter_kcbj_stock(stock_list: List[str]) -> List[str]: + """ + 过滤科创、北交股票 + 参数: + stock_list: 待过滤的股票代码列表 + 返回: + 过滤后的股票代码列表(排除以 '4'、'8' 开头以及以 '68' 起始的股票) + """ + return [stock for stock in stock_list if stock[0] not in ('4', '8') and not stock.startswith('68')] + + @staticmethod + def filter_limitup_stock(context: Any, stock_list: List[str]) -> List[str]: + """ + 过滤当天已经涨停的股票(若未持仓则过滤) + 参数: + context: 交易上下文对象 + stock_list: 待过滤的股票代码列表 + 返回: + 过滤后的股票代码列表 + """ + history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1) + current_data = DataHelper.get_current_data() + if history_data is None: + return stock_list + return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or + (history_data.get(stock, [0])[-1] < current_data[stock].high_limit)] + + @staticmethod + def filter_limitdown_stock(context: Any, stock_list: List[str]) -> List[str]: + """ + 过滤当天已经跌停的股票(若未持仓则过滤) + 参数: + context: 交易上下文对象 + stock_list: 待过滤的股票代码列表 + 返回: + 过滤后的股票代码列表 + """ + history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1) + current_data = DataHelper.get_current_data() + if history_data is None: + return stock_list + return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or + (history_data.get(stock, [float('inf')])[-1] > current_data[stock].low_limit)] + + @staticmethod + def filter_new_stock(context: Any, stock_list: List[str]) -> List[str]: + """ + 过滤次新股:排除上市时间不足375天的股票 + 参数: + context: 交易上下文对象 + stock_list: 待过滤的股票代码列表 + 返回: + 过滤后的股票代码列表 + """ + yesterday = context.previous_date + return [stock for stock in stock_list if not (yesterday - DataHelper.get_security_info(stock).start_date < timedelta(days=375))] + + @staticmethod + def filter_highprice_stock(self, context: Any, stock_list: List[str]) -> List[str]: + """ + 过滤股价高于设定上限(up_price)的股票(非持仓股票参与过滤) + 参数: + context: 交易上下文对象 + stock_list: 待过滤的股票代码列表 + 返回: + 过滤后的股票代码列表 + """ + history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1) + if history_data is None: + return stock_list + return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or + history_data.get(stock, [self.up_price + 1])[-1] <= self.up_price] + + @staticmethod + def filter_not_buy_again(stock_list: List[str], not_buy_again) -> List[str]: + """ + 过滤掉当日已买入的股票,避免重复下单 + 参数: + stock_list: 待过滤的股票代码列表 + 返回: + 未买入的股票代码列表 + """ + return [stock for stock in stock_list if stock not in not_buy_again] \ No newline at end of file diff --git a/kit/logger.py b/kit/logger.py new file mode 100644 index 0000000..afda4ad --- /dev/null +++ b/kit/logger.py @@ -0,0 +1,119 @@ +import logging +import os +from datetime import datetime + +from config.network_config import network_config + + +class Logger: + """ + 简单的日志操作类,支持控制台和文件输出 + 可以使用现有的logger对象或创建新的实例 + """ + _instance = None + + def __new__(cls, existing_logger=None, *args, **kwargs): + if existing_logger is not None: + # 如果提供了现有的logger对象,创建新实例并使用该对象 + instance = super().__new__(cls) + instance.logger = existing_logger + return instance + + # 否则使用单例模式 + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self, existing_logger=None, log_file="logs/app.log"): + # 如果使用现有logger,直接返回 + if existing_logger is not None: + return + + # 如果已经初始化过,直接返回 + if hasattr(self, 'logger'): + return + + # 创建logs目录(如果不存在) + os.makedirs(os.path.dirname(log_file), exist_ok=True) + + # 初始化logger + self.logger = logging.getLogger('AppLogger') + self.logger.setLevel(logging.DEBUG) + + # 清除可能存在的处理器 + self.logger.handlers.clear() + + # 设置日志格式 + formatter = logging.Formatter( + '%(asctime)s [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + # 添加控制台处理器 + console_handler = logging.StreamHandler() + console_handler.setFormatter(formatter) + self.logger.addHandler(console_handler) + + # 添加文件处理器 + file_handler = logging.FileHandler(log_file, encoding='utf-8') + file_handler.setFormatter(formatter) + self.logger.addHandler(file_handler) + + def debug(self, message, strategy_id=None): + """记录调试级别的日志""" + self.logger.debug(message) + try: + network_config.insert_logs(strategy_id, "debug", message) + except: + pass + + def info(self, message, strategy_id=None): + """记录信息级别的日志""" + self.logger.info(message) + try: + network_config.insert_logs(strategy_id, "info ", message) + except: + pass + + def warning(self, message, strategy_id=None): + """记录警告级别的日志""" + self.logger.warning(message) + try: + network_config.insert_logs(strategy_id, "warn", message) + except: + pass + + def error(self, message, strategy_id=None): + """记录错误级别的日志""" + self.logger.error(message) + try: + network_config.insert_logs(strategy_id, "error", message) + except: + pass + + def critical(self, message, strategy_id=None): + """记录严重错误级别的日志""" + self.logger.critical(message) + try: + network_config.insert_logs(strategy_id, "critical", message) + except: + pass +# # 使用示例 +# if __name__ == "__main__": +# # 示例1:创建新的logger实例 +# logger = Logger() +# logger.info("这是一个新创建的logger实例") +# +# # 示例2:使用现有的logger对象 +# existing_logger = logging.getLogger('ExistingLogger') +# existing_logger.setLevel(logging.DEBUG) +# handler = logging.StreamHandler() +# handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) +# existing_logger.addHandler(handler) +# +# logger2 = Logger(existing_logger=existing_logger) +# logger2.info("这是使用现有logger对象创建的实例") +# +# # 示例3:测试单例模式(不传入existing_logger时) +# logger3 = Logger() +# logger3.info("这条日志来自第三个logger实例(与第一个是同一个实例)") diff --git a/main.py b/main.py new file mode 100644 index 0000000..cf5a233 --- /dev/null +++ b/main.py @@ -0,0 +1,48 @@ +from logging import log +from typing import Any + +from config.network_config import network_config +from jq_channel import jq_channel +from strategy import trade_strategy +from task.check_state_before_task import check_state_before_task + +strategy = trade_strategy("测试策略", 10000000, network_config(), log, jq_channel()) + + + + + + + + + +def initialize(context: Any) -> None: + # # 初始化策略参数 + strategy.init_strategy() # 策略初始化函数 + + # # 注册调度任务 + # run_daily(check_positions_before_market_open_func, time='9:01') # 开盘前先检查持仓状态 + # run_daily(check_holdings_yesterday_func, time='9:00') + # run_daily(prepare_stock_list_func, time='9:05') + # + # run_daily(sell_stocks_func, time='10:00') # 每日检查止损条件 + # + # run_daily(process_pending_sells_func, time='10:15') # 处理待卖出股票 + # run_weekly(buy_stocks_func, 2, time='10:30') # 周二进行调仓 + # run_daily(process_pending_buys_func, time='10:20') # 处理待买入股票 + # run_daily(process_pending_sells_func, time='13:05') # 午盘开盘后也处理一次卖出 + # run_daily(process_pending_buys_func, time='13:15') # 午盘开盘后也处理一次买入 + # run_daily(trade_afternoon_func, time='14:30') + # run_daily(process_pending_sells_func, time='14:40') # 收盘前再处理一次卖出 + # run_daily(process_pending_buys_func, time='14:45') # 收盘前再处理一次买入 + # run_daily(close_account_func, time='14:50') + # run_weekly(print_position_info_func, 5, time='15:10') # 周五收盘后打印持仓信息 + task = check_state_before_task(strategy) + task.process(context) + + + + + + pass +# initialize(None) # 调用初始化函数,开始策略执行 diff --git a/strategy.py b/strategy.py new file mode 100644 index 0000000..27d0ac0 --- /dev/null +++ b/strategy.py @@ -0,0 +1,94 @@ +from config.strategy_state import state +from jq_channel import exchange +from kit.logger import Logger +from config.strategy_config import strategy_config + + +class trade_strategy: + + def __init__(self, strategy_name, strategy_template_id, config_save, log, channel: exchange): + # -------必要参数------- + self.log = Logger(log) # 日志记录器 + self.strategy_config = strategy_config(strategy_name, + strategy_template_id, + config_save, + self.log) + + self.state = state() # 策略状态管理器,跟踪策略运行状态 + self.channel = channel + + # 添加持仓监控和管理器 + + def init_strategy(self) -> None: + """ + 策略初始化函数 + 配置交易环境参数,包括防未来数据、基准、滑点、订单成本以及日志输出等级。 + 参数: + context: 聚宽平台传入的交易上下文对象 + """ + # 启用防未来数据以确保历史回测的严谨性 + self.log.info("初始化策略TradingStrategy配置...") + + set_option('avoid_future_data', True) + # 设置策略基准为上证指数 + #set_benchmark(self.strategy_config.config["benchmark_stock"]) + set_benchmark("000001.XSHG") + # 使用真实市场价格,模拟实际交易 + set_option('use_real_price', True) + # 设置固定滑点,确保价格偏差不大 + set_slippage(FixedSlippage(3 / 10000)) + # 设置订单成本,包括印花税和佣金 + set_order_cost(OrderCost( + open_tax=0, + close_tax=0.001, # 卖出时0.1%的印花税 + open_commission=2.5 / 10000, + close_commission=2.5 / 10000, + close_today_commission=0, + min_commission=5 # 最低佣金为5元 + ), type='stock') + # 设置日志输出等级(中文日志输出) + self.log.logger.set_level('order', 'error') + self.log.logger.set_level('system', 'error') + self.log.logger.set_level('strategy', 'debug') + + def set_target_list(self, target_list): + self.target_list = target_list.copy() + + def get_target_list(self): + """获取本次调仓候选股票列表""" + return self.target_list + + def get_configs(self) -> dict: + """ + 获取策略配置 + :return: strategy_config 对象 + """ + return self.strategy_config.get_all_configs() + + def get_strategy_config(self) -> strategy_config: + """ + 获取策略配置字典 + :return: 包含所有配置参数的字典 + """ + return self.strategy_config + + def close_position(self, context, stock: str): + """ + 平仓操作:尽可能将指定股票仓位全部卖出 + 参数: + position: 持仓对象 + 返回: + 若下单后订单全部成交返回 True,否则返回 False + """ + self.channel.close_positions(context, stock) + + def open_position(self, security: str, value: float) -> bool: + """ + 开仓操作:尝试买入指定股票 + 参数: + security: 股票代码 + value: 分配给该股票的资金 + 返回: + 若下单成功(部分或全部成交)返回 True,否则返回 False + """ + return self.channel.open_position(security, value) \ No newline at end of file diff --git a/task/__init__.py b/task/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/task/base_task.py b/task/base_task.py new file mode 100644 index 0000000..9141137 --- /dev/null +++ b/task/base_task.py @@ -0,0 +1,46 @@ +from typing import Any + +from kit.conf import conf +from strategy import trade_strategy + + +class base_task: + def __init__(self, strategy: trade_strategy, sub_task=None): + self.strategy = strategy + self.name = "base_task" + self.remark = "base_task" + self.memo = "base_task" + self.sub_task = sub_task + + self.strategy_config = self.strategy.get_strategy_config() + self.configs = self.strategy.get_configs() + self.log = self.strategy.get_logger() + + def init(self, context: Any): + pass + + def config(self, context: Any): + pass + + def run(self, context: Any): + pass + + def handle(self, context: Any): + pass + + def end(self, context: Any): + pass + + def process(self, context: Any): + self.init(context) + self.config(context) + self.run(context) + self.handle(context) + self.end(context) + + def get_config(self, key: str): + configs = self.configs.get(key) + data_type = configs['data_type'] + data_params = configs['data_params'] + func = conf.config_type(data_type) + return func(self.configs[key]['new_value'], data_params) diff --git a/task/buy_stocks_func_task.py b/task/buy_stocks_func_task.py new file mode 100644 index 0000000..868effc --- /dev/null +++ b/task/buy_stocks_func_task.py @@ -0,0 +1,73 @@ +from typing import Any, List + +from kit.kit import DataHelper +from strategy import trade_strategy +from task.base_task import base_task + + +class buy_stocks_func_task(base_task): + """ + 每周调仓策略: + 如果非空仓日,先选股得到目标股票列表,再卖出当前持仓中不在目标列表且昨日未涨停的股票, + 最后买入目标股票,同时记录当天买入情况避免重复下单。 + 参数: + context: 聚宽平台传入的交易上下文对象 + """ + + def __init__(self, strategy: trade_strategy): + super().__init__(strategy) + + def config(self, context: Any): + + # 每次调仓的目标股票数量 + self.stock_num = self.get_config('stock_num') + # 获取当前持仓列表 + self.hold_list = [position.security for position in list(context.portfolio.positions.values())] + self.index_stocks = self.get_config('index_stocks') + self.target_list = DataHelper.get_stock_list(context, self.index_stocks, self.stock_num) + self.temp_target_list = self.target_list[:self.stock_num] + # 昨日涨停股票列表 + self.yesterday_HL_list: List[str] = self.strategy.state.get_yesterday_high_list() + self.today_trade_switch = self.get_config('today_trade_switch') + + def run(self, context: Any): + + if not self.today_trade_switch: + self.log.info("今日非交易日,跳过调仓操作。") + return + + # 重置当天已买入记录 + # self.position_manager.reset_not_buy_again() + # 取目标持仓数以内的股票作为调仓目标 + self.log.info(f"每周调仓目标股票: {self.temp_target_list}") + + # 遍历当前持仓,若股票不在目标列表且非昨日涨停,则执行卖出操作 + for stock in self.hold_list: + if stock not in self.temp_target_list and stock not in self.yesterday_HL_list: + pos = context.portfolio.positions.get(stock) + self.log.info(f"调仓决策:卖出股票 {stock}") + # 通过持仓监控器注册卖出请求,而不是直接卖出 + self.strategy.state.set_sell_request(stock, pos, 'rebalance') + else: + self.log.info(f"调仓决策:继续持有股票 {stock}") + + # 对目标股票执行买入操作前,先将其注册到待买入队列 + buy_targets = [stock for stock in self.temp_target_list if stock not in self.hold_list] + if buy_targets: + for stock in buy_targets: + pos = context.portfolio.positions.get(stock) + self.strategy.state.set_buy_request(stock, pos, 'buy') + self.log.info(f"调仓决策:将 {len(buy_targets)} 只股票加入待买入队列: {buy_targets}") + + # 更新当天已买入记录,防止重复买入 + # for position in list(context.portfolio.positions.values()): + # if position.security not in self.position_manager.get_not_buy_again(): + # self.position_manager.get_not_buy_again().append(position.security) + + def handle(self, context: Any): + self.strategy.set_target_list(self.temp_target_list) + pass + + def end(self, context: Any): + pass + # 这里可以添加任何必要的清理或总结操作 diff --git a/task/check_high_volume_func_task.py b/task/check_high_volume_func_task.py new file mode 100644 index 0000000..45caed2 --- /dev/null +++ b/task/check_high_volume_func_task.py @@ -0,0 +1,59 @@ +from base.base_task import base_task +from kit.kit import DataHelper +from task.process_pending_sells_func_task import process_pending_sells_func_task + + +class check_high_volume_func_task(base_task): + """ + 检查持仓股票当日成交量是否异常放量: + 如果当日成交量大于过去 HV_duration 天内最大成交量的 HV_ratio 倍,则视为异常,执行卖出操作。 + + 参数: + context: 聚宽平台传入的交易上下文对象 + """ + + def __init__(self, strategy, sub_task=None): + super().__init__(strategy, sub_task) + + def init(self, context): + self.name = "check_high_volume_func_task" + self.remark = "检查高成交量任务" + self.memo = "检查是否有异常成交量的股票" + + def begin(self, context): + self.HV_control = bool(self.config['HV_control']) + self.HV_duration = int(self.config['HV_duration']) + self.HV_ratio = float(self.config['HV_ratio']) + + def run(self, context): + current_data = DataHelper.get_current_data() + for stock in list(context.portfolio.positions.keys()): + if current_data[stock].paused: + continue + if current_data[stock].last_price == current_data[stock].high_limit: + continue + if context.portfolio.positions[stock].closeable_amount == 0: + continue + df_volume = get_bars( + stock, + count=self.HV_duration, + unit='1d', + fields=['volume'], + include_now=True, + df=True + ) + if df_volume is not None and not df_volume.empty: + if df_volume['volume'].iloc[-1] > self.HV_ratio * df_volume['volume'].max(): + self.log.info(f"检测到股票 {stock} 出现异常放量,执行卖出操作。") + # 通过持仓监控器注册卖出请求,而不是直接卖出 + self.position_monitor.register_sell_request(stock, 'high_volume') + + # 处理所有待卖出请求 + task = process_pending_sells_func_task(self.strategy, self.name) + task.process(context) + + def handle(self, context): + pass + + def end(self, context): + pass diff --git a/task/check_limit_up_func_task.py b/task/check_limit_up_func_task.py new file mode 100644 index 0000000..e0df297 --- /dev/null +++ b/task/check_limit_up_func_task.py @@ -0,0 +1,58 @@ +from typing import Any + +from kit.kit import DataHelper +from task.base_task import base_task + + +class check_limit_up_func_task(base_task): + """ + 检查昨日处于涨停状态的股票在当前是否破板。 + 如破板(当前价格低于涨停价),则立即卖出该股票,并记录卖出原因为 "limitup"。 + 参数: + context: 聚宽平台传入的交易上下文对象 + """ + + def __init__(self, strategy, sub_task=None): + super().__init__(strategy, sub_task) + + def init(self, context: Any): + self.name = "check_limit_up_task" + self.remark = "检查涨停任务" + self.memo = "检查是否有涨停破板的股票" + + def begin(self, context: Any): + self.yesterday_HL_list = self.strategy.state.get_yesterday_high_list() + + + def run(self, context: Any): + now_time = context.current_dt # 获取当前时间 + if self.yesterday_HL_list: + for stock in self.yesterday_HL_list: + # 使用1分钟周期数据判断当前价格和涨停价是否符合条件 + current_data = DataHelper.get_price_safe( + stock, + end_date=now_time, + frequency='1m', + fields=['close', 'high_limit'], + count=1, + panel=False, + fill_paused=True + ) + if current_data is not None and not current_data.empty: + if current_data.iloc[0]['close'] < current_data.iloc[0]['high_limit']: + self.log.info(f"股票 {stock} 涨停破板,触发卖出操作。") + # 通过持仓监控器注册卖出请求,而不是直接卖出 + self.strategy.state.set_sell_request(stock, current_data.iloc[0], 'limitup') + else: + self.log.info(f"股票 {stock} 仍维持涨停状态。") + + # 处理待卖出股票 + # if self.reason_to_sell[0] == 'limitup': + # sells_task = process_pending_sells_func_task(self.strategy, self.name) + # sells_task.process(context) + + def handle(self, context: Any): + pass + + def end(self, context: Any): + pass diff --git a/task/check_market_env_task.py b/task/check_market_env_task.py new file mode 100644 index 0000000..e563f42 --- /dev/null +++ b/task/check_market_env_task.py @@ -0,0 +1,72 @@ +from typing import Any + +from strategy import trade_strategy +from task.base_task import base_task + + +class check_market_env_task(base_task): + """ + 交易环境确认 + 1、判断是否为特殊月份(1月/4月) + 2、如果是特殊月份且通过了清仓检查,则清仓所有持仓 + 3、如果不是特殊月份且没有交易信号,则重置交易信号 + """ + + def __init__(self, strategy: trade_strategy): + super().__init__(strategy) + + + + + def config(self, context: Any): + + self.current_date = context.current_dt.date() + self.current_month = self.current_date.month + self.current_day = self.current_date.day + + # 判断是否执行空仓策略 + self.filter_month_clear_switch = self.get_config('filter_month_clear_switch') + self.filer_month_data = self.get_config('filter_month_data') + # 判断是否为特殊月 = True时 等于 执行1月/4月 全部清仓 + self.in_special_month = str(self.current_month) in self.filer_month_data + + self.today_trade_switch = self.get_config('today_trade_switch') # 是否为不交易信号 + + + def run(self, context: Any): + # + if self.filter_month_clear_switch and self.in_special_month: + # 进入特殊月份,应清仓 + for stock in self.strategy.state.data['position_list'].keys(): + self.strategy.state.set_sell_request(stock, self.strategy.state.data['position_list'][stock], f"enter_{self.current_month}_month") + # 清仓后就不交易 + self.today_trade_switch = False + elif not self.filter_month_clear_switch: + # 不在特殊月 或 不执行空仓策略 可以交易 + self.today_trade_switch = True + else: + # 如果是特殊月份但没有清仓条件,则继续交易 + self.today_trade_switch = True + + + def handle(self, context: Any): + # 保存状态 + old_today_trade_switch = self.strategy_config.get_config("today_trade_switch") + if old_today_trade_switch != self.today_trade_switch: + self.strategy_config.set_config("today_trade_switch", self.today_trade_switch) # 临时变量 + self.log.info(f"交易环境更新: today_trade_switch 状态更新为 {self.today_trade_switch}") + + + def end(self, context: Any): + + self.log.info("-----------------") + self.log.info("一.今日环境确认:") + self.log.info(" 当前日期: {}".format(self.current_date)) + self.log.info(" 空仓策略: {}".format("开启" if self.filter_month_clear_switch else "关闭")) + self.log.info(" 今日交易: {}".format("交易" if self.today_trade_switch else "清仓")) + self.log.info(" 特殊月份: {}".format(",".join(str(item) for item in self.filer_month_data))) + self.log.info(" 当前月份: {}".format(self.current_month)) + + # 今日清仓 + + diff --git a/task/check_positions_stop_loss_task.py b/task/check_positions_stop_loss_task.py new file mode 100644 index 0000000..2fa24ab --- /dev/null +++ b/task/check_positions_stop_loss_task.py @@ -0,0 +1,67 @@ +from typing import Any + +from kit.kit import DataHelper +from strategy import trade_strategy +from task.base_task import base_task + + +class check_positions_stop_loss_task(base_task): + + def __init__(self, strategy: trade_strategy): + super().__init__(strategy) + + def init(self, context: Any): + + pass + + def config(self, context: Any): + + self.stoploss_switch = self.get_config('stoploss_switch') # 是否启用止损策略 + self.hold_list = self.strategy.state.data['position_list'] # 持仓列表 + self.today_trade_switch = self.get_config('today_trade_switch') # 今日交易开关 + self.stoploss_strategy = self.get_config('stoploss_strategy') # 止损策略 + self.stoploss_stock_rate_data = self.get_config('stoploss_stock_rate_data') # 个股止损阀值 + self.stoploss_market_rate_data = self.get_config('stoploss_market_rate_data') # 市场止损阀值 + self.reason_to_sell = self.strategy.state.get_sell_reason() # 卖出原因 + self.index_stocks = self.get_config('index_stocks') # 指数代码 + + def run(self, context: Any): + if len(self.hold_list) == 0: + self.log.debug("当前无持仓,跳过止损检查。") + return + + if self.stoploss_switch and self.today_trade_switch: + if self.stoploss_strategy == 1 or self.stoploss_strategy == 3: + # 个股止盈或止损判断 + for stock in list(context.portfolio.positions.keys()): + pos = context.portfolio.positions[stock] + if pos.price >= pos.avg_cost * 2: + # 通过持仓监控器注册卖出请求,而不是直接卖出 + self.strategy.state.set_sell_request(stock, pos, 'take_profit') + self.log.debug(f"股票 {stock} 实现100%盈利,执行止盈卖出。") + elif pos.price < pos.avg_cost * self.stoploss_stock_rate_data: + # 通过持仓监控器注册卖出请求,而不是直接卖出 + self.strategy.state.set_sell_request(stock, pos, 'stop_loss') + self.log.debug(f"股票 {stock} 触及止损阈值,执行卖出。") + self.strategy.state.set_sell_reason("stoploss") + + if self.stoploss_strategy == 2 or self.stoploss_strategy == 3: + # 大盘止损判断,若整体市场跌幅过大则平仓所有股票 + stock_list = DataHelper.get_index_stocks(self.index_stocks) + df = DataHelper.get_price_safe( + stock_list, + end_date=context.previous_date, + frequency='daily', + fields=['close', 'open'], + count=1, + panel=False + ) + if df is not None and not df.empty: + down_ratio = (df['close'] / df['open']).mean() + if down_ratio <= self.stoploss_market_rate_data: + self.strategy.state.set_sell_reason("stoploss") + self.log.debug(f"市场检测到跌幅(平均跌幅 {down_ratio:.2%}),卖出所有持仓。") + for stock in list(context.portfolio.positions.keys()): + pos = context.portfolio.positions[stock] + # 通过持仓监控器注册卖出请求,而不是直接卖出 + self.position_monitor.register_sell_request(stock, pos, 'market_stop_loss') diff --git a/task/check_remain_amount_func_task.py b/task/check_remain_amount_func_task.py new file mode 100644 index 0000000..f268183 --- /dev/null +++ b/task/check_remain_amount_func_task.py @@ -0,0 +1,58 @@ +from typing import Any + +from base.base_task import base_task +from kit.kit import DataHelper +from task.process_pending_buys_func_task import process_pending_buys_func_task + + +class check_remain_amount_func_task(base_task): + """ + 检查账户资金与持仓数量: + 如果因涨停破板卖出导致持仓不足,则从目标股票中筛选未买入股票,进行补仓操作。 + 参数: + context: 聚宽平台传入的交易上下文对象 + """ + + def __init__(self, strategy: Any, sub_task=None): + super().__init__(strategy, sub_task) + + def init(self, context: Any): + self.name = "check_remain_amount_func_task" + self.remark = "检查剩余可用资金任务" + self.memo = "检查是否有剩余可用资金进行补仓" + + def begin(self, context: Any): + self.stock_num = self.strategy_config.config['stock_num'] + self.reason_to_sell = self.strategy.get_reason_to_sell() + self.target_list = self.strategy.get_target_list() + self.not_buy_again = self.position_manager.get_not_buy_again() + + def run(self, context: Any): + if self.reason_to_sell[0]: + # 更新持仓列表 + self.hold_list = [position.security for position in list(context.portfolio.positions.values())] + + if len(self.hold_list) < self.stock_num: + # 处理卖出后的补仓需求 + target_list = DataHelper.filter_not_buy_again(self.target_list, self.not_buy_again) + if target_list: + # 将候选股票加入待买入队列 + self.position_manager.register_buy_request(target_list[:self.stock_num - len(self.hold_list)]) + self.log.info( + f"检测到补仓需求,可用资金 {round(context.portfolio.cash, 2)},添加 {len(target_list[:self.stock_num - len(self.hold_list)])} 只股票到待买入队列") + + # 立即处理待买入请求 + task = process_pending_buys_func_task(self.strategy, self.name) + task.process(context) + + # 重置卖出原因 + self.reason_to_sell = [''] + else: + self.log.info("未检测到涨停破板或止损卖出事件,不进行补仓买入。") + + def handle(self, context: Any): + pass + + def end(self, context: Any): + pass + diff --git a/task/check_state_before_task.py b/task/check_state_before_task.py new file mode 100644 index 0000000..77aa71d --- /dev/null +++ b/task/check_state_before_task.py @@ -0,0 +1,101 @@ +import json +from typing import Any + +from config.network_config import network_config +from kit.kit import DataHelper +from strategy import trade_strategy +from task.base_task import base_task + + +class check_state_before_task(base_task): + """ + 重置state信息 + 最前置信息 + """ + + def __init__(self, strategy: trade_strategy, sub_task=None): + super().__init__(strategy, sub_task) + + def init(self, context: Any): + self.log.info("--------------------------") + self.log.info("重置运行中环境数据") + + def config(self, context: Any): + # 重置state信息 + + self.hold_stock = [position.security for position in list(context.portfolio.positions.values())] + self.hold_stock_data = {position.security: position for position in list(context.portfolio.positions.values())} + self.yesterday_high_list = [] + self.yesterday_low_list = [] + + def run(self, context: Any): + positions = context.portfolio.positions + for stock in positions: + self.strategy.state.set_hold_stock(stock.security, stock.__dict__) + + if self.hold_stock: + # 获取持仓股票昨日数据(包括收盘价、涨停价、跌停价) + df = DataHelper.get_price_safe( + self.hold_stock, + end_date=context.previous_date, + frequency='daily', + fields=['close', 'high_limit', 'low_limit'], + count=1, + panel=False, + fill_paused=False + ) + if df is not None and not df.empty: + # 过滤出收盘价等于涨停价的股票,作为昨日涨停股票 + self.yesterday_high_list = list(df[df['close'] == df['high_limit']]) + self.yesterday_low_list = list(df[df['close'] == df['low_limit']]) + + def handle(self, context: Any): + for stock in self.yesterday_high_list: + self.strategy.state.set_high_stock(stock['code'], stock) + for stock in self.yesterday_low_list: + self.strategy.state.set_low_stock(stock['code'], stock) + + def end(self, context: Any): + self.log.info("-----------------") + self.log.info("一.截止昨日持仓情况:") + self.hold_stock_info() + self.yesterday_high_list_info() + self.yesterday_low_list_info() + + self.log.info("二.保存缓存信息") + self.state_save(context) + + self.log.info("三.重置缓存信息") + self.strategy.state.reset() + # + def hold_stock_info(self): + + if self.hold_stock: + self.log.info(" 持仓信息: {} 只,昨日涨停信息: {} 只,昨日跌停信息: {} 只".format(len(self.hold_stock), len(self.yesterday_high_list), + len(self.yesterday_low_list))) + for positions in self.hold_stock: + data = self.hold_stock_data + self.log.debug(" 持仓信息: {}".format(positions)) + else: + self.log.info(" 无持仓") + + def yesterday_high_list_info(self): + if self.yesterday_high_list: + self.log.info(" 涨停信息: {} 只".format(len(self.yesterday_high_list))) + for stock in self.yesterday_high_list: + self.log.debug(" 涨停信息: {}".format(stock)) + else: + self.log.info(" 无涨停") + + def yesterday_low_list_info(self): + if self.yesterday_high_list: + self.log.info(" 跌停信息: {} 只".format(len(self.yesterday_high_list))) + for stock in self.yesterday_high_list: + self.log.debug(" 跌停信息: {}".format(stock)) + else: + self.log.info(" 无跌停") + + def state_save(self, context: Any): + json_data = json.dumps(self.strategy.state) + trade_date = context.current_dt.date() + network_config.insert_state(self.strategy_config.strategy_id, json_data, trade_date.strftime("%Y-%m-%d %H:%M:%S")) diff --git a/task/process_pending_buy_task.py b/task/process_pending_buy_task.py new file mode 100644 index 0000000..059d18b --- /dev/null +++ b/task/process_pending_buy_task.py @@ -0,0 +1,96 @@ +from typing import Any + +from kit.kit import DataHelper +from strategy import trade_strategy +from task.base_task import base_task + + +class process_pending_buy_task(base_task): + """ + 处理待买入股票队列 + """ + + def __init__(self, strategy: trade_strategy): + super().__init__(strategy) + + def config(self, context: Any): + self.today_trade_switch = self.get_config('today_trade_switch') + self.stock_num = self.get_config('stock_num') + self.buy_requests_list = self.strategy.state.get_buy_requests() + + def run(self, context: Any): + + # 如果在特殊月份(1月/4月)或没有待买入股票,直接返回 + if self.today_trade_switch or not self.buy_requests_list: + return + + self.log.info("开始处理待买入股票队列") + current_data = DataHelper.get_current_data() + + # 更新当前持仓列表 + self.hold_list = [position.security for position in list(context.portfolio.positions.values())] + position_count = len(self.hold_list) + target_num = int(self.stock_num) + + # 如果持仓已满,不再买入 + if position_count >= target_num: + self.log.info("当前持仓已满,清空待买入队列") + self.strategy.state.clear_buy_requests() + return + + # 计算可买入的股票数量和分配资金 + buy_count = min(len(self.buy_requests_list), target_num - position_count) + if buy_count <= 0: + return + + # 计算每只股票可分配的资金 + try: + value = context.portfolio.cash / buy_count + except ZeroDivisionError: + self.log.error("资金分摊时除零错误") + return + + # 确保每只股票有足够的买入金额 + if value < 5000: # 假设最小买入金额为5000元 + self.log.warning(f"每只股票分配资金不足: {value:.2f}元,取消本次买入") + return + + # 逐个买入股票 + success_count = 0 + for idx, stock in enumerate(self.buy_requests_list[:buy_count]): + # 跳过已持仓的股票 + if stock in self.hold_list: + continue + + # 检查是否可交易 + if current_data[stock].paused: + self.log.warning(f"股票 {stock} 处于停牌状态,跳过买入") + continue + + if current_data[stock].high_limit <= current_data[stock].last_price: + self.log.warning(f"股票 {stock} 已涨停,跳过买入") + continue + + # 执行买入 + if self.strategy.open_position(stock, value): + self.log.info(f"成功买入股票 {stock},分配资金 {value:.2f}") + # self.position_manager.get_not_buy_again().append(stock) + success_count += 1 + + # 如果持仓已满,停止买入 + if len(context.portfolio.positions) >= target_num: + break + + # 清理已处理的股票 + # self.position_manager.pending_buys = self.position_manager.pending_buys[buy_count:] + + # self.log.info(f"本次共成功买入 {success_count} 只股票,待买入队列还剩 {len(self.position_manager.pending_buys)} 只") + + # 输出最新持仓状态 + # self.position_manager.log_status(context, [p.security for p in list(context.portfolio.positions.values())]) + + def handle(self, context: Any): + pass + + def end(self, context: Any): + pass diff --git a/task/process_pending_sells_task.py b/task/process_pending_sells_task.py new file mode 100644 index 0000000..c33728c --- /dev/null +++ b/task/process_pending_sells_task.py @@ -0,0 +1,65 @@ +from typing import Any + +from kit.kit import DataHelper +from strategy import trade_strategy +from task.base_task import base_task + + +class process_pending_sells_task(base_task): + """ + 处理待卖出股票队列 + """ + + def __init__(self, strategy: trade_strategy): + super().__init__(strategy) + + + + def config(self, context: Any): + self.sell_request_list = self.strategy.state.get_sell_requests() + + def run(self, context: Any): + # 如果没有待卖出股票,直接返回 + if len(self.sell_request_list) == 0: + return + + self.log.info("开始处理待卖出股票队列") + current_data = DataHelper.get_current_data() + + for stock in self.sell_request_list: + if stock not in context.portfolio.positions: + # 股票已不在持仓中,可能已通过其他方式卖出 + continue + + position = context.portfolio.positions[stock] + if position.closeable_amount <= 0: + # 没有可卖出数量 + continue + + # 检查是否可以交易 + if current_data[stock].paused: + self.log.warning(f"股票 {stock} 已暂停交易,跳过卖出") + continue + + if current_data[stock].low_limit >= current_data[stock].last_price: + self.log.warning(f"股票 {stock} 已触及跌停,跳过卖出") + continue + + # 尝试卖出 + # self.position_monitor.increment_attempt(stock) + success = self.strategy.close_position(context, position) + + if success: + # self.position_monitor.mark_as_successful(stock) + self.log.info(f"成功卖出股票 {stock}") + else: + self.log.warning(f"卖出股票 {stock} 失败,已累计尝试 {self.position_monitor.pending_sells[stock]['attempts']} 次") + + # 更新日志状态 + self.position_monitor.log_status() + + def handle(self, context: Any): + pass + + def end(self, context: Any): + pass diff --git a/task/trade_afternoon_task.py b/task/trade_afternoon_task.py new file mode 100644 index 0000000..fc7ae2f --- /dev/null +++ b/task/trade_afternoon_task.py @@ -0,0 +1,57 @@ +from strategy import trade_strategy +from task.base_task import base_task +from task.check_high_volume_func_task import check_high_volume_func_task +from task.check_limit_up_func_task import check_limit_up_func_task +from task.check_remain_amount_func_task import check_remain_amount_func_task +from task.process_pending_buy_task import process_pending_buy_task +from task.process_pending_sells_task import process_pending_sells_task + +class trade_afternoon_task(base_task): + """ + 下午交易任务: + 1. 检查是否有因为涨停破板触发的卖出信号; + 2. 如启用了成交量监控,则检测是否有异常成交量; + 3. 检查账户中是否需要补仓; + 4. 处理待买入和待卖出队列。 + + 参数: + context: 聚宽平台传入的交易上下文对象 + """ + + def __init__(self, strategy: trade_strategy): + super().__init__(strategy) + + + + def begin(self, context): + self.no_trading_today_signal = self.config['no_trading_today_signal'] + self.HV_control = bool(self.config['HV_control']) # 是否启用成交量监控 + + def run(self, context): + + if not self.no_trading_today_signal: + check_limit_up_task = check_limit_up_func_task(self.strategy, sub_task=self.sub_task) + check_limit_up_task.process(context) + + if self.HV_control: + check_high_volume_task = check_high_volume_func_task(self.strategy) + check_high_volume_task.process(context) + + # 先处理待卖出股票 + process_pending_sells_task = process_pending_sells_task(self.strategy) + process_pending_sells_task.process(context) + + # 再检查是否需要补仓 + check_remain_amount_task = check_remain_amount_func_task(self.strategy, sub_task=self.sub_task) + check_remain_amount_task.process(context) + + # 最后处理待买入股票 + sells_task = process_pending_buy_task(self.strategy) + sells_task.process(context) + pass + + def handle(self, context): + pass + + def end(self, context): + pass diff --git a/test.py b/test.py new file mode 100644 index 0000000..5078d12 --- /dev/null +++ b/test.py @@ -0,0 +1,1120 @@ +import logging +import os +from datetime import timedelta + +import requests +from typing import Any, Optional, List, Dict + +import pandas as pd + +from strategy import trade_strategy + + +class network_config: + + def __init__(self): + """ + 初始化网络配置 + """ + self.BASE_URL = "http://101.34.79.70:8081" # 替换为你的 NocoDB 地址 + API_TOKEN = "mgIUTo_BATQnVPE3czsHLOLLgcQmnVv1qs7gJTIO" # 在用户设置中创建 + self.headers = { + "xc-token": API_TOKEN + } + + self.project_table = { + "strategy_config": "mw1zi3dr5sy03em", # 查看项目URL获取 + "strategy_base": "mtvgo6pbrlmbgjc", + "strategy_config_template": "m4nty4o5t6sxzts", + "strategy_logs": "m8ebfypvbfpk38p" + } + + pass + + def register_strategy(self, strategy_name: str, template_id: str = ""): + """ + 注册策略 + :param strategy_name: 策略名称 + :return: None + """ + PROJECT_ID = self.project_table['strategy_base'] + url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records" + data = { + + "text": strategy_name, + "memo": template_id, + "create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 使用当前时间 + "modify_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 使用当前时间 + + } + response = requests.post(url, headers=self.headers, data=data) + response.raise_for_status() # 检查错误 + result = response.json() + template_list = self.get_template_list(template_id) + for template in template_list: + self.insert_config(result["id"], template) + + return result["id"] + + def get_strategy_id(self, strategy_name: str): + """ + 获取策略ID + :param strategy_name: 策略名称 + :return: 策略ID + """ + PROJECT_ID = self.project_table['strategy_base'] + response = requests.get(f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records", headers=self.headers) + response.raise_for_status() + data = response.json() + records = data["list"] + return next((str(record["id"]) for record in records if record.get("text") == strategy_name), None) + + def get_template_list(self, strategy_id: str) -> any: + """ + 获取网络配置参数 + :param type: + :param strategy_id: + :param key: 配置参数的键 + :return: 配置参数的值 + """ + PROJECT_ID = self.project_table['strategy_config_template'] + url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records" + params = dict(limit=100, offset=0, where=f"(template_id,eq,{strategy_id})") + try: + return requests.get(url, headers=self.headers, params=params).json()['list'] + except Exception as e: + return [] + + def get_data_list(self, strategy_id: str, type: str) -> any: + """ + 获取网络配置参数 + :param type: + :param strategy_id: + :param key: 配置参数的键 + :return: 配置参数的值 + """ + # 这里可以添加获取配置逻辑 + PROJECT_ID = self.project_table['strategy_config'] + url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records" + params = dict(limit=100, offset=0, where=f"(strategy_id,eq,{strategy_id})") + try: + result = [] + records = requests.get(url, headers=self.headers, params=params).json()['list'] + for record in records: + if record.get("strategy_id") == str(strategy_id) and record.get("type") == type: + result.append(record) + return result + except Exception as e: + return [] + + def update_config(self, id: str, new_value: str): + """ + 更新网络配置参数 + + :return: + """ + PROJECT_ID = self.project_table['strategy_config'] + url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records" + new_value = { + "id": id, + "new_value": new_value, + "create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + } + update_response = requests.patch(url, headers=self.headers, json=new_value) + update_response.raise_for_status() + return update_response.json() + + def insert_config(self, strategy_id: str, template: dict): + """ + 插入网络配置参数 + """ + PROJECT_ID = self.project_table['strategy_config'] + url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records" + data = { + "strategy_id": strategy_id, + "type": "config", + "name": template['name'], + "data_type": template['data_type'], + "data_param": template['data_param'], + "memo": template['memo'], + "new_value": template['value'], + "create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + } + response = requests.post(url, headers=self.headers, data=data) + response.raise_for_status() # 检查错误 + + @staticmethod + def insert_logs(strategy_id: str, log_type: str, log_content: str): + """ + 插入日志记录 + :param strategy_id: 策略ID + :param log_type: 日志类型 + :param log_content: 日志内容 + :return: None + """ + PROJECT_ID = "m8ebfypvbfpk38p" + url = f"http://101.34.79.70:8081/api/v2/tables/{PROJECT_ID}/records" + data = { + "strategy_id": strategy_id, + "msg_type": log_type, + "message": log_content, + "create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 使用当前时间 + } + API_TOKEN = "mgIUTo_BATQnVPE3czsHLOLLgcQmnVv1qs7gJTIO" # 在用户设置中创建 + headers = { + "xc-token": API_TOKEN + } + response = requests.post(url, headers=headers, data=data) + response.raise_for_status() + + +class strategy_config: + + def __init__(self, + strategy_name: str, + strategy_template_id: str, + config_save: network_config, + log): + """ + 基础必要初始化策略配置 + """ + # 策略名 + self.strategy_name = strategy_name + # 配置模版id + self.template_id = strategy_template_id + # 配置变更保存方式 + self.connection = config_save + # 日志记录器 + self.logger = Logger(log) + + """ + 实例化strategy_config类时,初始化必要的属性 + """ + # 策略ID + self.strategy_id = None + # 配置对应主键 + self.configkey_id = {} + # 配置信息 + self.config = {} + + # 核心缓存 + self.strategy_state = state() + + # 初始化配置文件 + self.init_from_db() + + def init_from_db(self): + self.strategy_id = self.connection.register_strategy(self.strategy_name, self.template_id) + + self.logger.info("初始化策略配置", self.strategy_id) + self.logger.info(' 注册策略序列:' + str(self.strategy_id), self.strategy_id) + config_list = self.connection.get_data_list(self.strategy_id, 'config') + self.config = {item['name']: item for item in config_list} + self.configkey_id = {item['name']: item['id'] for item in config_list} + + self.logger.info('确认配置参数:', self.strategy_id) + for config in config_list: + self.logger.info(f" 参数名: {config['name']}, 值: {config['new_value']}, 备注: {config['memo']}", self.strategy_id) + self.logger.info('确认配置参数完毕, 如有错误请立刻停止执行', self.strategy_id) + + def get_all_configs(self) -> dict: + """ + 获取所有策略配置参数 + :return: 包含所有配置参数的字典 + """ + return self.config + + def set_config(self, key: str, value: str) -> None: + """ + 通过字典语法设置配置项,并自动触发数据库更新 + + Args: + key (str): 配置项名称 + value (Any): 配置项的新值 + """ + id = self.configkey_id.get(key) + self.connection.update_config(id, value) + + +# if __name__ == "__main__": +# config_instance = strategy_config('aaa', '10000000', network_config(), None) + + +from datetime import datetime + + +class hold_data: + def __init__(self, data: dict): + self.stock = None + self.price = None + self.amount = None + self.cost = None + self.profit = None + self.profit_rate = None + + +class state: + def __init__(self): + self.data = {} + self.init() + + # 初始化 + def init(self): + self.data = { + "create_date": "", + "position_list": {}, + "sell_request_list": {}, + "buy_request_list": {}, + "high_stock_list": {}, + "low_stock_list": {}, + "reason_to_sell": "", + "history_buy_list": {}, + "history_sell_list": {}, + } + + def get_buy_requests(self): + return self.data["buy_request_list"] + + def clear_buy_requests(self): + """ + 清空买入请求列表 + """ + self.data["buy_request_list"] = {} + + def reset(self): + self.init() + + def set_state_datetime(self, key: str, value: datetime) -> None: + """ + 设置状态值 + :param key: 状态键 + :param value: 状态值 + """ + self.data[key] = value + + def set_hold_stock(self, stock: str, data: dict) -> None: + """ + 设置持仓股票数据 + :param stock: 股票代码 + :param data: hold_data 对象 + """ + hd = hold_data(data) + if not isinstance(hd, hold_data): + raise ValueError("data must be an instance of hold_data") + self.data["position_list"][stock] = hd + + def set_high_stock(self, stock: str, data: dict) -> None: + """ + 设置高位股票数据 + :param stock: 股票代码 + :param data: hold_data 对象 + """ + hd = hold_data(data) + if not isinstance(hd, hold_data): + raise ValueError("data must be an instance of hold_data") + self.data["high_stock_list"][stock] = hd + + def set_low_stock(self, stock: str, data: dict) -> None: + """ + 设置低位股票数据 + :param stock: 股票代码 + :param data: hold_data 对象 + """ + hd = hold_data(data) + if not isinstance(hd, hold_data): + raise ValueError("data must be an instance of hold_data") + self.data["low_stock_list"][stock] = hd + + def set_sell_request(self, stock: str, data: dict, reason: str) -> None: + """ + 设置卖出请求数据 + :param stock: 股票代码 + :param data: hold_data 对象 + """ + hd = hold_data(data) + if not isinstance(hd, hold_data): + raise ValueError("data must be an instance of hold_data") + self.data["sell_request_list"][stock + "_" + reason] = hd + + def set_buy_request(self, stock: str, data: dict, reason: str) -> None: + """ + 设置买入请求数据 + :param stock: 股票代码 + :param data: hold_data 对象 + """ + hd = hold_data(data) + if not isinstance(hd, hold_data): + raise ValueError("data must be an instance of hold_data") + self.data["buy_request_list"][stock + "_" + reason] = hd + + def clear_buys_requests(self): + """ + 清空买入请求列表 + """ + self.data["buy_request_list"] = {} + + def get_sell_reason(self): + return self.data["reason_to_sell"] + + def set_sell_reason(self, reason: str): + """ + 设置卖出原因 + :param reason: 卖出原因 + """ + self.data["reason_to_sell"] = reason + + def get_sell_requests(self): + """ + 获取卖出请求列表 + :return: 卖出请求列表 + """ + return self.data["sell_request_list"] + + def get_yesterday_high_list(self): + """ + 获取昨日涨停股票列表 + :return: 昨日涨停股票列表 + """ + return self.data["high_stock_list"] + + +class conf: + + @staticmethod + def to_int(value: str, params, separator: str = '_'): + return int(value) + + @staticmethod + def to_float(value: str, params, separator: str = '_'): + return float(value) + + @staticmethod + def to_str(value: str, params, separator: str = '_'): + return str(value) + + @staticmethod + def to_bool(value: str, params, separator: str = '_'): + return bool(value) + + @staticmethod + def to_list(value: str, params, separator: str = '_'): + params_list = params.split(separator, -1) + return value.split(params_list) + + @staticmethod + def config_type(value: str): + data = { + "int": conf.to_int, + "float": conf.to_float, + "str": conf.to_str, + "bool": conf.to_bool, + "list": conf.to_list, + } + + return data[value] + + +class DataHelper: + """ + 数据操作辅助类 + 封装了数据接口的调用,包括 get_price 与 history 函数, + 并在内部捕获异常、输出中文错误日志,避免重复编写 try/except 代码。 + """ + + @staticmethod + def get_price_safe( + security: Any, + end_date: Any, + frequency: str, + fields: List[str], + count: int, + panel: bool = False, + skip_paused: bool = True, + fq: Optional[str] = None, + fill_paused: bool = False + ) -> Optional[pd.DataFrame]: + """ + 安全调用 get_price 数据接口 + 参数: + security: 单只股票代码或股票代码列表 + end_date: 数据截止日期 + frequency: 数据频率,如 "daily" 或 "1m" + fields: 需要获取的数据字段列表(例如 ['open', 'close']) + count: 请求数据的记录数 + panel: 是否返回面板数据(默认为 False) + skip_paused: 是否跳过停牌股票(默认为 True) + fq: 复权方式(例如"pre"或"post",默认 None) + fill_paused: 是否填充停牌数据(默认为 False) + 返回: + 返回包含数据的 DataFrame,如果出错则返回 None + """ + try: + # 调用聚宽提供的 get_price 获取数据 + df = get_price( + security, + end_date=end_date, + frequency=frequency, + fields=fields, + count=count, + panel=panel, + skip_paused=skip_paused, + fq=fq, + fill_paused=fill_paused + ) + return df + except Exception as e: + # 输出中文错误日志,并返回 None + # logger.error(f"获取 {security} 的价格数据时出错: {e}") + return None + + @staticmethod + def get_current_data(): + """ + 获取当前市场数据 + 返回: + 当前市场数据的字典,包含股票代码到数据对象的映射 + """ + try: + # 调用聚宽提供的 get_current_data 获取当前数据 + return get_current_data() + except Exception as e: + # Logger.error(f"获取当前市场数据时出错: {e}") + return {} + + @staticmethod + def get_history_safe( + security: Any, + unit: str, + field: str, + count: int, + ) -> Optional[Dict[str, List[float]]]: + """ + 安全调用 history 数据接口,批量获取历史数据 + + 参数: + security: 单只或多只股票代码 + unit: 数据单位,例如 "1m" 表示1分钟数据 + field: 请求数据字段名称,如 "close"(收盘价) + count: 请求历史数据记录数 + + 返回: + 返回一个字典,映射股票代码到对应的数据列表;出错则返回 None + """ + try: + # 调用聚宽的 history 函数获取数据 + data = history(count, unit=unit, field=field, security_list=security) + return data + except Exception as e: + return None + + @staticmethod + def get_index_stocks(index_code: str) -> List[str]: + """ + 获取指定指数的成分股列表 + + 参数: + index_code: 指数代码,例如 "000300.XSHG" 表示沪深300指数 + date: 可选参数,指定查询日期,默认为 None,表示查询最新成分股 + + 返回: + 成分股代码列表 + """ + try: + # 调用聚宽的 get_index_stocks 函数获取成分股 + return get_index_stocks(index_code) + except Exception as e: + return [] + + @staticmethod + def get_security_info(security: str) -> Optional[Any]: + """ + 获取指定股票的安全信息 + + 参数: + security: 股票代码,例如 "000001.XSHE" + + 返回: + 股票的安全信息对象,如果出错则返回 None + """ + try: + # 调用聚宽的 get_security_info 函数获取股票信息 + return get_security_info(security) + except Exception as e: + return None + + @staticmethod + def get_stock_list(context: Any, index_stocks: str, stock_num: int) -> List[str]: + """ + 选股模块: + 1. 从指定股票池(如 399101.XSHE 指数成分股)中获取初步股票列表; + 2. 应用多个过滤器筛选股票(次新股、科创股、ST、停牌、涨跌停等); + 3. 基于基本面数据(EPS、市值)排序后返回候选股票列表。 + 参数: + context: 聚宽平台传入的交易上下文对象 + 返回: + 筛选后的候选股票代码列表 + """ + # 从指定指数中获取初步股票列表 + initial_list: List[str] = DataHelper.get_index_stocks(index_stocks) + + # 依次应用过滤器,筛去不符合条件的股票 + initial_list = DataHelper.filter_new_stock(context, initial_list) # 过滤次新股 + initial_list = DataHelper.filter_kcbj_stock(initial_list) # 过滤科创/北交股票 + initial_list = DataHelper.filter_st_stock(initial_list) # 过滤ST或风险股票 + initial_list = DataHelper.filter_paused_stock(initial_list) # 过滤停牌股票 + initial_list = DataHelper.filter_limitup_stock(context, initial_list) # 过滤当日涨停(未持仓时)的股票 + initial_list = DataHelper.filter_limitdown_stock(context, initial_list) # 过滤当日跌停(未持仓时)的股票 + + # 利用基本面查询获取股票代码和EPS数据,并按照市值升序排序 + q = query(valuation.code, indicator.eps) \ + .filter(valuation.code.in_(initial_list)) \ + .order_by(valuation.market_cap.asc()) + df = get_fundamentals(q) + stock_list: List[str] = list(df.code) + stock_list = stock_list[:50] # 限制数据规模,防止一次处理数据过大 + # 取前2倍目标持仓股票数作为候选池 + final_list: List[str] = stock_list[:2 * int(stock_num)] + + # 查询并输出候选股票的财务信息(如财报日期、营业收入、EPS) + if final_list: + info_query = query( + valuation.code, + income.pubDate, + income.statDate, + income.operating_revenue, + indicator.eps + ).filter(valuation.code.in_(final_list)) + df_info = get_fundamentals(info_query) + # for _, row in df_info.iterrows(): + # log.info( + # f"股票 {row['code']}:报告日期 {row.get('pubDate', 'N/A')},统计日期 {row.get('statDate', 'N/A')},营业收入 {row.get('operating_revenue', 'N/A')},EPS {row.get('eps', 'N/A')}") + return final_list + + @staticmethod + def filter_paused_stock(stock_list: List[str]) -> List[str]: + """ + 过滤停牌的股票 + 参数: + stock_list: 待过滤的股票代码列表 + 返回: + 未停牌的股票代码列表 + """ + current_data = DataHelper.get_current_data() + return [stock for stock in stock_list if not current_data[stock].paused] + + @staticmethod + def filter_st_stock(stock_list: List[str]) -> List[str]: + """ + 过滤带有 ST 或其他风险标识的股票 + 参数: + stock_list: 待过滤的股票代码列表 + 返回: + 无 ST 或风险标识的股票代码列表 + """ + current_data = DataHelper.get_current_data() + return [stock for stock in stock_list if (not current_data[stock].is_st) and + ('ST' not in current_data[stock].name) and + ('*' not in current_data[stock].name) and + ('退' not in current_data[stock].name)] + + @staticmethod + def filter_kcbj_stock(stock_list: List[str]) -> List[str]: + """ + 过滤科创、北交股票 + 参数: + stock_list: 待过滤的股票代码列表 + 返回: + 过滤后的股票代码列表(排除以 '4'、'8' 开头以及以 '68' 起始的股票) + """ + return [stock for stock in stock_list if stock[0] not in ('4', '8') and not stock.startswith('68')] + + @staticmethod + def filter_limitup_stock(context: Any, stock_list: List[str]) -> List[str]: + """ + 过滤当天已经涨停的股票(若未持仓则过滤) + 参数: + context: 交易上下文对象 + stock_list: 待过滤的股票代码列表 + 返回: + 过滤后的股票代码列表 + """ + history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1) + current_data = DataHelper.get_current_data() + if history_data is None: + return stock_list + return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or + (history_data.get(stock, [0])[-1] < current_data[stock].high_limit)] + + @staticmethod + def filter_limitdown_stock(context: Any, stock_list: List[str]) -> List[str]: + """ + 过滤当天已经跌停的股票(若未持仓则过滤) + 参数: + context: 交易上下文对象 + stock_list: 待过滤的股票代码列表 + 返回: + 过滤后的股票代码列表 + """ + history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1) + current_data = DataHelper.get_current_data() + if history_data is None: + return stock_list + return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or + (history_data.get(stock, [float('inf')])[-1] > current_data[stock].low_limit)] + + @staticmethod + def filter_new_stock(context: Any, stock_list: List[str]) -> List[str]: + """ + 过滤次新股:排除上市时间不足375天的股票 + 参数: + context: 交易上下文对象 + stock_list: 待过滤的股票代码列表 + 返回: + 过滤后的股票代码列表 + """ + yesterday = context.previous_date + return [stock for stock in stock_list if not (yesterday - DataHelper.get_security_info(stock).start_date < timedelta(days=375))] + + @staticmethod + def filter_highprice_stock(self, context: Any, stock_list: List[str]) -> List[str]: + """ + 过滤股价高于设定上限(up_price)的股票(非持仓股票参与过滤) + 参数: + context: 交易上下文对象 + stock_list: 待过滤的股票代码列表 + 返回: + 过滤后的股票代码列表 + """ + history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1) + if history_data is None: + return stock_list + return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or + history_data.get(stock, [self.up_price + 1])[-1] <= self.up_price] + + @staticmethod + def filter_not_buy_again(stock_list: List[str], not_buy_again) -> List[str]: + """ + 过滤掉当日已买入的股票,避免重复下单 + 参数: + stock_list: 待过滤的股票代码列表 + 返回: + 未买入的股票代码列表 + """ + return [stock for stock in stock_list if stock not in not_buy_again] + + +class Logger: + """ + 简单的日志操作类,支持控制台和文件输出 + 可以使用现有的logger对象或创建新的实例 + """ + _instance = None + + def __new__(cls, existing_logger=None, *args, **kwargs): + if existing_logger is not None: + # 如果提供了现有的logger对象,创建新实例并使用该对象 + instance = super().__new__(cls) + instance.logger = existing_logger + return instance + + # 否则使用单例模式 + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self, existing_logger=None, log_file="logs/app.log"): + # 如果使用现有logger,直接返回 + if existing_logger is not None: + return + + # 如果已经初始化过,直接返回 + if hasattr(self, 'logger'): + return + + # 创建logs目录(如果不存在) + os.makedirs(os.path.dirname(log_file), exist_ok=True) + + # 初始化logger + self.logger = logging.getLogger('AppLogger') + self.logger.setLevel(logging.DEBUG) + + # 清除可能存在的处理器 + self.logger.handlers.clear() + + # 设置日志格式 + formatter = logging.Formatter( + '%(asctime)s [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + # 添加控制台处理器 + console_handler = logging.StreamHandler() + console_handler.setFormatter(formatter) + self.logger.addHandler(console_handler) + + # 添加文件处理器 + file_handler = logging.FileHandler(log_file, encoding='utf-8') + file_handler.setFormatter(formatter) + self.logger.addHandler(file_handler) + + def debug(self, message, strategy_id=None): + """记录调试级别的日志""" + self.logger.debug(message) + try: + network_config.insert_logs(strategy_id, "debug", message) + except: + pass + + def info(self, message, strategy_id=None): + """记录信息级别的日志""" + self.logger.info(message) + try: + network_config.insert_logs(strategy_id, "info ", message) + except: + pass + + def warning(self, message, strategy_id=None): + """记录警告级别的日志""" + self.logger.warning(message) + try: + network_config.insert_logs(strategy_id, "warn", message) + except: + pass + + def error(self, message, strategy_id=None): + """记录错误级别的日志""" + self.logger.error(message) + try: + network_config.insert_logs(strategy_id, "error", message) + except: + pass + + def critical(self, message, strategy_id=None): + """记录严重错误级别的日志""" + self.logger.critical(message) + try: + network_config.insert_logs(strategy_id, "critical", message) + except: + pass +class exchange: + + def close_positions(self, context: Any, position: Any): + """ + 关闭所有持仓 + """ + raise NotImplementedError("This method should be implemented by subclasses") + + def buy_security(self, context: Any, target_list: List[str]) -> None: + """ + 买入操作:对目标股票执行买入,下单资金均摊分配 + """ + raise NotImplementedError("This method should be implemented by subclasses") + + def open_position(self, security: str, value: float) -> bool: + """ + 开仓操作:尝试买入指定股票 + 参数: + security: 股票代码 + value: 分配给该股票的资金 + 返回: + 若下单成功(部分或全部成交)返回 True,否则返回 False + """ + raise NotImplementedError("This method should be implemented by subclasses") + + +class jq_channel(exchange): + + def __init__(self): + pass + + def order_target_value_(self, stock: str, value: float) -> Any: + """ + 封装 order_target_value 函数进行下单,同时记录中文日志和异常信息 + 参数: + security: 股票代码 + value: 下单目标金额 + 返回: + 下单后生成的订单对象;若失败返回 None + """ + try: + order = order_target_value(stock, value) + return order + except Exception as e: + return None + + def close_positions(self, context: Any, position: Any): + """ + 平仓操作:尽可能将指定股票仓位全部卖出 + 参数: + position: 持仓对象 + 返回: + 若下单后订单全部成交返回 True,否则返回 False + """ + security = position.security + order = self.order_target_value_(security, 0) + + if order is None: + # log.error(f"股票 {security} 下单失败,可能是API调用错误") + return False + + if order.status == OrderStatus.held and order.filled == order.amount: + # log.info(f"股票 {security} 卖出订单完全成交") + return True + else: + # log.warning(f"股票 {security} 卖出订单部分成交或未成交,状态: {order.status}, 已成交: {order.filled}/{order.amount}") + return False + + def buy_security(self, context: Any, target_list: List[str]) -> None: + """ + 买入操作:对目标股票执行买入,下单资金均摊分配 + + 参数: + context: 聚宽平台传入的交易上下文对象 + target_list: 目标股票代码列表 + """ + position_count = len(context.portfolio.positions) + target_num = len(target_list) + if target_num > position_count: + try: + value = context.portfolio.cash / (target_num - position_count) + except ZeroDivisionError as e: + # log.error(f"资金分摊时除零错误: {e}") + return + for stock in target_list: + if context.portfolio.positions[stock].total_amount == 0: + if self.open_position(stock, value): + # log.info(f"已买入股票 {stock},分配资金 {value:.2f}") + # self.not_buy_again.append(stock) + if len(context.portfolio.positions) == target_num: + break + + def open_position(self, security: str, value: float) -> bool: + """ + 开仓操作:尝试买入指定股票 + 参数: + security: 股票代码 + value: 分配给该股票的资金 + 返回: + 若下单成功(部分或全部成交)返回 True,否则返回 False + """ + order = self.order_target_value_(security, value) + if order is not None and order.filled > 0: + return True + return False + + +class trade_strategy: + + def __init__(self, strategy_name, strategy_template_id, config_save, log, channel: exchange): + # -------必要参数------- + self.log = Logger(log) # 日志记录器 + self.strategy_config = strategy_config(strategy_name, + strategy_template_id, + config_save, + self.log) + + self.state = state() # 策略状态管理器,跟踪策略运行状态 + self.channel = channel + + # 添加持仓监控和管理器 + + def init_strategy(self) -> None: + """ + 策略初始化函数 + 配置交易环境参数,包括防未来数据、基准、滑点、订单成本以及日志输出等级。 + 参数: + context: 聚宽平台传入的交易上下文对象 + """ + # 启用防未来数据以确保历史回测的严谨性 + self.log.info("初始化策略TradingStrategy配置...") + + set_option('avoid_future_data', True) + # 设置策略基准为上证指数 + set_benchmark(self.strategy_config.config["benchmark_stock"]) + # 使用真实市场价格,模拟实际交易 + set_option('use_real_price', True) + # 设置固定滑点,确保价格偏差不大 + set_slippage(FixedSlippage(3 / 10000)) + # 设置订单成本,包括印花税和佣金 + set_order_cost(OrderCost( + open_tax=0, + close_tax=0.001, # 卖出时0.1%的印花税 + open_commission=2.5 / 10000, + close_commission=2.5 / 10000, + close_today_commission=0, + min_commission=5 # 最低佣金为5元 + ), type='stock') + # 设置日志输出等级(中文日志输出) + self.log.logger.set_level('order', 'error') + self.log.logger.set_level('system', 'error') + self.log.logger.set_level('strategy', 'debug') + + def set_target_list(self, target_list): + self.target_list = target_list.copy() + + def get_target_list(self): + """获取本次调仓候选股票列表""" + return self.target_list + + def get_configs(self) -> dict: + """ + 获取策略配置 + :return: strategy_config 对象 + """ + return self.strategy_config.get_all_configs() + + def get_strategy_config(self) -> strategy_config: + """ + 获取策略配置字典 + :return: 包含所有配置参数的字典 + """ + return self.strategy_config + + def close_position(self, context, stock: str): + """ + 平仓操作:尽可能将指定股票仓位全部卖出 + 参数: + position: 持仓对象 + 返回: + 若下单后订单全部成交返回 True,否则返回 False + """ + self.channel.close_positions(context, stock) + + def open_position(self, security: str, value: float) -> bool: + """ + 开仓操作:尝试买入指定股票 + 参数: + security: 股票代码 + value: 分配给该股票的资金 + 返回: + 若下单成功(部分或全部成交)返回 True,否则返回 False + """ + return self.channel.open_position(security, value) + + +class base_task: + def __init__(self, strategy: trade_strategy, sub_task=None): + self.strategy = strategy + self.name = "base_task" + self.remark = "base_task" + self.memo = "base_task" + self.sub_task = sub_task + + self.strategy_config = self.strategy.get_strategy_config() + self.configs = self.strategy.get_configs() + self.log = self.strategy.log + + def init(self, context: Any): + pass + + def config(self, context: Any): + pass + + def run(self, context: Any): + pass + + def handle(self, context: Any): + pass + + def end(self, context: Any): + pass + + def process(self, context: Any): + self.init(context) + self.config(context) + self.run(context) + self.handle(context) + self.end(context) + + def get_config(self, key: str): + configs = self.configs.get(key) + data_type = configs['data_type'] + data_params = configs['data_params'] + func = conf.config_type(data_type) + return func(self.configs[key]['new_value'], data_params) + + +from kit.kit import DataHelper +from strategy import trade_strategy +from task.base_task import base_task + + +class check_state_before_task(base_task): + """ + 重置state信息 + 最前置信息 + """ + + def __init__(self, strategy: trade_strategy, sub_task=None): + super().__init__(strategy, sub_task) + + def init(self, context): + self.log.info("--------------------------") + self.log.info("重置运行中环境数据") + + def config(self, context): + # 重置state信息 + self.strategy.state.reset() + self.hold_stock = [position.security for position in list(context.portfolio.positions.values())] + self.yesterday_high_list = [] + self.yesterday_low_list = [] + + def run(self, context): + positions = context.portfolio.positions + for stock in positions: + self.strategy.state.set_hold_stock(stock.security, stock.__dict__) + + if self.hold_stock: + # 获取持仓股票昨日数据(包括收盘价、涨停价、跌停价) + df = DataHelper.get_price_safe( + self.hold_stock, + end_date=context.previous_date, + frequency='daily', + fields=['close', 'high_limit', 'low_limit'], + count=1, + panel=False, + fill_paused=False + ) + if df is not None and not df.empty: + # 过滤出收盘价等于涨停价的股票,作为昨日涨停股票 + self.yesterday_high_list = list(df[df['close'] == df['high_limit']]['code']) + self.yesterday_low_list = list(df[df['close'] == df['low_limit']]['code']) + + def handle(self, context): + for stock in self.yesterday_high_list: + self.strategy.state.set_high_stock(stock['code'], stock) + for stock in self.yesterday_low_list: + self.strategy.state.set_low_stock(stock['code'], stock) + + +strategy = trade_strategy("测试策略", 10000000, network_config(), log, jq_channel()) + + +def check_state_before_func(context: Any): + task = check_state_before_task(strategy) + task.process(context) + + +def initialize(context: Any) -> None: + # # 初始化策略参数 + strategy.init_strategy() # 策略初始化函数 + + # # 注册调度任务 + run_daily(check_state_before_func, time='9:01') # 开盘前先检查持仓状态 + # run_daily(check_holdings_yesterday_func, time='9:00') + # run_daily(prepare_stock_list_func, time='9:05') + # + # run_daily(sell_stocks_func, time='10:00') # 每日检查止损条件 + # + # run_daily(process_pending_sells_func, time='10:15') # 处理待卖出股票 + # run_weekly(buy_stocks_func, 2, time='10:30') # 周二进行调仓 + # run_daily(process_pending_buys_func, time='10:20') # 处理待买入股票 + # run_daily(process_pending_sells_func, time='13:05') # 午盘开盘后也处理一次卖出 + # run_daily(process_pending_buys_func, time='13:15') # 午盘开盘后也处理一次买入 + # run_daily(trade_afternoon_func, time='14:30') + # run_daily(process_pending_sells_func, time='14:40') # 收盘前再处理一次卖出 + # run_daily(process_pending_buys_func, time='14:45') # 收盘前再处理一次买入 + # run_daily(close_account_func, time='14:50') + # run_weekly(print_position_info_func, 5, time='15:10') # 周五收盘后打印持仓信息 + pass +# initialize(None) # 调用初始化函数,开始策略执行