easyquant/task/check_state_before_task.py
2025-07-03 23:39:31 +08:00

102 lines
3.9 KiB
Python

import json
from typing import Any
from config.network_config import network_config
from kit.kit import DataHelper
from strategy import trade_strategy
from task.base_task import base_task
class check_state_before_task(base_task):
    """
    Pre-task that snapshots the portfolio state before any other task runs.

    Over one cycle (init -> config -> run -> handle -> end) it:
      * records the currently held securities,
      * finds which of them closed at the limit-up / limit-down price on
        the previous trading day,
      * pushes that information into ``strategy.state``,
      * persists the state and then resets it.
    """

    def __init__(self, strategy: trade_strategy, sub_task=None):
        """Forward the strategy and optional sub-task to ``base_task``."""
        super().__init__(strategy, sub_task)

    def init(self, context: Any):
        """Log that the runtime environment data is about to be reset."""
        self.log.info("--------------------------")
        self.log.info("重置运行中环境数据")

    def config(self, context: Any):
        """Rebuild the per-run working data from the current portfolio.

        Sets ``hold_stock`` (list of held security codes),
        ``hold_stock_data`` (security code -> position object) and clears
        the limit-up / limit-down lists of the previous run.
        """
        # Materialize the positions once; both containers derive from it.
        positions = list(context.portfolio.positions.values())
        self.hold_stock = [position.security for position in positions]
        self.hold_stock_data = {position.security: position for position in positions}
        self.yesterday_high_list = []
        self.yesterday_low_list = []

    def run(self, context: Any):
        """Record holdings in the state and collect yesterday's limit hits.

        Every position is stored via ``state.set_hold_stock``. If anything
        is held, the previous day's close / limit prices are fetched and the
        rows whose close equals the limit-up (resp. limit-down) price are
        kept as lists of per-row dicts.
        """
        # ``positions`` is a mapping (config() calls ``.values()`` on it), so
        # iterate its values; iterating the mapping itself yields only keys.
        for position in context.portfolio.positions.values():
            self.strategy.state.set_hold_stock(position.security, position.__dict__)

        if not self.hold_stock:
            return

        # Previous-day data for the held securities: close, limit-up price
        # and limit-down price.
        df = DataHelper.get_price_safe(
            self.hold_stock,
            end_date=context.previous_date,
            frequency='daily',
            fields=['close', 'high_limit', 'low_limit'],
            count=1,
            panel=False,
            fill_paused=False
        )
        if df is None or df.empty:
            return

        # ``list(DataFrame)`` yields column labels, not rows, so convert the
        # filtered frames to row dicts instead.
        # NOTE(review): handle() reads ``stock['code']``, so this assumes the
        # returned frame carries a 'code' column -- confirm against
        # DataHelper.get_price_safe.
        self.yesterday_high_list = df[df['close'] == df['high_limit']].to_dict('records')
        self.yesterday_low_list = df[df['close'] == df['low_limit']].to_dict('records')

    def handle(self, context: Any):
        """Push yesterday's limit-up / limit-down rows into the state."""
        for stock in self.yesterday_high_list:
            self.strategy.state.set_high_stock(stock['code'], stock)
        for stock in self.yesterday_low_list:
            self.strategy.state.set_low_stock(stock['code'], stock)

    def end(self, context: Any):
        """Log the collected information, persist the state, then reset it."""
        self.log.info("-----------------")
        self.log.info("一.截止昨日持仓情况:")
        self.hold_stock_info()
        self.yesterday_high_list_info()
        self.yesterday_low_list_info()
        self.log.info("二.保存缓存信息")
        self.state_save(context)
        self.log.info("三.重置缓存信息")
        self.strategy.state.reset()

    def hold_stock_info(self):
        """Log a summary of the holdings and each held security code."""
        if self.hold_stock:
            self.log.info(" 持仓信息: {} 只,昨日涨停信息: {} 只,昨日跌停信息: {}".format(
                len(self.hold_stock),
                len(self.yesterday_high_list),
                len(self.yesterday_low_list)))
            for security in self.hold_stock:
                self.log.debug(" 持仓信息: {}".format(security))
        else:
            self.log.info(" 无持仓")

    def yesterday_high_list_info(self):
        """Log the count and details of yesterday's limit-up holdings."""
        if self.yesterday_high_list:
            self.log.info(" 涨停信息: {}".format(len(self.yesterday_high_list)))
            for stock in self.yesterday_high_list:
                self.log.debug(" 涨停信息: {}".format(stock))
        else:
            self.log.info(" 无涨停")

    def yesterday_low_list_info(self):
        """Log the count and details of yesterday's limit-down holdings."""
        # Report the limit-down list; this method must not read the
        # limit-up list that yesterday_high_list_info() covers.
        if self.yesterday_low_list:
            self.log.info(" 跌停信息: {}".format(len(self.yesterday_low_list)))
            for stock in self.yesterday_low_list:
                self.log.debug(" 跌停信息: {}".format(stock))
        else:
            self.log.info(" 无跌停")

    def state_save(self, context: Any):
        """Serialize the strategy state to JSON and store it for today.

        NOTE(review): ``json.dumps`` requires ``strategy.state`` to be
        JSON-serializable as-is -- confirm. ``strategy_config`` is not
        defined in this class; presumably it is provided by ``base_task``.
        """
        json_data = json.dumps(self.strategy.state)
        # ``.date()`` drops the time of day, so the time part of the
        # formatted string is always 00:00:00 (assuming ``current_dt`` is a
        # datetime).
        trade_date = context.current_dt.date()
        network_config.insert_state(self.strategy_config.strategy_id, json_data, trade_date.strftime("%Y-%m-%d %H:%M:%S"))