forked from idanya/algo-trader
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocessor.py
26 lines (19 loc) · 823 Bytes
/
processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from __future__ import annotations
from abc import abstractmethod
from typing import Optional
from entities.candle import Candle
from entities.event import Event
from pipeline.shared_context import SharedContext
class Processor:
def __init__(self, next_processor: Optional[Processor]) -> None:
self.next_processor = next_processor
@abstractmethod
def process(self, context: SharedContext, candle: Candle):
if self.next_processor:
self.next_processor.process(context, candle)
def reprocess(self, context: SharedContext, candle: Candle):
if self.next_processor:
self.next_processor.reprocess(context, candle)
def event(self, context: SharedContext, event: Event):
if self.next_processor:
self.next_processor.event(context, event)