import logging import os from datetime import timedelta import datetime import requests from typing import Any, Optional, List, Dict import pandas as pd class network_config: def __init__(self): """ 初始化网络配置 """ self.BASE_URL = "http://101.34.79.70:8081" # 替换为你的 NocoDB 地址 API_TOKEN = "mgIUTo_BATQnVPE3czsHLOLLgcQmnVv1qs7gJTIO" # 在用户设置中创建 self.headers = { "xc-token": API_TOKEN } self.project_table = { "strategy_config": "mw1zi3dr5sy03em", # 查看项目URL获取 "strategy_base": "mtvgo6pbrlmbgjc", "strategy_config_template": "m4nty4o5t6sxzts", "strategy_logs": "m8ebfypvbfpk38p", "today_state": "mosii8hqry8xx9b" } pass def register_strategy(self, strategy_name: str, template_id: str = ""): """ 注册策略 :param strategy_name: 策略名称 :return: None """ PROJECT_ID = self.project_table['strategy_base'] url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records" data = { "text": strategy_name, "memo": template_id, "create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 使用当前时间 "modify_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 使用当前时间 } response = requests.post(url, headers=self.headers, data=data) response.raise_for_status() # 检查错误 result = response.json() template_list = self.get_template_list(template_id) for template in template_list: self.insert_config(result["id"], template) return result["id"] def get_strategy_id(self, strategy_name: str): """ 获取策略ID :param strategy_name: 策略名称 :return: 策略ID """ PROJECT_ID = self.project_table['strategy_base'] response = requests.get(f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records", headers=self.headers) response.raise_for_status() data = response.json() records = data["list"] return next((str(record["id"]) for record in records if record.get("text") == strategy_name), None) def get_template_list(self, strategy_id: str) -> any: """ 获取网络配置参数 :param type: :param strategy_id: :param key: 配置参数的键 :return: 配置参数的值 """ PROJECT_ID = self.project_table['strategy_config_template'] url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records" params = dict(limit=100, offset=0, where=f"(template_id,eq,{strategy_id})") try: return requests.get(url, headers=self.headers, params=params).json()['list'] except Exception as e: return [] def get_data_list(self, strategy_id: str, type: str) -> any: """ 获取网络配置参数 :param type: :param strategy_id: :param key: 配置参数的键 :return: 配置参数的值 """ # 这里可以添加获取配置逻辑 PROJECT_ID = self.project_table['strategy_config'] url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records" params = dict(limit=100, offset=0, where=f"(strategy_id,eq,{strategy_id})") try: result = [] records = requests.get(url, headers=self.headers, params=params).json()['list'] for record in records: if record.get("strategy_id") == str(strategy_id) and record.get("type") == type: result.append(record) return result except Exception as e: return [] def update_config(self, id: str, new_value: str): """ 更新网络配置参数 :return: """ PROJECT_ID = self.project_table['strategy_config'] url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records" new_value = { "id": id, "new_value": new_value, "create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } update_response = requests.patch(url, headers=self.headers, json=new_value) update_response.raise_for_status() return update_response.json() def insert_config(self, strategy_id: str, template: dict): """ 插入网络配置参数 """ PROJECT_ID = self.project_table['strategy_config'] url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records" data = { "strategy_id": strategy_id, "type": "config", "name": template['name'], "data_type": template['data_type'], "data_param": template['data_param'], "memo": template['memo'], "new_value": template['value'], "create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } response = requests.post(url, headers=self.headers, data=data) response.raise_for_status() # 检查错误 # def update_check(self, id: str, old_value: str, new_value: str): # """ # 更新网络配置参数 # # :return: # """ # PROJECT_ID = self.project_table['strategy_config'] # url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records/" # new_value = { # "id": id, # "old_value": old_value, # "new_value": new_value, # "create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # } # update_response = requests.patch(url, headers=self.headers, json=new_value) # update_response.raise_for_status() # return update_response.json() # # def insert_check(self, strategy_id: str, key: str, value: str): # """ # 插入网络配置参数 # """ # PROJECT_ID = self.project_table['strategy_config'] # url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records" # data = { # "strategy_id": strategy_id, # "type": "check", # "name": key, # "new_value": value, # "create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 使用当前时间 # # } # response = requests.post(url, headers=self.headers, data=data) # response.raise_for_status() # 检查错误 @staticmethod def insert_logs(strategy_id: str, log_type: str, log_content: str): """ 插入日志记录 :param strategy_id: 策略ID :param log_type: 日志类型 :param log_content: 日志内容 :return: None """ PROJECT_ID = "m8ebfypvbfpk38p" url = f"http://101.34.79.70:8081/api/v2/tables/{PROJECT_ID}/records" data = { "strategy_id": strategy_id, "msg_type": log_type, "message": log_content, "create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 使用当前时间 } API_TOKEN = "mgIUTo_BATQnVPE3czsHLOLLgcQmnVv1qs7gJTIO" # 在用户设置中创建 headers = { "xc-token": API_TOKEN } response = requests.post(url, headers=headers, data=data) response.raise_for_status() @staticmethod def insert_state(strategy_id: str,state_str:str,trade_date:str): """ :param strategy_id: :param state_str: :param trade_date: :return: """ PROJECT_ID = "mosii8hqry8xx9b" url = f"http://101.34.79.70:8081/api/v2/tables/{PROJECT_ID}/records" data = { "strategy_id": strategy_id, "trade_date": trade_date, # 使用当前时间 "state": state_str } API_TOKEN = "mgIUTo_BATQnVPE3czsHLOLLgcQmnVv1qs7gJTIO" # 在用户设置中创建 headers = { "xc-token": API_TOKEN } response = requests.post(url, headers=headers, data=data) response.raise_for_status() class strategy_config: def __init__(self, strategy_name: str, strategy_template_id: str, config_save: network_config, log): """ 基础必要初始化策略配置 """ # 策略名 self.strategy_name = strategy_name # 配置模版id self.template_id = strategy_template_id # 配置变更保存方式 self.connection = config_save # 日志记录器 self.logger = Logger(log) """ 实例化strategy_config类时,初始化必要的属性 """ # 策略ID self.strategy_id = None # 配置对应主键 self.configkey_id = {} # 配置信息 self.config = {} # 核心缓存 self.strategy_state = state() # 初始化配置文件 self.init_from_db() def init_from_db(self): self.strategy_id = self.connection.register_strategy(self.strategy_name, self.template_id) self.logger.info("初始化策略配置", self.strategy_id) self.logger.info(' 注册策略序列:' + str(self.strategy_id), self.strategy_id) config_list = self.connection.get_data_list(self.strategy_id, 'config') self.config = {item['name']: item for item in config_list} self.configkey_id = {item['name']: item['id'] for item in config_list} self.logger.info('确认配置参数:', self.strategy_id) for config in config_list: self.logger.info(f" 参数名: {config['name']}, 值: {config['new_value']}, 备注: {config['memo']}", self.strategy_id) self.logger.info('确认配置参数完毕, 如有错误请立刻停止执行', self.strategy_id) def get_all_configs(self) -> dict: """ 获取所有策略配置参数 :return: 包含所有配置参数的字典 """ return self.config def set_config(self, key: str, value: str) -> None: """ 通过字典语法设置配置项,并自动触发数据库更新 Args: key (str): 配置项名称 value (Any): 配置项的新值 """ id = self.configkey_id.get(key) self.connection.update_config(id, value) self.config[key]['new_value'] = value def get_config(self, key: str): func = conf.config_type(key) return func(key) class hold_data: def __init__(self, data: dict): self.stock = None self.price = None self.amount = None self.cost = None self.profit = None self.profit_rate = None class state: def __init__(self): self.data = {} self.init() # 初始化 def init(self): self.data = { "create_date": "", "position_list": {}, "sell_request_list": {}, "buy_request_list": {}, "high_stock_list": {}, "low_stock_list": {}, "reason_to_sell": "", "history_buy_list": {}, "history_sell_list": {}, } def get_buy_requests(self): return self.data["buy_request_list"] def clear_buy_requests(self): """ 清空买入请求列表 """ self.data["buy_request_list"] = {} def reset(self): self.init() def set_state_datetime(self, key: str, value: datetime) -> None: """ 设置状态值 :param key: 状态键 :param value: 状态值 """ self.data[key] = value def set_hold_stock(self, stock: str, data: dict) -> None: """ 设置持仓股票数据 :param stock: 股票代码 :param data: hold_data 对象 """ hd = hold_data(data) if not isinstance(hd, hold_data): raise ValueError("data must be an instance of hold_data") self.data["position_list"][stock] = hd def set_high_stock(self, stock: str, data: dict) -> None: """ 设置高位股票数据 :param stock: 股票代码 :param data: hold_data 对象 """ hd = hold_data(data) if not isinstance(hd, hold_data): raise ValueError("data must be an instance of hold_data") self.data["high_stock_list"][stock] = hd def set_low_stock(self, stock: str, data: dict) -> None: """ 设置低位股票数据 :param stock: 股票代码 :param data: hold_data 对象 """ hd = hold_data(data) if not isinstance(hd, hold_data): raise ValueError("data must be an instance of hold_data") self.data["low_stock_list"][stock] = hd def set_sell_request(self, stock: str, data: dict, reason: str) -> None: """ 设置卖出请求数据 :param stock: 股票代码 :param data: hold_data 对象 """ hd = hold_data(data) if not isinstance(hd, hold_data): raise ValueError("data must be an instance of hold_data") self.data["sell_request_list"][stock + "_" + reason] = hd def set_buy_request(self, stock: str, data: dict, reason: str) -> None: """ 设置买入请求数据 :param stock: 股票代码 :param data: hold_data 对象 """ hd = hold_data(data) if not isinstance(hd, hold_data): raise ValueError("data must be an instance of hold_data") self.data["buy_request_list"][stock + "_" + reason] = hd def clear_buys_requests(self): """ 清空买入请求列表 """ self.data["buy_request_list"] = {} def get_sell_reason(self): return self.data["reason_to_sell"] def set_sell_reason(self, reason: str): """ 设置卖出原因 :param reason: 卖出原因 """ self.data["reason_to_sell"] = reason def get_sell_requests(self): """ 获取卖出请求列表 :return: 卖出请求列表 """ return self.data["sell_request_list"] def get_yesterday_high_list(self): """ 获取昨日涨停股票列表 :return: 昨日涨停股票列表 """ return self.data["high_stock_list"] def get_hold_list(self): return self.data["position_list"] class conf: @staticmethod def to_int(value: str, params, separator: str = '_'): return int(value) @staticmethod def to_float(value: str, params, separator: str = '_'): return float(value) @staticmethod def to_str(value: str, params, separator: str = '_'): return str(value) @staticmethod def to_bool(value: str, params, separator: str = '_'): return bool(value) @staticmethod def to_list(value: str, separator: str = '_'): return value.split(separator) @staticmethod def config_type(value: str): data = { "int": conf.to_int, "float": conf.to_float, "str": conf.to_str, "bool": conf.to_bool, "list": conf.to_list, } return data[value] class DataHelper: """ 数据操作辅助类 封装了数据接口的调用,包括 get_price 与 history 函数, 并在内部捕获异常、输出中文错误日志,避免重复编写 try/except 代码。 """ @staticmethod def get_price_safe( security: Any, end_date: Any, frequency: str, fields: List[str], count: int, panel: bool = False, skip_paused: bool = True, fq: Optional[str] = None, fill_paused: bool = False ) -> Optional[pd.DataFrame]: """ 安全调用 get_price 数据接口 参数: security: 单只股票代码或股票代码列表 end_date: 数据截止日期 frequency: 数据频率,如 "daily" 或 "1m" fields: 需要获取的数据字段列表(例如 ['open', 'close']) count: 请求数据的记录数 panel: 是否返回面板数据(默认为 False) skip_paused: 是否跳过停牌股票(默认为 True) fq: 复权方式(例如"pre"或"post",默认 None) fill_paused: 是否填充停牌数据(默认为 False) 返回: 返回包含数据的 DataFrame,如果出错则返回 None """ try: # 调用聚宽提供的 get_price 获取数据 df = get_price( security, end_date=end_date, frequency=frequency, fields=fields, count=count, panel=panel, skip_paused=skip_paused, fq=fq, fill_paused=fill_paused ) return df except Exception as e: # 输出中文错误日志,并返回 None # logger.error(f"获取 {security} 的价格数据时出错: {e}") return None @staticmethod def get_current_data(): """ 获取当前市场数据 返回: 当前市场数据的字典,包含股票代码到数据对象的映射 """ try: # 调用聚宽提供的 get_current_data 获取当前数据 return get_current_data() except Exception as e: # Logger.error(f"获取当前市场数据时出错: {e}") return {} @staticmethod def get_history_safe( security: Any, unit: str, field: str, count: int, ) -> Optional[Dict[str, List[float]]]: """ 安全调用 history 数据接口,批量获取历史数据 参数: security: 单只或多只股票代码 unit: 数据单位,例如 "1m" 表示1分钟数据 field: 请求数据字段名称,如 "close"(收盘价) count: 请求历史数据记录数 返回: 返回一个字典,映射股票代码到对应的数据列表;出错则返回 None """ try: # 调用聚宽的 history 函数获取数据 data = history(count, unit=unit, field=field, security_list=security) return data except Exception as e: return None @staticmethod def get_index_stocks(index_code: str) -> List[str]: """ 获取指定指数的成分股列表 参数: index_code: 指数代码,例如 "000300.XSHG" 表示沪深300指数 date: 可选参数,指定查询日期,默认为 None,表示查询最新成分股 返回: 成分股代码列表 """ try: # 调用聚宽的 get_index_stocks 函数获取成分股 return get_index_stocks(index_code) except Exception as e: return [] @staticmethod def get_security_info(security: str) -> Optional[Any]: """ 获取指定股票的安全信息 参数: security: 股票代码,例如 "000001.XSHE" 返回: 股票的安全信息对象,如果出错则返回 None """ try: # 调用聚宽的 get_security_info 函数获取股票信息 return get_security_info(security) except Exception as e: return None @staticmethod def get_stock_list(context: Any, index_stocks: str, stock_num: int) -> List[str]: """ 选股模块: 1. 从指定股票池(如 399101.XSHE 指数成分股)中获取初步股票列表; 2. 应用多个过滤器筛选股票(次新股、科创股、ST、停牌、涨跌停等); 3. 基于基本面数据(EPS、市值)排序后返回候选股票列表。 参数: context: 聚宽平台传入的交易上下文对象 返回: 筛选后的候选股票代码列表 """ # 从指定指数中获取初步股票列表 initial_list: List[str] = DataHelper.get_index_stocks(index_stocks) # 依次应用过滤器,筛去不符合条件的股票 initial_list = DataHelper.filter_new_stock(context, initial_list) # 过滤次新股 initial_list = DataHelper.filter_kcbj_stock(initial_list) # 过滤科创/北交股票 initial_list = DataHelper.filter_st_stock(initial_list) # 过滤ST或风险股票 initial_list = DataHelper.filter_paused_stock(initial_list) # 过滤停牌股票 initial_list = DataHelper.filter_limitup_stock(context, initial_list) # 过滤当日涨停(未持仓时)的股票 initial_list = DataHelper.filter_limitdown_stock(context, initial_list) # 过滤当日跌停(未持仓时)的股票 # 利用基本面查询获取股票代码和EPS数据,并按照市值升序排序 q = query(valuation.code, indicator.eps) \ .filter(valuation.code.in_(initial_list)) \ .order_by(valuation.market_cap.asc()) df = get_fundamentals(q) stock_list: List[str] = list(df.code) stock_list = stock_list[:50] # 限制数据规模,防止一次处理数据过大 # 取前2倍目标持仓股票数作为候选池 final_list: List[str] = stock_list[:2 * int(stock_num)] # 查询并输出候选股票的财务信息(如财报日期、营业收入、EPS) if final_list: info_query = query( valuation.code, income.pubDate, income.statDate, income.operating_revenue, indicator.eps ).filter(valuation.code.in_(final_list)) df_info = get_fundamentals(info_query) # for _, row in df_info.iterrows(): # log.info( # f"股票 {row['code']}:报告日期 {row.get('pubDate', 'N/A')},统计日期 {row.get('statDate', 'N/A')},营业收入 {row.get('operating_revenue', 'N/A')},EPS {row.get('eps', 'N/A')}") return final_list @staticmethod def filter_paused_stock(stock_list: List[str]) -> List[str]: """ 过滤停牌的股票 参数: stock_list: 待过滤的股票代码列表 返回: 未停牌的股票代码列表 """ current_data = DataHelper.get_current_data() return [stock for stock in stock_list if not current_data[stock].paused] @staticmethod def filter_st_stock(stock_list: List[str]) -> List[str]: """ 过滤带有 ST 或其他风险标识的股票 参数: stock_list: 待过滤的股票代码列表 返回: 无 ST 或风险标识的股票代码列表 """ current_data = DataHelper.get_current_data() return [stock for stock in stock_list if (not current_data[stock].is_st) and ('ST' not in current_data[stock].name) and ('*' not in current_data[stock].name) and ('退' not in current_data[stock].name)] @staticmethod def filter_kcbj_stock(stock_list: List[str]) -> List[str]: """ 过滤科创、北交股票 参数: stock_list: 待过滤的股票代码列表 返回: 过滤后的股票代码列表(排除以 '4'、'8' 开头以及以 '68' 起始的股票) """ return [stock for stock in stock_list if stock[0] not in ('4', '8') and not stock.startswith('68')] @staticmethod def filter_limitup_stock(context: Any, stock_list: List[str]) -> List[str]: """ 过滤当天已经涨停的股票(若未持仓则过滤) 参数: context: 交易上下文对象 stock_list: 待过滤的股票代码列表 返回: 过滤后的股票代码列表 """ history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1) current_data = DataHelper.get_current_data() if history_data is None: return stock_list return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or (history_data.get(stock, [0])[-1] < current_data[stock].high_limit)] @staticmethod def filter_limitdown_stock(context: Any, stock_list: List[str]) -> List[str]: """ 过滤当天已经跌停的股票(若未持仓则过滤) 参数: context: 交易上下文对象 stock_list: 待过滤的股票代码列表 返回: 过滤后的股票代码列表 """ history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1) current_data = DataHelper.get_current_data() if history_data is None: return stock_list return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or (history_data.get(stock, [float('inf')])[-1] > current_data[stock].low_limit)] @staticmethod def filter_new_stock(context: Any, stock_list: List[str]) -> List[str]: """ 过滤次新股:排除上市时间不足375天的股票 参数: context: 交易上下文对象 stock_list: 待过滤的股票代码列表 返回: 过滤后的股票代码列表 """ yesterday = context.previous_date return [stock for stock in stock_list if not (yesterday - DataHelper.get_security_info(stock).start_date < timedelta(days=375))] @staticmethod def filter_highprice_stock(self, context: Any, stock_list: List[str]) -> List[str]: """ 过滤股价高于设定上限(up_price)的股票(非持仓股票参与过滤) 参数: context: 交易上下文对象 stock_list: 待过滤的股票代码列表 返回: 过滤后的股票代码列表 """ history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1) if history_data is None: return stock_list return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or history_data.get(stock, [self.up_price + 1])[-1] <= self.up_price] @staticmethod def filter_not_buy_again(stock_list: List[str], not_buy_again) -> List[str]: """ 过滤掉当日已买入的股票,避免重复下单 参数: stock_list: 待过滤的股票代码列表 返回: 未买入的股票代码列表 """ return [stock for stock in stock_list if stock not in not_buy_again] class Logger: """ 简单的日志操作类,支持控制台和文件输出 可以使用现有的logger对象或创建新的实例 """ _instance = None def __new__(cls, existing_logger=None, *args, **kwargs): if existing_logger is not None: # 如果提供了现有的logger对象,创建新实例并使用该对象 instance = super().__new__(cls) instance.logger = existing_logger return instance # 否则使用单例模式 if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __init__(self, existing_logger=None, log_file="logs/app.log"): # 如果使用现有logger,直接返回 if existing_logger is not None: return # 如果已经初始化过,直接返回 if hasattr(self, 'logger'): return # 创建logs目录(如果不存在) os.makedirs(os.path.dirname(log_file), exist_ok=True) # 初始化logger self.logger = logging.getLogger('AppLogger') self.logger.setLevel(logging.DEBUG) # 清除可能存在的处理器 self.logger.handlers.clear() # 设置日志格式 formatter = logging.Formatter( '%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # 添加控制台处理器 console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) self.logger.addHandler(console_handler) # 添加文件处理器 file_handler = logging.FileHandler(log_file, encoding='utf-8') file_handler.setFormatter(formatter) self.logger.addHandler(file_handler) def debug(self, message, strategy_id=None): """记录调试级别的日志""" self.logger.debug(message) try: network_config.insert_logs(strategy_id, "debug", message) except: pass def info(self, message, strategy_id=None): """记录信息级别的日志""" self.logger.info(message) try: network_config.insert_logs(strategy_id, "info ", message) except: pass def warning(self, message, strategy_id=None): """记录警告级别的日志""" self.logger.warning(message) try: network_config.insert_logs(strategy_id, "warn", message) except: pass def error(self, message, strategy_id=None): """记录错误级别的日志""" self.logger.error(message) try: network_config.insert_logs(strategy_id, "error", message) except: pass def critical(self, message, strategy_id=None): """记录严重错误级别的日志""" self.logger.critical(message) try: network_config.insert_logs(strategy_id, "critical", message) except: pass class exchange: def close_positions(self, context: Any, position: Any): """ 关闭所有持仓 """ raise NotImplementedError("This method should be implemented by subclasses") def buy_security(self, context: Any, target_list: List[str]) -> None: """ 买入操作:对目标股票执行买入,下单资金均摊分配 """ raise NotImplementedError("This method should be implemented by subclasses") def open_position(self, security: str, value: float) -> bool: """ 开仓操作:尝试买入指定股票 参数: security: 股票代码 value: 分配给该股票的资金 返回: 若下单成功(部分或全部成交)返回 True,否则返回 False """ raise NotImplementedError("This method should be implemented by subclasses") class jq_channel(exchange): def __init__(self): pass def order_target_value_(self, stock: str, value: float) -> Any: """ 封装 order_target_value 函数进行下单,同时记录中文日志和异常信息 参数: security: 股票代码 value: 下单目标金额 返回: 下单后生成的订单对象;若失败返回 None """ try: order = order_target_value(stock, value) return order except Exception as e: return None def close_positions(self, context: Any, position: Any): """ 平仓操作:尽可能将指定股票仓位全部卖出 参数: position: 持仓对象 返回: 若下单后订单全部成交返回 True,否则返回 False """ security = position.security order = self.order_target_value_(security, 0) if order is None: # log.error(f"股票 {security} 下单失败,可能是API调用错误") return False if order.status == OrderStatus.held and order.filled == order.amount: # log.info(f"股票 {security} 卖出订单完全成交") return True else: # log.warning(f"股票 {security} 卖出订单部分成交或未成交,状态: {order.status}, 已成交: {order.filled}/{order.amount}") return False def buy_security(self, context: Any, target_list: List[str]) -> None: """ 买入操作:对目标股票执行买入,下单资金均摊分配 参数: context: 聚宽平台传入的交易上下文对象 target_list: 目标股票代码列表 """ position_count = len(context.portfolio.positions) target_num = len(target_list) if target_num > position_count: try: value = context.portfolio.cash / (target_num - position_count) except ZeroDivisionError as e: # log.error(f"资金分摊时除零错误: {e}") return for stock in target_list: if context.portfolio.positions[stock].total_amount == 0: if self.open_position(stock, value): # log.info(f"已买入股票 {stock},分配资金 {value:.2f}") # self.not_buy_again.append(stock) if len(context.portfolio.positions) == target_num: break def open_position(self, security: str, value: float) -> bool: """ 开仓操作:尝试买入指定股票 参数: security: 股票代码 value: 分配给该股票的资金 返回: 若下单成功(部分或全部成交)返回 True,否则返回 False """ order = self.order_target_value_(security, value) if order is not None and order.filled > 0: return True return False class trade_strategy: def __init__(self, strategy_name, strategy_template_id, config_save, log, channel: exchange): # -------必要参数------- self.log = Logger(log) # 日志记录器 self.strategy_config = strategy_config(strategy_name, strategy_template_id, config_save, self.log) self.state = state() # 策略状态管理器,跟踪策略运行状态 self.channel = channel # 添加持仓监控和管理器 def init_strategy(self) -> None: """ 策略初始化函数 配置交易环境参数,包括防未来数据、基准、滑点、订单成本以及日志输出等级。 参数: context: 聚宽平台传入的交易上下文对象 """ # 启用防未来数据以确保历史回测的严谨性 self.log.info("初始化策略TradingStrategy配置...") set_option('avoid_future_data', True) # 设置策略基准为上证指数 # set_benchmark(self.strategy_config.config["benchmark_stock"]) set_benchmark("000001.XSHG") # 使用真实市场价格,模拟实际交易 set_option('use_real_price', True) # 设置固定滑点,确保价格偏差不大 set_slippage(FixedSlippage(3 / 10000)) # 设置订单成本,包括印花税和佣金 set_order_cost(OrderCost( open_tax=0, close_tax=0.001, # 卖出时0.1%的印花税 open_commission=2.5 / 10000, close_commission=2.5 / 10000, close_today_commission=0, min_commission=5 # 最低佣金为5元 ), type='stock') # 设置日志输出等级(中文日志输出) self.log.logger.set_level('order', 'error') self.log.logger.set_level('system', 'error') self.log.logger.set_level('strategy', 'debug') def set_target_list(self, target_list): self.target_list = target_list.copy() def get_target_list(self): """获取本次调仓候选股票列表""" return self.target_list def get_configs(self) -> dict: """ 获取策略配置 :return: strategy_config 对象 """ return self.strategy_config.get_all_configs() def get_strategy_config(self) -> strategy_config: """ 获取策略配置字典 :return: 包含所有配置参数的字典 """ return self.strategy_config def close_position(self, context, stock: str): """ 平仓操作:尽可能将指定股票仓位全部卖出 参数: position: 持仓对象 返回: 若下单后订单全部成交返回 True,否则返回 False """ self.channel.close_positions(context, stock) def open_position(self, security: str, value: float) -> bool: """ 开仓操作:尝试买入指定股票 参数: security: 股票代码 value: 分配给该股票的资金 返回: 若下单成功(部分或全部成交)返回 True,否则返回 False """ return self.channel.open_position(security, value) class base_task: def __init__(self, strategy: trade_strategy, sub_task=None): self.strategy = strategy self.name = "base_task" self.remark = "base_task" self.memo = "base_task" self.sub_task = sub_task self.strategy_config = self.strategy.get_strategy_config() self.configs = self.strategy.get_configs() self.log = self.strategy.log def init(self, context: Any): pass def config(self, context: Any): pass def run(self, context: Any): pass def handle(self, context: Any): pass def end(self, context: Any): pass def process(self, context: Any): self.init(context) self.config(context) self.run(context) self.handle(context) self.end(context) def get_config(self, key: str): configs = self.configs.get(key) data_type = configs['data_type'] data_params = configs['data_param'] func = conf.config_type(data_type) return func(self.configs[key]['new_value'], data_params) class check_state_before_task(base_task): """ 重置state信息 最前置信息 """ def __init__(self, strategy: trade_strategy, sub_task=None): super().__init__(strategy, sub_task) def init(self, context): self.log.info("--------------------------") self.log.info("重置运行中环境数据") def config(self, context): # 重置state信息 self.strategy.state.reset() self.hold_stock = [position.security for position in list(context.portfolio.positions.values())] self.yesterday_high_list = [] self.yesterday_low_list = [] def run(self, context): positions = context.portfolio.positions for stock in positions: self.strategy.state.set_hold_stock(stock.security, stock.__dict__) if self.hold_stock: # 获取持仓股票昨日数据(包括收盘价、涨停价、跌停价) df = DataHelper.get_price_safe( self.hold_stock, end_date=context.previous_date, frequency='daily', fields=['close', 'high_limit', 'low_limit'], count=1, panel=False, fill_paused=False ) if df is not None and not df.empty: # 过滤出收盘价等于涨停价的股票,作为昨日涨停股票 self.yesterday_high_list = list(df[df['close'] == df['high_limit']]['code']) self.yesterday_low_list = list(df[df['close'] == df['low_limit']]['code']) def handle(self, context): for stock in self.yesterday_high_list: self.strategy.state.set_high_stock(stock['code'], stock) for stock in self.yesterday_low_list: self.strategy.state.set_low_stock(stock['code'], stock) class check_market_env_task(base_task): """ 交易环境确认 1、判断是否为特殊月份(1月/4月) 2、如果是特殊月份且通过了清仓检查,则清仓所有持仓 3、如果不是特殊月份且没有交易信号,则重置交易信号 """ def __init__(self, strategy: trade_strategy): super().__init__(strategy) def config(self, context: Any): self.current_date = context.current_dt.date() self.current_month = self.current_date.month self.current_day = self.current_date.day # 判断是否执行空仓策略 self.filter_month_clear_switch = self.get_config('filter_month_clear_switch') self.filer_month_data = self.get_config('filter_month_data') # 判断是否为特殊月 = True时 等于 执行1月/4月 全部清仓 self.in_special_month = str(self.current_month) in self.filer_month_data self.today_trade_switch = self.get_config('today_trade_switch') # 是否为不交易信号 def run(self, context: Any): # if self.filter_month_clear_switch and self.in_special_month: # 进入特殊月份,应清仓 for stock in self.strategy.state.data['position_list'].keys(): self.strategy.state.set_sell_request(stock, self.strategy.state.data['position_list'][stock], f"enter_{self.current_month}_month") # 清仓后就不交易 self.today_trade_switch = False elif not self.filter_month_clear_switch: # 不在特殊月 或 不执行空仓策略 可以交易 self.today_trade_switch = True else: # 如果是特殊月份但没有清仓条件,则继续交易 self.today_trade_switch = True def handle(self, context: Any): # 保存状态 old_today_trade_switch = self.get_config("today_trade_switch") if old_today_trade_switch != self.today_trade_switch: self.strategy_config.set_config("today_trade_switch", self.today_trade_switch) # 临时变量 self.log.info(f"交易环境更新: today_trade_switch 状态更新为 {self.today_trade_switch}") def end(self, context: Any): self.log.info("-----------------") self.log.info("一.今日环境确认:") self.log.info(" 当前日期: {}".format(self.current_date)) self.log.info(" 空仓策略: {}".format("开启" if self.filter_month_clear_switch else "关闭")) self.log.info(" 今日交易: {}".format("交易" if self.today_trade_switch else "清仓")) self.log.info(" 特殊月份: {}".format(",".join(str(item) for item in self.filer_month_data))) self.log.info(" 当前月份: {}".format(self.current_month)) # 今日清仓 class check_positions_stop_loss_task(base_task): def __init__(self, strategy: trade_strategy): super().__init__(strategy) def config(self, context: Any): self.stoploss_switch = self.get_config('stoploss_switch') # 是否启用止损策略 self.hold_list = self.strategy.state.data['position_list'] # 持仓列表 self.today_trade_switch = self.get_config('today_trade_switch') # 今日交易开关 self.stoploss_strategy = self.get_config('stoploss_strategy') # 止损策略 self.stoploss_stock_rate_data = self.get_config('stoploss_stock_rate_data') # 个股止损阀值 self.stoploss_market_rate_data = self.get_config('stoploss_market_rate_data') # 市场止损阀值 # self.reason_to_sell = self.strategy.state.get_sell_reason() # 卖出原因 self.index_stocks = self.get_config('index_stocks') # 指数代码 self.temp_sells_list = {} def run(self, context: Any): if len(self.hold_list) == 0: self.log.debug("当前无持仓,跳过止损检查。") return if self.stoploss_switch and self.today_trade_switch: if self.stoploss_strategy == 1 or self.stoploss_strategy == 3: # 个股止盈或止损判断 for stock in list(context.portfolio.positions.keys()): pos = context.portfolio.positions[stock] if pos.price >= pos.avg_cost * 2: # 通过持仓监控器注册卖出请求,而不是直接卖出 # self.strategy.state.set_sell_request(stock, pos, 'take_profit') self.sell_request(stock, pos, 'take_profit') self.log.debug(f"股票 {stock} 实现100%盈利,执行止盈卖出。") elif pos.price < pos.avg_cost * self.stoploss_stock_rate_data: # 通过持仓监控器注册卖出请求,而不是直接卖出 self.strategy.state.set_sell_request(stock, pos, 'stop_loss') self.sell_request(stock, pos, 'take_profit') self.log.debug(f"股票 {stock} 触及止损阈值,执行卖出。") # self.strategy.state.set_sell_reason("stoploss") if self.stoploss_strategy == 2 or self.stoploss_strategy == 3: # 大盘止损判断,若整体市场跌幅过大则平仓所有股票 stock_list = DataHelper.get_index_stocks(self.index_stocks) df = DataHelper.get_price_safe( stock_list, end_date=context.previous_date, frequency='daily', fields=['close', 'open'], count=1, panel=False ) if df is not None and not df.empty: down_ratio = (df['close'] / df['open']).mean() if down_ratio <= self.stoploss_market_rate_data: # self.strategy.state.set_sell_reason("stoploss") self.log.debug(f"市场检测到跌幅(平均跌幅 {down_ratio:.2%}),卖出所有持仓。") for stock in list(context.portfolio.positions.keys()): pos = context.portfolio.positions[stock] # 通过持仓监控器注册卖出请求,而不是直接卖出 self.sell_request(stock, pos, 'market_stop_loss') def handle(self, context: Any): # 保存状态 for key, value in self.temp_sells_list.items(): stock, reason = key.split('_') self.strategy.state.set_sell_request(stock, value, reason) pass def end(self, context: Any): self.log.info("-----------------") self.log.info("止损策略开关:{}".format("启用" if self.stoploss_switch else "不启用")) self.log.info("今日交易开关:{}".format("交易" if self.today_trade_switch else "不交易")) self.log.info("止损策略方式:{}".format("个股止损" if self.stoploss_strategy == 1 else "大盘止损" if self.stoploss_strategy == 2 else "联合止损" if self.stoploss_strategy == 3 else "异常")) self.log.info("当前卖出请求") for key, value in self.temp_sells_list.items(): self.log.info(f" {key}: {value}") # 总卖出请求 sells_list = self.strategy.state.get_sell_requests() if len(sells_list) > 0: self.log.info("总卖出请求:") for stock, data in sells_list.items(): self.log.info(f" {stock}: {data}") pass def sell_request(self, stock, data, reason): self.temp_sells_list[f"{stock}_{reason}"] = data class buy_stocks_func_task(base_task): """ 每周调仓策略: 如果非空仓日,先选股得到目标股票列表,再卖出当前持仓中不在目标列表且昨日未涨停的股票, 最后买入目标股票,同时记录当天买入情况避免重复下单。 参数: context: 聚宽平台传入的交易上下文对象 """ def __init__(self, strategy: trade_strategy): super().__init__(strategy) def config(self, context: Any): # 每次调仓的目标股票数量 self.stock_num = self.get_config('stock_num') # 获取当前持仓列表 self.hold_list = self.strategy.state.get_hold_list() self.index_stocks = self.get_config('index_stocks') self.target_list = DataHelper.get_stock_list(context, self.index_stocks, self.stock_num) self.temp_target_list = self.target_list[:self.stock_num] # 昨日涨停股票列表 self.yesterday_HL_list: List[str] = self.strategy.state.get_yesterday_high_list() self.today_trade_switch = self.get_config('today_trade_switch') self.temp_sells_list = {} self.temp_buys_list = {} def run(self, context: Any): if not self.today_trade_switch: self.log.info("今日非交易日,跳过调仓操作。") return # 重置当天已买入记录 # self.position_manager.reset_not_buy_again() # 取目标持仓数以内的股票作为调仓目标 # self.log.info(f"每周调仓目标股票: {self.temp_target_list}") # 遍历当前持仓,若股票不在目标列表,则执行卖出操作 for stock in self.hold_list: if stock not in self.temp_target_list: pos = context.portfolio.positions.get(stock) self.log.info(f"调仓决策:卖出股票 {stock}") # 通过持仓监控器注册卖出请求,而不是直接卖出 self.sell_request(stock, pos, 'rebalance') # self.strategy.state.set_sell_request(stock, pos, 'rebalance') # self.sell_request(stock, pos, 'rebalance') else: self.log.info(f"调仓决策:继续持有股票 {stock}") # 对目标股票执行买入操作前,先将其注册到待买入队列 buy_targets = [stock for stock in self.temp_target_list if stock not in self.hold_list] if buy_targets: for stock in buy_targets: pos = context.portfolio.positions.get(stock) # self.strategy.state.set_buy_request(stock, pos, 'buy') self.buys_request(stock, pos, 'buy') self.log.info(f"调仓决策:将 {len(buy_targets)} 只股票加入待买入队列: {buy_targets}") def handle(self, context: Any): self.strategy.set_target_list(self.temp_target_list) pass def end(self, context: Any): pass # 这里可以添加任何必要的清理或总结操作 def sell_request(self, stock, data, reason): self.temp_sells_list[f"{stock}_{reason}"] = data def buys_request(self, stock, data, reason): self.temp_buys_list[f"{stock}_{reason}"] = data strategy = trade_strategy("测试策略", 10000000, network_config(), log, jq_channel()) def check_state_before_func(context: Any): task = check_state_before_task(strategy) task.process(context) def check_market_env_func(context: Any): task = check_market_env_task(strategy) task.process(context) def check_positions_stop_loss_func(context: Any): task = check_positions_stop_loss_task(strategy) task.process(context) def buy_stocks_func(context: Any): task = buy_stocks_func_task(strategy) task.process(context) def initialize(context: Any) -> None: # # 初始化策略参数 strategy.init_strategy() # 策略初始化函数 # # 注册调度任务 run_daily(check_state_before_func, time='9:01') # 开盘前先检查持仓状态 run_daily(check_market_env_func, time='9:05') run_daily(check_positions_stop_loss_func, time='9:10') run_daily(buy_stocks_func, time='9:15') # 每周调仓 # # # run_daily(process_pending_sells_func, time='10:15') # 处理待卖出股票 # run_weekly(buy_stocks_func, 2, time='10:30') # 周二进行调仓 # run_daily(process_pending_buys_func, time='10:20') # 处理待买入股票 # run_daily(process_pending_sells_func, time='13:05') # 午盘开盘后也处理一次卖出 # run_daily(process_pending_buys_func, time='13:15') # 午盘开盘后也处理一次买入 # run_daily(trade_afternoon_func, time='14:30') # run_daily(process_pending_sells_func, time='14:40') # 收盘前再处理一次卖出 # run_daily(process_pending_buys_func, time='14:45') # 收盘前再处理一次买入 # run_daily(close_account_func, time='14:50') # run_weekly(print_position_info_func, 5, time='15:10') # 周五收盘后打印持仓信息 pass # initialize(None) # 调用初始化函数,开始策略执行