easyquant/test.py
2025-07-08 23:00:32 +08:00

1567 lines
57 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import os
from datetime import timedelta
import datetime
import requests
from typing import Any, Optional, List, Dict
import pandas as pd
class network_config:
def __init__(self):
"""
初始化网络配置
"""
self.BASE_URL = "http://101.34.79.70:8081" # 替换为你的 NocoDB 地址
API_TOKEN = "mgIUTo_BATQnVPE3czsHLOLLgcQmnVv1qs7gJTIO" # 在用户设置中创建
self.headers = {
"xc-token": API_TOKEN
}
self.project_table = {
"strategy_config": "mw1zi3dr5sy03em", # 查看项目URL获取
"strategy_base": "mtvgo6pbrlmbgjc",
"strategy_config_template": "m4nty4o5t6sxzts",
"strategy_logs": "m8ebfypvbfpk38p",
"today_state": "mosii8hqry8xx9b"
}
pass
def register_strategy(self, strategy_name: str, template_id: str = ""):
"""
注册策略
:param strategy_name: 策略名称
:return: None
"""
PROJECT_ID = self.project_table['strategy_base']
url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records"
data = {
"text": strategy_name,
"memo": template_id,
"create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 使用当前时间
"modify_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 使用当前时间
}
response = requests.post(url, headers=self.headers, data=data)
response.raise_for_status() # 检查错误
result = response.json()
template_list = self.get_template_list(template_id)
for template in template_list:
self.insert_config(result["id"], template)
return result["id"]
def get_strategy_id(self, strategy_name: str):
"""
获取策略ID
:param strategy_name: 策略名称
:return: 策略ID
"""
PROJECT_ID = self.project_table['strategy_base']
response = requests.get(f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records", headers=self.headers)
response.raise_for_status()
data = response.json()
records = data["list"]
return next((str(record["id"]) for record in records if record.get("text") == strategy_name), None)
def get_template_list(self, strategy_id: str) -> any:
"""
获取网络配置参数
:param type:
:param strategy_id:
:param key: 配置参数的键
:return: 配置参数的值
"""
PROJECT_ID = self.project_table['strategy_config_template']
url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records"
params = dict(limit=100, offset=0, where=f"(template_id,eq,{strategy_id})")
try:
return requests.get(url, headers=self.headers, params=params).json()['list']
except Exception as e:
return []
def get_data_list(self, strategy_id: str, type: str) -> any:
"""
获取网络配置参数
:param type:
:param strategy_id:
:param key: 配置参数的键
:return: 配置参数的值
"""
# 这里可以添加获取配置逻辑
PROJECT_ID = self.project_table['strategy_config']
url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records"
params = dict(limit=100, offset=0, where=f"(strategy_id,eq,{strategy_id})")
try:
result = []
records = requests.get(url, headers=self.headers, params=params).json()['list']
for record in records:
if record.get("strategy_id") == str(strategy_id) and record.get("type") == type:
result.append(record)
return result
except Exception as e:
return []
def update_config(self, id: str, new_value: str):
"""
更新网络配置参数
:return:
"""
PROJECT_ID = self.project_table['strategy_config']
url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records"
new_value = {
"id": id,
"new_value": new_value,
"create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
update_response = requests.patch(url, headers=self.headers, json=new_value)
update_response.raise_for_status()
return update_response.json()
def insert_config(self, strategy_id: str, template: dict):
"""
插入网络配置参数
"""
PROJECT_ID = self.project_table['strategy_config']
url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records"
data = {
"strategy_id": strategy_id,
"type": "config",
"name": template['name'],
"data_type": template['data_type'],
"data_param": template['data_param'],
"memo": template['memo'],
"new_value": template['value'],
"create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
response = requests.post(url, headers=self.headers, data=data)
response.raise_for_status() # 检查错误
# def update_check(self, id: str, old_value: str, new_value: str):
# """
# 更新网络配置参数
#
# :return:
# """
# PROJECT_ID = self.project_table['strategy_config']
# url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records/"
# new_value = {
# "id": id,
# "old_value": old_value,
# "new_value": new_value,
# "create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
# }
# update_response = requests.patch(url, headers=self.headers, json=new_value)
# update_response.raise_for_status()
# return update_response.json()
#
# def insert_check(self, strategy_id: str, key: str, value: str):
# """
# 插入网络配置参数
# """
# PROJECT_ID = self.project_table['strategy_config']
# url = f"{self.BASE_URL}/api/v2/tables/{PROJECT_ID}/records"
# data = {
# "strategy_id": strategy_id,
# "type": "check",
# "name": key,
# "new_value": value,
# "create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 使用当前时间
#
# }
# response = requests.post(url, headers=self.headers, data=data)
# response.raise_for_status() # 检查错误
@staticmethod
def insert_logs(strategy_id: str, log_type: str, log_content: str):
"""
插入日志记录
:param strategy_id: 策略ID
:param log_type: 日志类型
:param log_content: 日志内容
:return: None
"""
PROJECT_ID = "m8ebfypvbfpk38p"
url = f"http://101.34.79.70:8081/api/v2/tables/{PROJECT_ID}/records"
data = {
"strategy_id": strategy_id,
"msg_type": log_type,
"message": log_content,
"create_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 使用当前时间
}
API_TOKEN = "mgIUTo_BATQnVPE3czsHLOLLgcQmnVv1qs7gJTIO" # 在用户设置中创建
headers = {
"xc-token": API_TOKEN
}
response = requests.post(url, headers=headers, data=data)
response.raise_for_status()
@staticmethod
def insert_state(strategy_id: str, state_str: str, trade_date: str):
"""
:param strategy_id:
:param state_str:
:param trade_date:
:return:
"""
PROJECT_ID = "mosii8hqry8xx9b"
url = f"http://101.34.79.70:8081/api/v2/tables/{PROJECT_ID}/records"
data = {
"strategy_id": strategy_id,
"trade_date": trade_date, # 使用当前时间
"state": state_str
}
API_TOKEN = "mgIUTo_BATQnVPE3czsHLOLLgcQmnVv1qs7gJTIO" # 在用户设置中创建
headers = {
"xc-token": API_TOKEN
}
response = requests.post(url, headers=headers, data=data)
response.raise_for_status()
class strategy_config:
def __init__(self,
strategy_name: str,
strategy_template_id: str,
config_save: network_config,
log):
"""
基础必要初始化策略配置
"""
# 策略名
self.strategy_name = strategy_name
# 配置模版id
self.template_id = strategy_template_id
# 配置变更保存方式
self.connection = config_save
# 日志记录器
self.logger = Logger(log)
"""
实例化strategy_config类时初始化必要的属性
"""
# 策略ID
self.strategy_id = None
# 配置对应主键
self.configkey_id = {}
# 配置信息
self.config = {}
# 核心缓存
self.strategy_state = state()
# 初始化配置文件
self.init_from_db()
def init_from_db(self):
self.strategy_id = self.connection.register_strategy(self.strategy_name, self.template_id)
self.logger.info("初始化策略配置", self.strategy_id)
self.logger.info(' 注册策略序列:' + str(self.strategy_id), self.strategy_id)
config_list = self.connection.get_data_list(self.strategy_id, 'config')
self.config = {item['name']: item for item in config_list}
self.configkey_id = {item['name']: item['id'] for item in config_list}
self.logger.info('确认配置参数:', self.strategy_id)
for config in config_list:
self.logger.info(f" 参数名: {config['name']}, 值: {config['new_value']}, 备注: {config['memo']}", self.strategy_id)
self.logger.info('确认配置参数完毕, 如有错误请立刻停止执行', self.strategy_id)
def get_all_configs(self) -> dict:
"""
获取所有策略配置参数
:return: 包含所有配置参数的字典
"""
return self.config
def set_config(self, key: str, value: str) -> None:
"""
通过字典语法设置配置项,并自动触发数据库更新
Args:
key (str): 配置项名称
value (Any): 配置项的新值
"""
id = self.configkey_id.get(key)
self.connection.update_config(id, value)
self.config[key]['new_value'] = value
def get_config(self, key: str):
func = conf.config_type(key)
return func(key)
class hold_data:
def __init__(self, data: dict):
self.stock = None
self.price = None
self.amount = None
self.cost = None
self.profit = None
self.profit_rate = None
class state:
def __init__(self):
self.data = {}
self.init()
# 初始化
def init(self):
self.data = {
"create_date": "",
"position_list": {},
"sell_request_list": {},
"buy_request_list": {},
"high_stock_list": {},
"low_stock_list": {},
"reason_to_sell": "",
"history_buy_list": {},
"history_sell_list": {},
}
def get_buy_requests(self):
return self.data["buy_request_list"]
def clear_buy_requests(self):
"""
清空买入请求列表
"""
self.data["buy_request_list"] = {}
def reset(self):
self.init()
def set_state_datetime(self, key: str, value: datetime) -> None:
"""
设置状态值
:param key: 状态键
:param value: 状态值
"""
self.data[key] = value
def set_hold_stock(self, stock: str, data: dict) -> None:
"""
设置持仓股票数据
:param stock: 股票代码
:param data: hold_data 对象
"""
hd = hold_data(data)
if not isinstance(hd, hold_data):
raise ValueError("data must be an instance of hold_data")
self.data["position_list"][stock] = hd
def set_high_stock(self, stock: str, data: dict) -> None:
"""
设置高位股票数据
:param stock: 股票代码
:param data: hold_data 对象
"""
hd = hold_data(data)
if not isinstance(hd, hold_data):
raise ValueError("data must be an instance of hold_data")
self.data["high_stock_list"][stock] = hd
def set_low_stock(self, stock: str, data: dict) -> None:
"""
设置低位股票数据
:param stock: 股票代码
:param data: hold_data 对象
"""
hd = hold_data(data)
if not isinstance(hd, hold_data):
raise ValueError("data must be an instance of hold_data")
self.data["low_stock_list"][stock] = hd
def set_sell_request(self, stock: str, data: dict, reason: str) -> None:
"""
设置卖出请求数据
:param stock: 股票代码
:param data: hold_data 对象
"""
hd = hold_data(data)
if not isinstance(hd, hold_data):
raise ValueError("data must be an instance of hold_data")
self.data["sell_request_list"][stock + "_" + reason] = hd
def set_buy_request(self, stock: str, data: dict, reason: str) -> None:
"""
设置买入请求数据
:param stock: 股票代码
:param data: hold_data 对象
"""
hd = hold_data(data)
if not isinstance(hd, hold_data):
raise ValueError("data must be an instance of hold_data")
self.data["buy_request_list"][stock + "_" + reason] = hd
def clear_buys_requests(self):
"""
清空买入请求列表
"""
self.data["buy_request_list"] = {}
def get_sell_reason(self):
return self.data["reason_to_sell"]
def set_sell_reason(self, reason: str):
"""
设置卖出原因
:param reason: 卖出原因
"""
self.data["reason_to_sell"] = reason
def get_sell_requests(self):
"""
获取卖出请求列表
:return: 卖出请求列表
"""
return self.data["sell_request_list"]
def get_yesterday_high_list(self):
"""
获取昨日涨停股票列表
:return: 昨日涨停股票列表
"""
return self.data["high_stock_list"]
def get_hold_list(self):
return self.data["position_list"]
class conf:
@staticmethod
def to_int(value: str, params, separator: str = '_'):
return int(value)
@staticmethod
def to_float(value: str, params, separator: str = '_'):
return float(value)
@staticmethod
def to_str(value: str, params, separator: str = '_'):
return str(value)
@staticmethod
def to_bool(value: str, params, separator: str = '_'):
return bool(value)
@staticmethod
def to_list(value: str, separator: str = '_'):
return value.split(separator)
@staticmethod
def config_type(value: str):
data = {
"int": conf.to_int,
"float": conf.to_float,
"str": conf.to_str,
"bool": conf.to_bool,
"list": conf.to_list,
}
return data[value]
class DataHelper:
"""
数据操作辅助类
封装了数据接口的调用,包括 get_price 与 history 函数,
并在内部捕获异常、输出中文错误日志,避免重复编写 try/except 代码。
"""
@staticmethod
def get_price_safe(
security: Any,
end_date: Any,
frequency: str,
fields: List[str],
count: int,
panel: bool = False,
skip_paused: bool = True,
fq: Optional[str] = None,
fill_paused: bool = False
) -> Optional[pd.DataFrame]:
"""
安全调用 get_price 数据接口
参数:
security: 单只股票代码或股票代码列表
end_date: 数据截止日期
frequency: 数据频率,如 "daily""1m"
fields: 需要获取的数据字段列表(例如 ['open', 'close']
count: 请求数据的记录数
panel: 是否返回面板数据(默认为 False
skip_paused: 是否跳过停牌股票(默认为 True
fq: 复权方式(例如"pre""post",默认 None
fill_paused: 是否填充停牌数据(默认为 False
返回:
返回包含数据的 DataFrame如果出错则返回 None
"""
try:
# 调用聚宽提供的 get_price 获取数据
df = get_price(
security,
end_date=end_date,
frequency=frequency,
fields=fields,
count=count,
panel=panel,
skip_paused=skip_paused,
fq=fq,
fill_paused=fill_paused
)
return df
except Exception as e:
# 输出中文错误日志,并返回 None
# logger.error(f"获取 {security} 的价格数据时出错: {e}")
return None
@staticmethod
def get_current_data():
"""
获取当前市场数据
返回:
当前市场数据的字典,包含股票代码到数据对象的映射
"""
try:
# 调用聚宽提供的 get_current_data 获取当前数据
return get_current_data()
except Exception as e:
# Logger.error(f"获取当前市场数据时出错: {e}")
return {}
@staticmethod
def get_history_safe(
security: Any,
unit: str,
field: str,
count: int,
) -> Optional[Dict[str, List[float]]]:
"""
安全调用 history 数据接口,批量获取历史数据
参数:
security: 单只或多只股票代码
unit: 数据单位,例如 "1m" 表示1分钟数据
field: 请求数据字段名称,如 "close"(收盘价)
count: 请求历史数据记录数
返回:
返回一个字典,映射股票代码到对应的数据列表;出错则返回 None
"""
try:
# 调用聚宽的 history 函数获取数据
data = history(count, unit=unit, field=field, security_list=security)
return data
except Exception as e:
return None
@staticmethod
def get_index_stocks(index_code: str) -> List[str]:
"""
获取指定指数的成分股列表
参数:
index_code: 指数代码,例如 "000300.XSHG" 表示沪深300指数
date: 可选参数,指定查询日期,默认为 None表示查询最新成分股
返回:
成分股代码列表
"""
try:
# 调用聚宽的 get_index_stocks 函数获取成分股
return get_index_stocks(index_code)
except Exception as e:
return []
@staticmethod
def get_security_info(security: str) -> Optional[Any]:
"""
获取指定股票的安全信息
参数:
security: 股票代码,例如 "000001.XSHE"
返回:
股票的安全信息对象,如果出错则返回 None
"""
try:
# 调用聚宽的 get_security_info 函数获取股票信息
return get_security_info(security)
except Exception as e:
return None
@staticmethod
def get_stock_list(context: Any, index_stocks: str, stock_num: int) -> List[str]:
"""
选股模块:
1. 从指定股票池(如 399101.XSHE 指数成分股)中获取初步股票列表;
2. 应用多个过滤器筛选股票次新股、科创股、ST、停牌、涨跌停等
3. 基于基本面数据EPS、市值排序后返回候选股票列表。
参数:
context: 聚宽平台传入的交易上下文对象
返回:
筛选后的候选股票代码列表
"""
# 从指定指数中获取初步股票列表
initial_list: List[str] = DataHelper.get_index_stocks(index_stocks)
# 依次应用过滤器,筛去不符合条件的股票
initial_list = DataHelper.filter_new_stock(context, initial_list) # 过滤次新股
initial_list = DataHelper.filter_kcbj_stock(initial_list) # 过滤科创/北交股票
initial_list = DataHelper.filter_st_stock(initial_list) # 过滤ST或风险股票
initial_list = DataHelper.filter_paused_stock(initial_list) # 过滤停牌股票
initial_list = DataHelper.filter_limitup_stock(context, initial_list) # 过滤当日涨停(未持仓时)的股票
initial_list = DataHelper.filter_limitdown_stock(context, initial_list) # 过滤当日跌停(未持仓时)的股票
# 利用基本面查询获取股票代码和EPS数据并按照市值升序排序
q = query(valuation.code, indicator.eps) \
.filter(valuation.code.in_(initial_list)) \
.order_by(valuation.market_cap.asc())
df = get_fundamentals(q)
stock_list: List[str] = list(df.code)
stock_list = stock_list[:50] # 限制数据规模,防止一次处理数据过大
# 取前2倍目标持仓股票数作为候选池
final_list: List[str] = stock_list[:2 * int(stock_num)]
# 查询并输出候选股票的财务信息如财报日期、营业收入、EPS
if final_list:
info_query = query(
valuation.code,
income.pubDate,
income.statDate,
income.operating_revenue,
indicator.eps
).filter(valuation.code.in_(final_list))
df_info = get_fundamentals(info_query)
# for _, row in df_info.iterrows():
# log.info(
# f"股票 {row['code']}:报告日期 {row.get('pubDate', 'N/A')},统计日期 {row.get('statDate', 'N/A')},营业收入 {row.get('operating_revenue', 'N/A')}EPS {row.get('eps', 'N/A')}")
return final_list
@staticmethod
def filter_paused_stock(stock_list: List[str]) -> List[str]:
"""
过滤停牌的股票
参数:
stock_list: 待过滤的股票代码列表
返回:
未停牌的股票代码列表
"""
current_data = DataHelper.get_current_data()
return [stock for stock in stock_list if not current_data[stock].paused]
@staticmethod
def filter_st_stock(stock_list: List[str]) -> List[str]:
"""
过滤带有 ST 或其他风险标识的股票
参数:
stock_list: 待过滤的股票代码列表
返回:
无 ST 或风险标识的股票代码列表
"""
current_data = DataHelper.get_current_data()
return [stock for stock in stock_list if (not current_data[stock].is_st) and
('ST' not in current_data[stock].name) and
('*' not in current_data[stock].name) and
('退' not in current_data[stock].name)]
@staticmethod
def filter_kcbj_stock(stock_list: List[str]) -> List[str]:
"""
过滤科创、北交股票
参数:
stock_list: 待过滤的股票代码列表
返回:
过滤后的股票代码列表(排除以 '4''8' 开头以及以 '68' 起始的股票)
"""
return [stock for stock in stock_list if stock[0] not in ('4', '8') and not stock.startswith('68')]
@staticmethod
def filter_limitup_stock(context: Any, stock_list: List[str]) -> List[str]:
"""
过滤当天已经涨停的股票(若未持仓则过滤)
参数:
context: 交易上下文对象
stock_list: 待过滤的股票代码列表
返回:
过滤后的股票代码列表
"""
history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1)
current_data = DataHelper.get_current_data()
if history_data is None:
return stock_list
return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or
(history_data.get(stock, [0])[-1] < current_data[stock].high_limit)]
@staticmethod
def filter_limitdown_stock(context: Any, stock_list: List[str]) -> List[str]:
"""
过滤当天已经跌停的股票(若未持仓则过滤)
参数:
context: 交易上下文对象
stock_list: 待过滤的股票代码列表
返回:
过滤后的股票代码列表
"""
history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1)
current_data = DataHelper.get_current_data()
if history_data is None:
return stock_list
return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or
(history_data.get(stock, [float('inf')])[-1] > current_data[stock].low_limit)]
@staticmethod
def filter_new_stock(context: Any, stock_list: List[str]) -> List[str]:
"""
过滤次新股排除上市时间不足375天的股票
参数:
context: 交易上下文对象
stock_list: 待过滤的股票代码列表
返回:
过滤后的股票代码列表
"""
yesterday = context.previous_date
return [stock for stock in stock_list if not (yesterday - DataHelper.get_security_info(stock).start_date < timedelta(days=375))]
@staticmethod
def filter_highprice_stock(self, context: Any, stock_list: List[str]) -> List[str]:
"""
过滤股价高于设定上限up_price的股票非持仓股票参与过滤
参数:
context: 交易上下文对象
stock_list: 待过滤的股票代码列表
返回:
过滤后的股票代码列表
"""
history_data = DataHelper.get_history_safe(stock_list, unit='1m', field='close', count=1)
if history_data is None:
return stock_list
return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or
history_data.get(stock, [self.up_price + 1])[-1] <= self.up_price]
@staticmethod
def filter_not_buy_again(stock_list: List[str], not_buy_again) -> List[str]:
"""
过滤掉当日已买入的股票,避免重复下单
参数:
stock_list: 待过滤的股票代码列表
返回:
未买入的股票代码列表
"""
return [stock for stock in stock_list if stock not in not_buy_again]
class Logger:
"""
简单的日志操作类,支持控制台和文件输出
可以使用现有的logger对象或创建新的实例
"""
_instance = None
def __new__(cls, existing_logger=None, *args, **kwargs):
if existing_logger is not None:
# 如果提供了现有的logger对象创建新实例并使用该对象
instance = super().__new__(cls)
instance.logger = existing_logger
return instance
# 否则使用单例模式
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, existing_logger=None, log_file="logs/app.log"):
# 如果使用现有logger直接返回
if existing_logger is not None:
return
# 如果已经初始化过,直接返回
if hasattr(self, 'logger'):
return
# 创建logs目录如果不存在
os.makedirs(os.path.dirname(log_file), exist_ok=True)
# 初始化logger
self.logger = logging.getLogger('AppLogger')
self.logger.setLevel(logging.DEBUG)
# 清除可能存在的处理器
self.logger.handlers.clear()
# 设置日志格式
formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# 添加控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
# 添加文件处理器
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
def debug(self, message, strategy_id=None):
"""记录调试级别的日志"""
self.logger.debug(message)
try:
network_config.insert_logs(strategy_id, "debug", message)
except:
pass
def info(self, message, strategy_id=None):
"""记录信息级别的日志"""
self.logger.info(message)
try:
network_config.insert_logs(strategy_id, "info ", message)
except:
pass
def warning(self, message, strategy_id=None):
"""记录警告级别的日志"""
self.logger.warning(message)
try:
network_config.insert_logs(strategy_id, "warn", message)
except:
pass
def error(self, message, strategy_id=None):
"""记录错误级别的日志"""
self.logger.error(message)
try:
network_config.insert_logs(strategy_id, "error", message)
except:
pass
def critical(self, message, strategy_id=None):
"""记录严重错误级别的日志"""
self.logger.critical(message)
try:
network_config.insert_logs(strategy_id, "critical", message)
except:
pass
class exchange:
def close_positions(self, context: Any, position: Any):
"""
关闭所有持仓
"""
raise NotImplementedError("This method should be implemented by subclasses")
def buy_security(self, context: Any, target_list: List[str]) -> None:
"""
买入操作:对目标股票执行买入,下单资金均摊分配
"""
raise NotImplementedError("This method should be implemented by subclasses")
def open_position(self, security: str, value: float) -> bool:
"""
开仓操作:尝试买入指定股票
参数:
security: 股票代码
value: 分配给该股票的资金
返回:
若下单成功(部分或全部成交)返回 True否则返回 False
"""
raise NotImplementedError("This method should be implemented by subclasses")
class jq_channel(exchange):
def __init__(self):
pass
def order_target_value_(self, stock: str, value: float) -> Any:
"""
封装 order_target_value 函数进行下单,同时记录中文日志和异常信息
参数:
security: 股票代码
value: 下单目标金额
返回:
下单后生成的订单对象;若失败返回 None
"""
try:
order = order_target_value(stock, value)
return order
except Exception as e:
return None
def close_positions(self, context: Any, position: Any):
"""
平仓操作:尽可能将指定股票仓位全部卖出
参数:
position: 持仓对象
返回:
若下单后订单全部成交返回 True否则返回 False
"""
security = position.security
order = self.order_target_value_(security, 0)
if order is None:
# log.error(f"股票 {security} 下单失败可能是API调用错误")
return False
if order.status == OrderStatus.held and order.filled == order.amount:
# log.info(f"股票 {security} 卖出订单完全成交")
return True
else:
# log.warning(f"股票 {security} 卖出订单部分成交或未成交,状态: {order.status}, 已成交: {order.filled}/{order.amount}")
return False
def buy_security(self, context: Any, target_list: List[str]) -> None:
"""
买入操作:对目标股票执行买入,下单资金均摊分配
参数:
context: 聚宽平台传入的交易上下文对象
target_list: 目标股票代码列表
"""
position_count = len(context.portfolio.positions)
target_num = len(target_list)
if target_num > position_count:
try:
value = context.portfolio.cash / (target_num - position_count)
except ZeroDivisionError as e:
# log.error(f"资金分摊时除零错误: {e}")
return
for stock in target_list:
if context.portfolio.positions[stock].total_amount == 0:
if self.open_position(stock, value):
# log.info(f"已买入股票 {stock},分配资金 {value:.2f}")
# self.not_buy_again.append(stock)
if len(context.portfolio.positions) == target_num:
break
def open_position(self, security: str, value: float) -> bool:
"""
开仓操作:尝试买入指定股票
参数:
security: 股票代码
value: 分配给该股票的资金
返回:
若下单成功(部分或全部成交)返回 True否则返回 False
"""
order = self.order_target_value_(security, value)
if order is not None and order.filled > 0:
return True
return False
class trade_strategy:
def __init__(self, strategy_name, strategy_template_id, config_save, log, channel: exchange):
# -------必要参数-------
self.log = Logger(log) # 日志记录器
self.strategy_config = strategy_config(strategy_name,
strategy_template_id,
config_save,
self.log)
self.state = state() # 策略状态管理器,跟踪策略运行状态
self.channel = channel
# 添加持仓监控和管理器
def init_strategy(self) -> None:
"""
策略初始化函数
配置交易环境参数,包括防未来数据、基准、滑点、订单成本以及日志输出等级。
参数:
context: 聚宽平台传入的交易上下文对象
"""
# 启用防未来数据以确保历史回测的严谨性
self.log.info("初始化策略TradingStrategy配置...")
set_option('avoid_future_data', True)
# 设置策略基准为上证指数
# set_benchmark(self.strategy_config.config["benchmark_stock"])
set_benchmark("000001.XSHG")
# 使用真实市场价格,模拟实际交易
set_option('use_real_price', True)
# 设置固定滑点,确保价格偏差不大
set_slippage(FixedSlippage(3 / 10000))
# 设置订单成本,包括印花税和佣金
set_order_cost(OrderCost(
open_tax=0,
close_tax=0.001, # 卖出时0.1%的印花税
open_commission=2.5 / 10000,
close_commission=2.5 / 10000,
close_today_commission=0,
min_commission=5 # 最低佣金为5元
), type='stock')
# 设置日志输出等级(中文日志输出)
self.log.logger.set_level('order', 'error')
self.log.logger.set_level('system', 'error')
self.log.logger.set_level('strategy', 'debug')
def set_target_list(self, target_list):
self.target_list = target_list.copy()
def get_target_list(self):
"""获取本次调仓候选股票列表"""
return self.target_list
def get_configs(self) -> dict:
"""
获取策略配置
:return: strategy_config 对象
"""
return self.strategy_config.get_all_configs()
def get_strategy_config(self) -> strategy_config:
"""
获取策略配置字典
:return: 包含所有配置参数的字典
"""
return self.strategy_config
def close_position(self, context, stock: str):
"""
平仓操作:尽可能将指定股票仓位全部卖出
参数:
position: 持仓对象
返回:
若下单后订单全部成交返回 True否则返回 False
"""
self.channel.close_positions(context, stock)
def open_position(self, security: str, value: float) -> bool:
"""
开仓操作:尝试买入指定股票
参数:
security: 股票代码
value: 分配给该股票的资金
返回:
若下单成功(部分或全部成交)返回 True否则返回 False
"""
return self.channel.open_position(security, value)
class base_task:
def __init__(self, strategy: trade_strategy, sub_task=None):
self.strategy = strategy
self.name = "base_task"
self.remark = "base_task"
self.memo = "base_task"
self.sub_task = sub_task
self.strategy_config = self.strategy.get_strategy_config()
self.configs = self.strategy.get_configs()
self.log = self.strategy.log
def init(self, context: Any):
pass
def config(self, context: Any):
pass
def run(self, context: Any):
pass
def handle(self, context: Any):
pass
def end(self, context: Any):
pass
def process(self, context: Any):
self.init(context)
self.config(context)
self.run(context)
self.handle(context)
self.end(context)
def get_config(self, key: str):
configs = self.configs.get(key)
data_type = configs['data_type']
data_params = configs['data_param']
func = conf.config_type(data_type)
return func(self.configs[key]['new_value'], data_params)
class check_state_before_task(base_task):
"""
重置state信息
最前置信息
"""
def __init__(self, strategy: trade_strategy, sub_task=None):
super().__init__(strategy, sub_task)
def init(self, context):
self.log.info("--------------------------")
self.log.info("重置运行中环境数据")
def config(self, context):
# 重置state信息
self.strategy.state.reset()
self.hold_stock = [position.security for position in list(context.portfolio.positions.values())]
self.yesterday_high_list = []
self.yesterday_low_list = []
def run(self, context):
positions = context.portfolio.positions
for stock in positions:
self.strategy.state.set_hold_stock(stock.security, stock.__dict__)
if self.hold_stock:
# 获取持仓股票昨日数据(包括收盘价、涨停价、跌停价)
df = DataHelper.get_price_safe(
self.hold_stock,
end_date=context.previous_date,
frequency='daily',
fields=['close', 'high_limit', 'low_limit'],
count=1,
panel=False,
fill_paused=False
)
if df is not None and not df.empty:
# 过滤出收盘价等于涨停价的股票,作为昨日涨停股票
self.yesterday_high_list = list(df[df['close'] == df['high_limit']]['code'])
self.yesterday_low_list = list(df[df['close'] == df['low_limit']]['code'])
def handle(self, context):
for stock in self.yesterday_high_list:
self.strategy.state.set_high_stock(stock['code'], stock)
for stock in self.yesterday_low_list:
self.strategy.state.set_low_stock(stock['code'], stock)
class check_market_env_task(base_task):
"""
交易环境确认
1、判断是否为特殊月份1月/4月
2、如果是特殊月份且通过了清仓检查则清仓所有持仓
3、如果不是特殊月份且没有交易信号则重置交易信号
"""
def __init__(self, strategy: trade_strategy):
super().__init__(strategy)
def config(self, context: Any):
self.current_date = context.current_dt.date()
self.current_month = self.current_date.month
self.current_day = self.current_date.day
# 判断是否执行空仓策略
self.filter_month_clear_switch = self.get_config('filter_month_clear_switch')
self.filer_month_data = self.get_config('filter_month_data')
# 判断是否为特殊月 = True时 等于 执行1月/4月 全部清仓
self.in_special_month = str(self.current_month) in self.filer_month_data
self.today_trade_switch = self.get_config('today_trade_switch') # 是否为不交易信号
def run(self, context: Any):
#
if self.filter_month_clear_switch and self.in_special_month:
# 进入特殊月份,应清仓
for stock in self.strategy.state.data['position_list'].keys():
self.strategy.state.set_sell_request(stock, self.strategy.state.data['position_list'][stock], f"enter_{self.current_month}_month")
# 清仓后就不交易
self.today_trade_switch = False
elif not self.filter_month_clear_switch:
# 不在特殊月 或 不执行空仓策略 可以交易
self.today_trade_switch = True
else:
# 如果是特殊月份但没有清仓条件,则继续交易
self.today_trade_switch = True
def handle(self, context: Any):
# 保存状态
old_today_trade_switch = self.get_config("today_trade_switch")
if old_today_trade_switch != self.today_trade_switch:
self.strategy_config.set_config("today_trade_switch", self.today_trade_switch) # 临时变量
self.log.info(f"交易环境更新: today_trade_switch 状态更新为 {self.today_trade_switch}")
def end(self, context: Any):
self.log.info("-----------------")
self.log.info("一.今日环境确认:")
self.log.info(" 当前日期: {}".format(self.current_date))
self.log.info(" 空仓策略: {}".format("开启" if self.filter_month_clear_switch else "关闭"))
self.log.info(" 今日交易: {}".format("交易" if self.today_trade_switch else "清仓"))
self.log.info(" 特殊月份: {}".format(",".join(str(item) for item in self.filer_month_data)))
self.log.info(" 当前月份: {}".format(self.current_month))
# 今日清仓
class check_positions_stop_loss_task(base_task):
def __init__(self, strategy: trade_strategy):
super().__init__(strategy)
def config(self, context: Any):
self.stoploss_switch = self.get_config('stoploss_switch') # 是否启用止损策略
self.hold_list = self.strategy.state.data['position_list'] # 持仓列表
self.today_trade_switch = self.get_config('today_trade_switch') # 今日交易开关
self.stoploss_strategy = self.get_config('stoploss_strategy') # 止损策略
self.stoploss_stock_rate_data = self.get_config('stoploss_stock_rate_data') # 个股止损阀值
self.stoploss_market_rate_data = self.get_config('stoploss_market_rate_data') # 市场止损阀值
# self.reason_to_sell = self.strategy.state.get_sell_reason() # 卖出原因
self.index_stocks = self.get_config('index_stocks') # 指数代码
self.temp_sells_list = {}
def run(self, context: Any):
if len(self.hold_list) == 0:
self.log.debug("当前无持仓,跳过止损检查。")
return
if self.stoploss_switch and self.today_trade_switch:
if self.stoploss_strategy == 1 or self.stoploss_strategy == 3:
# 个股止盈或止损判断
for stock in list(context.portfolio.positions.keys()):
pos = context.portfolio.positions[stock]
if pos.price >= pos.avg_cost * 2:
# 通过持仓监控器注册卖出请求,而不是直接卖出
# self.strategy.state.set_sell_request(stock, pos, 'take_profit')
self.sell_request(stock, pos, 'take_profit')
self.log.debug(f"股票 {stock} 实现100%盈利,执行止盈卖出。")
elif pos.price < pos.avg_cost * self.stoploss_stock_rate_data:
# 通过持仓监控器注册卖出请求,而不是直接卖出
self.strategy.state.set_sell_request(stock, pos, 'stop_loss')
self.sell_request(stock, pos, 'take_profit')
self.log.debug(f"股票 {stock} 触及止损阈值,执行卖出。")
# self.strategy.state.set_sell_reason("stoploss")
if self.stoploss_strategy == 2 or self.stoploss_strategy == 3:
# 大盘止损判断,若整体市场跌幅过大则平仓所有股票
stock_list = DataHelper.get_index_stocks(self.index_stocks)
df = DataHelper.get_price_safe(
stock_list,
end_date=context.previous_date,
frequency='daily',
fields=['close', 'open'],
count=1,
panel=False
)
if df is not None and not df.empty:
down_ratio = (df['close'] / df['open']).mean()
if down_ratio <= self.stoploss_market_rate_data:
# self.strategy.state.set_sell_reason("stoploss")
self.log.debug(f"市场检测到跌幅(平均跌幅 {down_ratio:.2%}),卖出所有持仓。")
for stock in list(context.portfolio.positions.keys()):
pos = context.portfolio.positions[stock]
# 通过持仓监控器注册卖出请求,而不是直接卖出
self.sell_request(stock, pos, 'market_stop_loss')
def handle(self, context: Any):
# 保存状态
for key, value in self.temp_sells_list.items():
stock, reason = key.split('_')
self.strategy.state.set_sell_request(stock, value, reason)
pass
def end(self, context: Any):
self.log.info("-----------------")
self.log.info("止损策略开关:{}".format("启用" if self.stoploss_switch else "不启用"))
self.log.info("今日交易开关:{}".format("交易" if self.today_trade_switch else "不交易"))
self.log.info("止损策略方式:{}".format("个股止损" if self.stoploss_strategy == 1 else "大盘止损" if self.stoploss_strategy == 2 else "联合止损" if
self.stoploss_strategy == 3 else "异常"))
self.log.info("当前卖出请求")
for key, value in self.temp_sells_list.items():
self.log.info(f" {key}: {value}")
# 总卖出请求
sells_list = self.strategy.state.get_sell_requests()
if len(sells_list) > 0:
self.log.info("总卖出请求:")
for stock, data in sells_list.items():
self.log.info(f" {stock}: {data}")
pass
def sell_request(self, stock, data, reason):
self.temp_sells_list[f"{stock}_{reason}"] = data
class buy_stocks_func_task(base_task):
"""
每周调仓策略:
如果非空仓日,先选股得到目标股票列表,再卖出当前持仓中不在目标列表且昨日未涨停的股票,
最后买入目标股票,同时记录当天买入情况避免重复下单。
参数:
context: 聚宽平台传入的交易上下文对象
"""
def __init__(self, strategy: trade_strategy):
super().__init__(strategy)
def config(self, context: Any):
# 每次调仓的目标股票数量
self.stock_num = self.get_config('stock_num')
# 获取当前持仓列表
self.hold_list = self.strategy.state.get_hold_list()
self.index_stocks = self.get_config('index_stocks')
# 调仓的目标股票列表排序
self.target_list = DataHelper.get_stock_list(context, self.index_stocks, self.stock_num)
# 限制目标股票列表长度为 stock_num
self.temp_target_list = self.target_list[:self.stock_num]
# 昨日涨停股票列表
self.yesterday_HL_list: List[str] = self.strategy.state.get_yesterday_high_list()
self.today_trade_switch = self.get_config('today_trade_switch')
self.temp_sells_list = {}
self.temp_buys_list = {}
def run(self, context: Any):
if not self.today_trade_switch:
self.log.info("今日非交易日,跳过调仓操作。")
return
# 重置当天已买入记录
# self.position_manager.reset_not_buy_again()
# 取目标持仓数以内的股票作为调仓目标
# self.log.info(f"每周调仓目标股票: {self.temp_target_list}")
# 遍历当前持仓,若股票不在目标列表,则执行卖出操作
for stock in self.hold_list:
if stock not in self.temp_target_list:
pos = context.portfolio.positions.get(stock)
self.log.info(f"调仓决策:卖出股票 {stock}")
# 通过持仓监控器注册卖出请求,而不是直接卖出
self.sell_request(stock, pos, 'rebalance')
# self.strategy.state.set_sell_request(stock, pos, 'rebalance')
# self.sell_request(stock, pos, 'rebalance')
else:
self.log.info(f"调仓决策:继续持有股票 {stock}")
# 对目标股票执行买入操作前,先将其注册到待买入队列
buy_targets = [stock for stock in self.temp_target_list if stock not in self.hold_list]
if buy_targets:
for stock in buy_targets:
pos = context.portfolio.positions.get(stock)
self.buys_request(stock, pos, 'buy')
self.log.info(f"调仓决策:将 {stock} 加入待买入队列")
def handle(self, context: Any):
pass
def end(self, context: Any):
pass
# 这里可以添加任何必要的清理或总结操作
def sell_request(self, stock, data, reason):
self.temp_sells_list[f"{stock}_{reason}"] = data
def buys_request(self, stock, data, reason):
self.temp_buys_list[f"{stock}_{reason}"] = data
class process_pending_sells_task(base_task):
"""
处理待卖出股票队列
"""
def __init__(self, strategy: trade_strategy):
super().__init__(strategy)
def config(self, context: Any):
self.sell_request_list = self.strategy.state.get_sell_requests()
def run(self, context: Any):
# 如果没有待卖出股票,直接返回
if len(self.sell_request_list) == 0:
self.log.info("没有待卖出股票,跳过处理")
return
self.log.info("开始处理待卖出股票队列")
current_data = DataHelper.get_current_data()
for stock in self.sell_request_list:
self.log.info(f"处理待卖出股票: {stock}")
if stock not in context.portfolio.positions:
# 股票已不在持仓中,可能已通过其他方式卖出
self.log.warning(f" 股票 {stock} 不在当前持仓中,跳过处理")
continue
position = context.portfolio.positions[stock]
if position.closeable_amount <= 0:
# 没有可卖出数量
self.log.warning(f" 股票 {stock} 没有可卖出数量,跳过处理")
continue
# 检查是否可以交易
if current_data[stock].paused:
self.log.warning(f" 股票 {stock} 已暂停交易,跳过卖出")
continue
if current_data[stock].low_limit >= current_data[stock].last_price:
self.log.warning(f" 股票 {stock} 已触及跌停,跳过卖出")
continue
# 尝试卖出
# self.position_monitor.increment_attempt(stock)
success = self.strategy.close_position(context, position)
if success:
# self.position_monitor.mark_as_successful(stock)
self.log.info(f"成功卖出股票 {stock}")
else:
self.log.warning(f"卖出股票 {stock} 失败")
def handle(self, context: Any):
pass
def end(self, context: Any):
pass
class process_pending_buy_task(base_task):
"""
处理待买入股票队列
"""
def __init__(self, strategy: trade_strategy):
super().__init__(strategy)
def config(self, context: Any):
self.today_trade_switch = self.get_config('today_trade_switch')
self.stock_num = self.get_config('stock_num')
self.buy_requests_list = self.strategy.state.get_buy_requests()
self.hold_list = [position.security for position in list(context.portfolio.positions.values())]
self.buy_symbol_money_up_limit = self.get_config('buy_symbol_money_up_limit')
self.buy_symbol_money_total = self.get_config('buy_symbol_money_total')
def run(self, context: Any):
# 如果在特殊月份1月/4月或没有待买入股票直接返回
if self.today_trade_switch or not self.buy_requests_list:
self.log.info("今日非交易日或没有待买入股票,跳过处理")
return
self.log.info("开始处理待买入股票队列")
current_data = DataHelper.get_current_data()
position_count = len(self.hold_list)
target_num = int(self.stock_num)
# 如果持仓已满,不再买入
if position_count >= target_num:
self.log.info("当前持仓已满,清空待买入队列")
self.strategy.state.clear_buy_requests()
return
# 计算可买入的股票数量和分配资金
buy_count = min(len(self.buy_requests_list), target_num - position_count)
if buy_count <= 0:
self.log.info("没有可买入的股票,或持仓已满")
return
# 计算每只股票可分配的资金
if context.portfolio.cash < self.buy_symbol_money_total:
self.log.warning(f"可用资金 {context.portfolio.cash:.2f} 元不足,无法满足总资金要求 {self.buy_symbol_money_total}")
return
try:
value = context.portfolio.cash / buy_count
except ZeroDivisionError:
self.log.error("资金分摊时除零错误")
return
# 确保每只股票有足够的买入金额
if value < self.buy_symbol_money_up_limit: # 假设最小买入金额为5000元
self.log.warning(f"每只股票分配资金不足: {value:.2f}元,取消本次买入")
return
# 逐个买入股票
for idx, stock in enumerate(self.buy_requests_list[:buy_count]):
# 跳过已持仓的股票
if stock in self.hold_list:
self.log.warning(f"股票 {stock} 已在持仓中,跳过买入")
continue
# 检查是否可交易
if current_data[stock].paused:
self.log.warning(f"股票 {stock} 处于停牌状态,跳过买入")
continue
if current_data[stock].high_limit <= current_data[stock].last_price:
self.log.warning(f"股票 {stock} 已涨停,跳过买入")
continue
# 执行买入
if self.strategy.open_position(stock, value):
self.log.info(f"成功买入股票 {stock},分配资金 {value:.2f}")
# 删除购买列表
# 如果持仓已满,停止买入
if len(context.portfolio.positions) >= target_num:
break
def handle(self, context: Any):
pass
def end(self, context: Any):
pass
strategy = trade_strategy("测试策略", 10000000, network_config(), log, jq_channel())
def check_state_before_func(context: Any):
task = check_state_before_task(strategy)
task.process(context)
def check_market_env_func(context: Any):
task = check_market_env_task(strategy)
task.process(context)
def check_positions_stop_loss_func(context: Any):
task = check_positions_stop_loss_task(strategy)
task.process(context)
def buy_stocks_func(context: Any):
task = buy_stocks_func_task(strategy)
task.process(context)
def process_pending_sells_func(context: Any):
task = process_pending_sells_task(strategy)
task.process(context)
def process_pending_buys_func(context: Any):
task = process_pending_buy_task(strategy)
task.process(context)
def initialize(context: Any) -> None:
# # 初始化策略参数
strategy.init_strategy() # 策略初始化函数
# # 注册调度任务
run_daily(check_state_before_func, time='9:01') # 开盘前先检查持仓状态
run_daily(check_market_env_func, time='9:05')
run_daily(check_positions_stop_loss_func, time='9:10')
run_weekly(buy_stocks_func, 2, time='10:15') # 每周调仓
run_daily(process_pending_sells_func, time='10:15') # 处理待卖出股票
run_daily(process_pending_buys_func, time='10:20') # 处理待买入股票
# run_daily(trade_afternoon_func, time='14:30')
run_daily(process_pending_sells_func, time='14:40') # 收盘前再处理一次卖出
run_daily(process_pending_buys_func, time='14:45') # 收盘前再处理一次买入
# run_daily(close_account_func, time='14:50')
# run_weekly(print_position_info_func, 5, time='15:10') # 周五收盘后打印持仓信息
pass
# initialize(None) # 调用初始化函数,开始策略执行