forked from idanya/algo-trader
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstrategy.py
25 lines (19 loc) · 926 Bytes
/
strategy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from typing import Optional, List
from entities.candle import Candle
from entities.strategy import Strategy
from entities.strategy_signal import StrategySignal
from pipeline.processor import Processor
from pipeline.shared_context import SharedContext
from trade.signals_executor import SignalsExecutor
class StrategyProcessor(Processor):
    """Pipeline stage that runs every configured strategy against a candle.

    The signals returned by the strategies are gathered into a single list and
    handed to the signals executor together with the candle; afterwards the
    call is delegated to the parent ``Processor.process``.
    """

    def __init__(self, strategies: List[Strategy], signals_executor: SignalsExecutor,
                 next_processor: Optional[Processor]) -> None:
        """Store the strategies and the executor used by :meth:`process`.

        Args:
            strategies: Strategies whose ``process`` method is called for
                every candle, in list order.
            signals_executor: Object whose ``execute`` method receives the
                candle and the gathered signals.
            next_processor: Passed unchanged to the parent constructor
                (presumably the following pipeline stage, or ``None``).
        """
        super().__init__(next_processor)
        self.strategies = strategies
        self.signals_executor = signals_executor

    def process(self, context: SharedContext, candle: Candle):
        """Run all strategies on ``candle`` and execute the resulting signals.

        Args:
            context: Shared context forwarded to each strategy and to the
                parent ``process``.
            candle: Candle forwarded to each strategy, to the executor and to
                the parent ``process``.
        """
        collected: List[StrategySignal] = []
        for current in self.strategies:
            produced = current.process(context, candle)
            # A strategy may return None or nothing useful; skip falsy results.
            if produced:
                collected.extend(produced)

        # Hand over everything gathered for this candle, even an empty list.
        self.signals_executor.execute(candle, collected)
        super().process(context, candle)