from datetime import timedelta from typing import Any, Optional, List, Dict import pandas as pd class DataHelper: """ 数据操作辅助类 封装了数据接口的调用,包括 get_price 与 history 函数, 并在内部捕获异常、输出中文错误日志,避免重复编写 try/except 代码。 """ @staticmethod def get_price_safe( security: Any, end_date: Any, frequency: str, fields: List[str], count: int, panel: bool = False, skip_paused: bool = True, fq: Optional[str] = None, fill_paused: bool = False ) -> Optional[pd.DataFrame]: """ 安全调用 get_price 数据接口 参数: security: 单只股票代码或股票代码列表 end_date: 数据截止日期 frequency: 数据频率,如 "daily" 或 "1m" fields: 需要获取的数据字段列表(例如 ['open', 'close']) count: 请求数据的记录数 panel: 是否返回面板数据(默认为 False) skip_paused: 是否跳过停牌股票(默认为 True) fq: 复权方式(例如"pre"或"post",默认 None) fill_paused: 是否填充停牌数据(默认为 False) 返回: 返回包含数据的 DataFrame,如果出错则返回 None """ try: # 调用聚宽提供的 get_price 获取数据 df = get_price( security, end_date=end_date, frequency=frequency, fields=fields, count=count, panel=panel, skip_paused=skip_paused, fq=fq, fill_paused=fill_paused ) return df except Exception as e: # 输出中文错误日志,并返回 None # logger.error(f"获取 {security} 的价格数据时出错: {e}") return None @staticmethod def get_current_data(): """ 获取当前市场数据 返回: 当前市场数据的字典,包含股票代码到数据对象的映射 """ try: # 调用聚宽提供的 get_current_data 获取当前数据 return get_current_data() except Exception as e: # Logger.error(f"获取当前市场数据时出错: {e}") return {} @staticmethod def get_history_safe( security: Any, unit: str, field: str, count: int, ) -> Optional[Dict[str, List[float]]]: """ 安全调用 history 数据接口,批量获取历史数据 参数: security: 单只或多只股票代码 unit: 数据单位,例如 "1m" 表示1分钟数据 field: 请求数据字段名称,如 "close"(收盘价) count: 请求历史数据记录数 返回: 返回一个字典,映射股票代码到对应的数据列表;出错则返回 None """ try: # 调用聚宽的 history 函数获取数据 data = history(count, unit=unit, field=field, security_list=security) return data except Exception as e: return None @staticmethod def get_index_stocks(index_code: str) -> List[str]: """ 获取指定指数的成分股列表 参数: index_code: 指数代码,例如 "000300.XSHG" 表示沪深300指数 date: 可选参数,指定查询日期,默认为 None,表示查询最新成分股 返回: 成分股代码列表 """ try: # 调用聚宽的 get_index_stocks 函数获取成分股 return get_index_stocks(index_code) except Exception as e: return [] @staticmethod def get_security_info(security: str) -> Optional[Any]: """ 获取指定股票的安全信息 参数: security: 股票代码,例如 "000001.XSHE" 返回: 股票的安全信息对象,如果出错则返回 None """ try: # 调用聚宽的 get_security_info 函数获取股票信息 return get_security_info(security) except Exception as e: return None @staticmethod def get_stock_list(context: Any, index_stocks: str, stock_num: int) -> List[str]: """ 选股模块: 1. 从指定股票池(如 399101.XSHE 指数成分股)中获取初步股票列表; 2. 应用多个过滤器筛选股票(次新股、科创股、ST、停牌、涨跌停等); 3. 基于基本面数据(EPS、市值)排序后返回候选股票列表。 参数: context: 聚宽平台传入的交易上下文对象 返回: 筛选后的候选股票代码列表 """ # 从指定指数中获取初步股票列表 initial_list: List[str] = DataHelper.get_index_stocks(index_stocks) # 依次应用过滤器,筛去不符合条件的股票 initial_list = DataHelper.filter_new_stock(context, initial_list) # 过滤次新股 initial_list = DataHelper.filter_kcbj_stock(initial_list) # 过滤科创/北交股票 initial_list = DataHelper.filter_st_stock(initial_list) # 过滤ST或风险股票 initial_list = DataHelper.filter_paused_stock(initial_list) # 过滤停牌股票 initial_list = DataHelper.filter_limitup_stock(context, initial_list) # 过滤当日涨停(未持仓时)的股票 initial_list = DataHelper.filter_limitdown_stock(context, initial_list) # 过滤当日跌停(未持仓时)的股票 # 利用基本面查询获取股票代码和EPS数据,并按照市值升序排序 q = query(valuation.code, indicator.eps) \ .filter(valuation.code.in_(initial_list)) \ .order_by(valuation.market_cap.asc()) df = get_fundamentals(q) stock_list: List[str] = list(df.code) stock_list = stock_list[:50] # 限制数据规模,防止一次处理数据过大 # 取前2倍目标持仓股票数作为候选池 final_list: List[str] = stock_list[:2 * int(stock_num)] # 查询并输出候选股票的财务信息(如财报日期、营业收入、EPS) if final_list: info_query = query( valuation.code, income.pubDate, income.statDate, income.operating_revenue, indicator.eps ).filter(valuation.code.in_(final_list)) df_info = get_fundamentals(info_query) # for _, row in df_info.iterrows(): # log.info( # f"股票 {row['code']}:报告日期 {row.get('pubDate', 'N/A')},统计日期 {row.get('statDate', 'N/A')},营业收入 {row.get('operating_revenue', 'N/A')},EPS {row.get('eps', 'N/A')}") return final_list @staticmethod def filter_paused_stock(stock_list: List[str]) -> List[str]: """ 过滤停牌的股票 参数: stock_list: 待过滤的股票代码列表 返回: 未停牌的股票代码列表 """ current_data = DataHelper.get_current_data() return [stock for stock in stock_list if not current_data[stock].paused] @staticmethod def filter_st_stock(stock_list: List[str]) -> List[str]: """ 过滤带有 ST 或其他风险标识的股票 参数: stock_list: 待过滤的股票代码列表 返回: 无 ST 或风险标识的股票代码列表 """ current_data = DataHelper.get_current_data() return [stock for stock in stock_list if (not current_data[stock].is_st) and ('ST' not in current_data[stock].name) and ('*' not in current_data[stock].name) and ('退' not in current_data[stock].name)] @staticmethod def filter_kcbj_stock(stock_list: List[str]) -> List[str]: """ 过滤科创、北交股票 参数: stock_list: 待过滤的股票代码列表 返回: 过滤后的股票代码列表(排除以 '4'、'8' 开头以及以 '68' 起始的股票) """ return [stock for stock in stock_list if stock[0] not in ('4', '8') and not stock.startswith('68')] @staticmethod def filter_limitup_stock(context: Any, stock_list: List[str]) -> List[str]: """ 过滤当天已经涨停的股票(若未持仓则过滤) 参数: context: 交易上下文对象 stock_list: 待过滤的股票代码列表 返回: 过滤后的股票代码列表 """ history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1) current_data = DataHelper.get_current_data() if history_data is None: return stock_list return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or (history_data.get(stock, [0])[-1] < current_data[stock].high_limit)] @staticmethod def filter_limitdown_stock(context: Any, stock_list: List[str]) -> List[str]: """ 过滤当天已经跌停的股票(若未持仓则过滤) 参数: context: 交易上下文对象 stock_list: 待过滤的股票代码列表 返回: 过滤后的股票代码列表 """ history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1) current_data = DataHelper.get_current_data() if history_data is None: return stock_list return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or (history_data.get(stock, [float('inf')])[-1] > current_data[stock].low_limit)] @staticmethod def filter_new_stock(context: Any, stock_list: List[str]) -> List[str]: """ 过滤次新股:排除上市时间不足375天的股票 参数: context: 交易上下文对象 stock_list: 待过滤的股票代码列表 返回: 过滤后的股票代码列表 """ yesterday = context.previous_date return [stock for stock in stock_list if not (yesterday - DataHelper.get_security_info(stock).start_date < timedelta(days=375))] @staticmethod def filter_highprice_stock(self, context: Any, stock_list: List[str]) -> List[str]: """ 过滤股价高于设定上限(up_price)的股票(非持仓股票参与过滤) 参数: context: 交易上下文对象 stock_list: 待过滤的股票代码列表 返回: 过滤后的股票代码列表 """ history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1) if history_data is None: return stock_list return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or history_data.get(stock, [self.up_price + 1])[-1] <= self.up_price] @staticmethod def filter_not_buy_again(stock_list: List[str], not_buy_again) -> List[str]: """ 过滤掉当日已买入的股票,避免重复下单 参数: stock_list: 待过滤的股票代码列表 返回: 未买入的股票代码列表 """ return [stock for stock in stock_list if stock not in not_buy_again]