easyquant/task/check_positions_stop_loss_task.py
2025-07-04 14:35:51 +08:00

86 lines
4.1 KiB
Python

from typing import Any
from kit.kit import DataHelper
from strategy import trade_strategy
from task.base_task import base_task
class check_positions_stop_loss_task(base_task):
def __init__(self, strategy: trade_strategy):
super().__init__(strategy)
def config(self, context: Any):
self.stoploss_switch = self.get_config('stoploss_switch') # 是否启用止损策略
self.hold_list = self.strategy.state.data['position_list'] # 持仓列表
self.today_trade_switch = self.get_config('today_trade_switch') # 今日交易开关
self.stoploss_strategy = self.get_config('stoploss_strategy') # 止损策略
self.stoploss_stock_rate_data = self.get_config('stoploss_stock_rate_data') # 个股止损阀值
self.stoploss_market_rate_data = self.get_config('stoploss_market_rate_data') # 市场止损阀值
# self.reason_to_sell = self.strategy.state.get_sell_reason() # 卖出原因
self.index_stocks = self.get_config('index_stocks') # 指数代码
self.temp_sells_list = {}
def run(self, context: Any):
if len(self.hold_list) == 0:
self.log.debug("当前无持仓,跳过止损检查。")
return
if self.stoploss_switch and self.today_trade_switch:
if self.stoploss_strategy == 1 or self.stoploss_strategy == 3:
# 个股止盈或止损判断
for stock in list(context.portfolio.positions.keys()):
pos = context.portfolio.positions[stock]
if pos.price >= pos.avg_cost * 2:
# 通过持仓监控器注册卖出请求,而不是直接卖出
# self.strategy.state.set_sell_request(stock, pos, 'take_profit')
self.sell_request(stock,pos,'take_profit')
self.log.debug(f"股票 {stock} 实现100%盈利,执行止盈卖出。")
elif pos.price < pos.avg_cost * self.stoploss_stock_rate_data:
# 通过持仓监控器注册卖出请求,而不是直接卖出
self.strategy.state.set_sell_request(stock, pos, 'stop_loss')
self.sell_request(stock,pos,'take_profit')
self.log.debug(f"股票 {stock} 触及止损阈值,执行卖出。")
#self.strategy.state.set_sell_reason("stoploss")
if self.stoploss_strategy == 2 or self.stoploss_strategy == 3:
# 大盘止损判断,若整体市场跌幅过大则平仓所有股票
stock_list = DataHelper.get_index_stocks(self.index_stocks)
df = DataHelper.get_price_safe(
stock_list,
end_date=context.previous_date,
frequency='daily',
fields=['close', 'open'],
count=1,
panel=False
)
if df is not None and not df.empty:
down_ratio = (df['close'] / df['open']).mean()
if down_ratio <= self.stoploss_market_rate_data:
#self.strategy.state.set_sell_reason("stoploss")
self.log.debug(f"市场检测到跌幅(平均跌幅 {down_ratio:.2%}),卖出所有持仓。")
for stock in list(context.portfolio.positions.keys()):
pos = context.portfolio.positions[stock]
# 通过持仓监控器注册卖出请求,而不是直接卖出
self.sell_request(stock, pos, 'market_stop_loss')
def handle(self, context: Any):
# 将出售的数据传递给state
pass
def end(self, context: Any):
self.log.info("-----------------")
self.log.info("止损策略开关:{}".format("启用" if self.stoploss_switch else "不启用"))
self.log.info("今日交易开关:{}")
self.log.info("止损策略方式:{}")
self.log.info("当前卖出请求")
pass
def sell_request(self,stock,data,reason):
self.temp_sells_list[f"{stock}_{reason}"] = data