easyquant/kit/kit.py

294 lines
12 KiB
Python
Raw Permalink Normal View History

2025-07-03 23:39:31 +08:00
from datetime import timedelta
from typing import Any, Optional, List, Dict
import pandas as pd
class DataHelper:
"""
数据操作辅助类
封装了数据接口的调用包括 get_price history 函数
并在内部捕获异常输出中文错误日志避免重复编写 try/except 代码
"""
@staticmethod
def get_price_safe(
security: Any,
end_date: Any,
frequency: str,
fields: List[str],
count: int,
panel: bool = False,
skip_paused: bool = True,
fq: Optional[str] = None,
fill_paused: bool = False
) -> Optional[pd.DataFrame]:
"""
安全调用 get_price 数据接口
参数:
security: 单只股票代码或股票代码列表
end_date: 数据截止日期
frequency: 数据频率 "daily" "1m"
fields: 需要获取的数据字段列表例如 ['open', 'close']
count: 请求数据的记录数
panel: 是否返回面板数据默认为 False
skip_paused: 是否跳过停牌股票默认为 True
fq: 复权方式例如"pre""post"默认 None
fill_paused: 是否填充停牌数据默认为 False
返回:
返回包含数据的 DataFrame如果出错则返回 None
"""
try:
# 调用聚宽提供的 get_price 获取数据
df = get_price(
security,
end_date=end_date,
frequency=frequency,
fields=fields,
count=count,
panel=panel,
skip_paused=skip_paused,
fq=fq,
fill_paused=fill_paused
)
return df
except Exception as e:
# 输出中文错误日志,并返回 None
# logger.error(f"获取 {security} 的价格数据时出错: {e}")
return None
@staticmethod
def get_current_data():
"""
获取当前市场数据
返回:
当前市场数据的字典包含股票代码到数据对象的映射
"""
try:
# 调用聚宽提供的 get_current_data 获取当前数据
return get_current_data()
except Exception as e:
# Logger.error(f"获取当前市场数据时出错: {e}")
return {}
@staticmethod
def get_history_safe(
security: Any,
unit: str,
field: str,
count: int,
) -> Optional[Dict[str, List[float]]]:
"""
安全调用 history 数据接口批量获取历史数据
参数:
security: 单只或多只股票代码
unit: 数据单位例如 "1m" 表示1分钟数据
field: 请求数据字段名称 "close"收盘价
count: 请求历史数据记录数
返回:
返回一个字典映射股票代码到对应的数据列表出错则返回 None
"""
try:
# 调用聚宽的 history 函数获取数据
data = history(count, unit=unit, field=field, security_list=security)
return data
except Exception as e:
return None
@staticmethod
def get_index_stocks(index_code: str) -> List[str]:
"""
获取指定指数的成分股列表
参数:
index_code: 指数代码例如 "000300.XSHG" 表示沪深300指数
date: 可选参数指定查询日期默认为 None表示查询最新成分股
返回:
成分股代码列表
"""
try:
# 调用聚宽的 get_index_stocks 函数获取成分股
return get_index_stocks(index_code)
except Exception as e:
return []
@staticmethod
def get_security_info(security: str) -> Optional[Any]:
"""
获取指定股票的安全信息
参数:
security: 股票代码例如 "000001.XSHE"
返回:
股票的安全信息对象如果出错则返回 None
"""
try:
# 调用聚宽的 get_security_info 函数获取股票信息
return get_security_info(security)
except Exception as e:
return None
@staticmethod
def get_stock_list(context: Any, index_stocks: str, stock_num: int) -> List[str]:
"""
选股模块
1. 从指定股票池 399101.XSHE 指数成分股中获取初步股票列表
2. 应用多个过滤器筛选股票次新股科创股ST停牌涨跌停等
3. 基于基本面数据EPS市值排序后返回候选股票列表
参数:
context: 聚宽平台传入的交易上下文对象
返回:
筛选后的候选股票代码列表
"""
# 从指定指数中获取初步股票列表
initial_list: List[str] = DataHelper.get_index_stocks(index_stocks)
# 依次应用过滤器,筛去不符合条件的股票
initial_list = DataHelper.filter_new_stock(context, initial_list) # 过滤次新股
initial_list = DataHelper.filter_kcbj_stock(initial_list) # 过滤科创/北交股票
initial_list = DataHelper.filter_st_stock(initial_list) # 过滤ST或风险股票
initial_list = DataHelper.filter_paused_stock(initial_list) # 过滤停牌股票
initial_list = DataHelper.filter_limitup_stock(context, initial_list) # 过滤当日涨停(未持仓时)的股票
initial_list = DataHelper.filter_limitdown_stock(context, initial_list) # 过滤当日跌停(未持仓时)的股票
# 利用基本面查询获取股票代码和EPS数据并按照市值升序排序
q = query(valuation.code, indicator.eps) \
.filter(valuation.code.in_(initial_list)) \
.order_by(valuation.market_cap.asc())
df = get_fundamentals(q)
stock_list: List[str] = list(df.code)
stock_list = stock_list[:50] # 限制数据规模,防止一次处理数据过大
# 取前2倍目标持仓股票数作为候选池
final_list: List[str] = stock_list[:2 * int(stock_num)]
# 查询并输出候选股票的财务信息如财报日期、营业收入、EPS
if final_list:
info_query = query(
valuation.code,
income.pubDate,
income.statDate,
income.operating_revenue,
indicator.eps
).filter(valuation.code.in_(final_list))
df_info = get_fundamentals(info_query)
# for _, row in df_info.iterrows():
# log.info(
# f"股票 {row['code']}:报告日期 {row.get('pubDate', 'N/A')},统计日期 {row.get('statDate', 'N/A')},营业收入 {row.get('operating_revenue', 'N/A')}EPS {row.get('eps', 'N/A')}")
return final_list
@staticmethod
def filter_paused_stock(stock_list: List[str]) -> List[str]:
"""
过滤停牌的股票
参数:
stock_list: 待过滤的股票代码列表
返回:
未停牌的股票代码列表
"""
current_data = DataHelper.get_current_data()
return [stock for stock in stock_list if not current_data[stock].paused]
@staticmethod
def filter_st_stock(stock_list: List[str]) -> List[str]:
"""
过滤带有 ST 或其他风险标识的股票
参数:
stock_list: 待过滤的股票代码列表
返回:
ST 或风险标识的股票代码列表
"""
current_data = DataHelper.get_current_data()
return [stock for stock in stock_list if (not current_data[stock].is_st) and
('ST' not in current_data[stock].name) and
('*' not in current_data[stock].name) and
('退' not in current_data[stock].name)]
@staticmethod
def filter_kcbj_stock(stock_list: List[str]) -> List[str]:
"""
过滤科创北交股票
参数:
stock_list: 待过滤的股票代码列表
返回:
过滤后的股票代码列表排除以 '4''8' 开头以及以 '68' 起始的股票
"""
return [stock for stock in stock_list if stock[0] not in ('4', '8') and not stock.startswith('68')]
@staticmethod
def filter_limitup_stock(context: Any, stock_list: List[str]) -> List[str]:
"""
过滤当天已经涨停的股票若未持仓则过滤
参数:
context: 交易上下文对象
stock_list: 待过滤的股票代码列表
返回:
过滤后的股票代码列表
"""
history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1)
current_data = DataHelper.get_current_data()
if history_data is None:
return stock_list
return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or
(history_data.get(stock, [0])[-1] < current_data[stock].high_limit)]
@staticmethod
def filter_limitdown_stock(context: Any, stock_list: List[str]) -> List[str]:
"""
过滤当天已经跌停的股票若未持仓则过滤
参数:
context: 交易上下文对象
stock_list: 待过滤的股票代码列表
返回:
过滤后的股票代码列表
"""
history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1)
current_data = DataHelper.get_current_data()
if history_data is None:
return stock_list
return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or
(history_data.get(stock, [float('inf')])[-1] > current_data[stock].low_limit)]
@staticmethod
def filter_new_stock(context: Any, stock_list: List[str]) -> List[str]:
"""
过滤次新股排除上市时间不足375天的股票
参数:
context: 交易上下文对象
stock_list: 待过滤的股票代码列表
返回:
过滤后的股票代码列表
"""
yesterday = context.previous_date
return [stock for stock in stock_list if not (yesterday - DataHelper.get_security_info(stock).start_date < timedelta(days=375))]
@staticmethod
def filter_highprice_stock(self, context: Any, stock_list: List[str]) -> List[str]:
"""
过滤股价高于设定上限up_price的股票非持仓股票参与过滤
参数:
context: 交易上下文对象
stock_list: 待过滤的股票代码列表
返回:
过滤后的股票代码列表
"""
history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1)
if history_data is None:
return stock_list
return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or
history_data.get(stock, [self.up_price + 1])[-1] <= self.up_price]
@staticmethod
def filter_not_buy_again(stock_list: List[str], not_buy_again) -> List[str]:
"""
过滤掉当日已买入的股票避免重复下单
参数:
stock_list: 待过滤的股票代码列表
返回:
未买入的股票代码列表
"""
2025-07-05 17:31:46 +08:00
return [stock for stock in stock_list if stock not in not_buy_again]