from typing import Any from kit.conf import conf from strategy import trade_strategy class base_task: def __init__(self, strategy: trade_strategy, sub_task=None): self.strategy = strategy self.name = "base_task" self.remark = "base_task" self.memo = "base_task" self.sub_task = sub_task self.strategy_config = self.strategy.get_strategy_config() self.configs = self.strategy.get_configs() self.log = self.strategy.log def init(self, context: Any): pass def config(self, context: Any): pass def run(self, context: Any): pass def handle(self, context: Any): pass def end(self, context: Any): pass def process(self, context: Any): self.init(context) self.config(context) self.run(context) self.handle(context) self.end(context) def get_config(self, key: str): configs = self.configs.get(key) data_type = configs['data_type'] data_params = configs['data_param'] func = conf.config_type(data_type) return func(self.configs[key]['new_value'], data_params)