easyquant/kit/kit.py
2025-07-05 17:31:46 +08:00

294 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import timedelta
from typing import Any, Optional, List, Dict
import pandas as pd
class DataHelper:
"""
数据操作辅助类
封装了数据接口的调用,包括 get_price 与 history 函数,
并在内部捕获异常、输出中文错误日志,避免重复编写 try/except 代码。
"""
@staticmethod
def get_price_safe(
security: Any,
end_date: Any,
frequency: str,
fields: List[str],
count: int,
panel: bool = False,
skip_paused: bool = True,
fq: Optional[str] = None,
fill_paused: bool = False
) -> Optional[pd.DataFrame]:
"""
安全调用 get_price 数据接口
参数:
security: 单只股票代码或股票代码列表
end_date: 数据截止日期
frequency: 数据频率,如 "daily""1m"
fields: 需要获取的数据字段列表(例如 ['open', 'close']
count: 请求数据的记录数
panel: 是否返回面板数据(默认为 False
skip_paused: 是否跳过停牌股票(默认为 True
fq: 复权方式(例如"pre""post",默认 None
fill_paused: 是否填充停牌数据(默认为 False
返回:
返回包含数据的 DataFrame如果出错则返回 None
"""
try:
# 调用聚宽提供的 get_price 获取数据
df = get_price(
security,
end_date=end_date,
frequency=frequency,
fields=fields,
count=count,
panel=panel,
skip_paused=skip_paused,
fq=fq,
fill_paused=fill_paused
)
return df
except Exception as e:
# 输出中文错误日志,并返回 None
# logger.error(f"获取 {security} 的价格数据时出错: {e}")
return None
@staticmethod
def get_current_data():
"""
获取当前市场数据
返回:
当前市场数据的字典,包含股票代码到数据对象的映射
"""
try:
# 调用聚宽提供的 get_current_data 获取当前数据
return get_current_data()
except Exception as e:
# Logger.error(f"获取当前市场数据时出错: {e}")
return {}
@staticmethod
def get_history_safe(
security: Any,
unit: str,
field: str,
count: int,
) -> Optional[Dict[str, List[float]]]:
"""
安全调用 history 数据接口,批量获取历史数据
参数:
security: 单只或多只股票代码
unit: 数据单位,例如 "1m" 表示1分钟数据
field: 请求数据字段名称,如 "close"(收盘价)
count: 请求历史数据记录数
返回:
返回一个字典,映射股票代码到对应的数据列表;出错则返回 None
"""
try:
# 调用聚宽的 history 函数获取数据
data = history(count, unit=unit, field=field, security_list=security)
return data
except Exception as e:
return None
@staticmethod
def get_index_stocks(index_code: str) -> List[str]:
"""
获取指定指数的成分股列表
参数:
index_code: 指数代码,例如 "000300.XSHG" 表示沪深300指数
date: 可选参数,指定查询日期,默认为 None表示查询最新成分股
返回:
成分股代码列表
"""
try:
# 调用聚宽的 get_index_stocks 函数获取成分股
return get_index_stocks(index_code)
except Exception as e:
return []
@staticmethod
def get_security_info(security: str) -> Optional[Any]:
"""
获取指定股票的安全信息
参数:
security: 股票代码,例如 "000001.XSHE"
返回:
股票的安全信息对象,如果出错则返回 None
"""
try:
# 调用聚宽的 get_security_info 函数获取股票信息
return get_security_info(security)
except Exception as e:
return None
@staticmethod
def get_stock_list(context: Any, index_stocks: str, stock_num: int) -> List[str]:
"""
选股模块:
1. 从指定股票池(如 399101.XSHE 指数成分股)中获取初步股票列表;
2. 应用多个过滤器筛选股票次新股、科创股、ST、停牌、涨跌停等
3. 基于基本面数据EPS、市值排序后返回候选股票列表。
参数:
context: 聚宽平台传入的交易上下文对象
返回:
筛选后的候选股票代码列表
"""
# 从指定指数中获取初步股票列表
initial_list: List[str] = DataHelper.get_index_stocks(index_stocks)
# 依次应用过滤器,筛去不符合条件的股票
initial_list = DataHelper.filter_new_stock(context, initial_list) # 过滤次新股
initial_list = DataHelper.filter_kcbj_stock(initial_list) # 过滤科创/北交股票
initial_list = DataHelper.filter_st_stock(initial_list) # 过滤ST或风险股票
initial_list = DataHelper.filter_paused_stock(initial_list) # 过滤停牌股票
initial_list = DataHelper.filter_limitup_stock(context, initial_list) # 过滤当日涨停(未持仓时)的股票
initial_list = DataHelper.filter_limitdown_stock(context, initial_list) # 过滤当日跌停(未持仓时)的股票
# 利用基本面查询获取股票代码和EPS数据并按照市值升序排序
q = query(valuation.code, indicator.eps) \
.filter(valuation.code.in_(initial_list)) \
.order_by(valuation.market_cap.asc())
df = get_fundamentals(q)
stock_list: List[str] = list(df.code)
stock_list = stock_list[:50] # 限制数据规模,防止一次处理数据过大
# 取前2倍目标持仓股票数作为候选池
final_list: List[str] = stock_list[:2 * int(stock_num)]
# 查询并输出候选股票的财务信息如财报日期、营业收入、EPS
if final_list:
info_query = query(
valuation.code,
income.pubDate,
income.statDate,
income.operating_revenue,
indicator.eps
).filter(valuation.code.in_(final_list))
df_info = get_fundamentals(info_query)
# for _, row in df_info.iterrows():
# log.info(
# f"股票 {row['code']}:报告日期 {row.get('pubDate', 'N/A')},统计日期 {row.get('statDate', 'N/A')},营业收入 {row.get('operating_revenue', 'N/A')}EPS {row.get('eps', 'N/A')}")
return final_list
@staticmethod
def filter_paused_stock(stock_list: List[str]) -> List[str]:
"""
过滤停牌的股票
参数:
stock_list: 待过滤的股票代码列表
返回:
未停牌的股票代码列表
"""
current_data = DataHelper.get_current_data()
return [stock for stock in stock_list if not current_data[stock].paused]
@staticmethod
def filter_st_stock(stock_list: List[str]) -> List[str]:
"""
过滤带有 ST 或其他风险标识的股票
参数:
stock_list: 待过滤的股票代码列表
返回:
无 ST 或风险标识的股票代码列表
"""
current_data = DataHelper.get_current_data()
return [stock for stock in stock_list if (not current_data[stock].is_st) and
('ST' not in current_data[stock].name) and
('*' not in current_data[stock].name) and
('退' not in current_data[stock].name)]
@staticmethod
def filter_kcbj_stock(stock_list: List[str]) -> List[str]:
"""
过滤科创、北交股票
参数:
stock_list: 待过滤的股票代码列表
返回:
过滤后的股票代码列表(排除以 '4''8' 开头以及以 '68' 起始的股票)
"""
return [stock for stock in stock_list if stock[0] not in ('4', '8') and not stock.startswith('68')]
@staticmethod
def filter_limitup_stock(context: Any, stock_list: List[str]) -> List[str]:
"""
过滤当天已经涨停的股票(若未持仓则过滤)
参数:
context: 交易上下文对象
stock_list: 待过滤的股票代码列表
返回:
过滤后的股票代码列表
"""
history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1)
current_data = DataHelper.get_current_data()
if history_data is None:
return stock_list
return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or
(history_data.get(stock, [0])[-1] < current_data[stock].high_limit)]
@staticmethod
def filter_limitdown_stock(context: Any, stock_list: List[str]) -> List[str]:
"""
过滤当天已经跌停的股票(若未持仓则过滤)
参数:
context: 交易上下文对象
stock_list: 待过滤的股票代码列表
返回:
过滤后的股票代码列表
"""
history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1)
current_data = DataHelper.get_current_data()
if history_data is None:
return stock_list
return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or
(history_data.get(stock, [float('inf')])[-1] > current_data[stock].low_limit)]
@staticmethod
def filter_new_stock(context: Any, stock_list: List[str]) -> List[str]:
"""
过滤次新股排除上市时间不足375天的股票
参数:
context: 交易上下文对象
stock_list: 待过滤的股票代码列表
返回:
过滤后的股票代码列表
"""
yesterday = context.previous_date
return [stock for stock in stock_list if not (yesterday - DataHelper.get_security_info(stock).start_date < timedelta(days=375))]
@staticmethod
def filter_highprice_stock(self, context: Any, stock_list: List[str]) -> List[str]:
"""
过滤股价高于设定上限up_price的股票非持仓股票参与过滤
参数:
context: 交易上下文对象
stock_list: 待过滤的股票代码列表
返回:
过滤后的股票代码列表
"""
history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1)
if history_data is None:
return stock_list
return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or
history_data.get(stock, [self.up_price + 1])[-1] <= self.up_price]
@staticmethod
def filter_not_buy_again(stock_list: List[str], not_buy_again) -> List[str]:
"""
过滤掉当日已买入的股票,避免重复下单
参数:
stock_list: 待过滤的股票代码列表
返回:
未买入的股票代码列表
"""
return [stock for stock in stock_list if stock not in not_buy_again]