
Commit

streaming online-learning plugin package (#52)
Co-authored-by: Lingxuan Zuo <[email protected]>
ashione and Lingxuan Zuo authored Mar 16, 2024
1 parent 15522e5 commit 1ce7d26
Showing 34 changed files with 2,830 additions and 0 deletions.
4 changes: 4 additions & 0 deletions training/python/.gitignore
@@ -0,0 +1,4 @@
build/
dist/
.eggs/
runtime.egg-info
Empty file added training/python/__init__.py
Empty file.
2 changes: 2 additions & 0 deletions training/python/requirements.txt
@@ -0,0 +1,2 @@
ray
raystreaming
26 changes: 26 additions & 0 deletions training/python/setup.py
@@ -0,0 +1,26 @@
import os

from setuptools import setup, find_packages

# the module name
pkg_dir = "streaming"

current_dir = os.path.dirname(os.path.abspath(__file__))

os.chdir(current_dir)

setup(
    name=pkg_dir,
    version="0.0.1",
    description='streaming',
    keywords=("ray", "streaming", "runtime", "operator"),
    author='The Authors of Antgroup',
    packages=find_packages(
        exclude=["*.tests", "*.tests.*", "tests.*", "tests"]),
    platforms="any",
    scripts=[],
    include_package_data=True,
    install_requires=[
        'protobuf', 'schedule', 'psutil'
    ],
    zip_safe=False)
Empty file.
Empty file.
Empty file.
56 changes: 56 additions & 0 deletions training/python/streaming/operator/base_operator.py
@@ -0,0 +1,56 @@
import logging
from abc import ABCMeta, abstractmethod

logger = logging.getLogger(__name__)


class BaseOperator(metaclass=ABCMeta):
    """
    Dynamically loaded module; its lifecycle methods behave like a Spring bean's.
    """

    @abstractmethod
    def init(self, config):
        pass

    @abstractmethod
    def destroy(self):
        pass

    @abstractmethod
    def run(self):
        pass

    @abstractmethod
    def process(self, msg):
        """
        :param msg: data value
        :return:
        """
        pass

    @abstractmethod
    def save_checkpoint(self, checkpoint_id, barrier_id=None):
        """
        :param checkpoint_id: long, checkpoint id
        :param barrier_id: long, barrier id
        :return: byte[], operator state
        """
        pass

    @abstractmethod
    def save_checkpoint_async(self, checkpoint_id, barrier_id=None):
        """
        :param checkpoint_id: long, checkpoint id
        :param barrier_id: long, barrier id
        :return: byte[], operator state
        """
        pass

    @abstractmethod
    def load_checkpoint(self, checkpoint_id):
        """
        :param checkpoint_id: long, checkpoint id
        :return:
        """
        pass
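For orientation, a minimal concrete operator against this interface might look like the sketch below; the NoopOperator name and the method bodies are illustrative only and are not part of this commit.

from streaming.operator.base_operator import BaseOperator


class NoopOperator(BaseOperator):
    """Illustrative operator that satisfies the abstract interface."""

    def init(self, config):
        # Keep the config for later use; a real operator would also set up state here.
        self._config = config or {}

    def destroy(self):
        pass

    def run(self):
        pass

    def process(self, msg):
        # A real operator would transform, train on, or forward msg here.
        return msg

    def save_checkpoint(self, checkpoint_id, barrier_id=None):
        # Return the serialized operator state as bytes.
        return b""

    def save_checkpoint_async(self, checkpoint_id, barrier_id=None):
        return b""

    def load_checkpoint(self, checkpoint_id):
        pass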
12 changes: 12 additions & 0 deletions training/python/streaming/operator/checkpoint_listener.py
@@ -0,0 +1,12 @@
from abc import ABCMeta, abstractmethod


class CheckpointListener(metaclass=ABCMeta):

    @abstractmethod
    def on_checkpoint_complete(self, checkpoint_id):
        """
        :param checkpoint_id: checkpoint id
        :return:
        """
        raise NotImplementedError('function not implemented!')
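CheckpointListener is a callback interface invoked once a checkpoint has been committed; a minimal implementation that simply logs completion could look roughly like this (class name and body are illustrative, not part of the commit).

import logging

from streaming.operator.checkpoint_listener import CheckpointListener

logger = logging.getLogger(__name__)


class LoggingCheckpointListener(CheckpointListener):
    def on_checkpoint_complete(self, checkpoint_id):
        # Called after the given checkpoint has completed globally.
        logger.info("checkpoint %s complete", checkpoint_id)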
Empty file.
28 changes: 28 additions & 0 deletions training/python/streaming/operator/constant/operator_constants.py
@@ -0,0 +1,28 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function


class OperatorConstants:
    """
    Training constants
    """
    NONE_BYTES = 8

    # Stream reader
    READER_START_OFFSET = "reader_start_offset"
    READER_MAX_SLOT_SIZE = "reader_max_slot_size"
    READER_MAX_BYTES = "reader_max_bytes"
    READER_LOG_INTERVAL = "reader_log_interval_in_secs"
    READER_FORCE_CLEAR_WHEN_FULL_AND_ALL_CONSUMED = "reader_force_clear_when_full_and_all_consumed"

    DEFAULT_QUEUE_START_OFFSET = 0
    DEFAULT_QUEUE_MAX_SLOT_SIZE = 100000
    DEFAULT_QUEUE_MAX_BYTES = 1024 * 1024 * 1024
    DEFAULT_LOG_DETAIL_INTERVAL = 10
    DEFAULT_FORCE_CLEAR = True
    DEFAULT_RINGBUFFER_LOG_DETAIL_INTERVAL = 60 * 60

    DEFAULT_CIRCULAR_BUFFER_MAX_SIZE = 100000
    DEFAULT_CIRCULAR_BUFFER_MAX_BYTES = 1024 * 1024 * 1024
    DEFAULT_CIRCULAR_BUFFER_WAIT_TIME = 0.2
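The READER_* constants are the stream reader's configuration keys and the DEFAULT_* constants their fallback values. As a sketch of how they might be wired together in an operator config (the reader_config dict below is illustrative, not part of this commit):

from streaming.operator.constant.operator_constants import OperatorConstants

# Illustrative reader config that pins every reader knob to its default value.
reader_config = {
    OperatorConstants.READER_START_OFFSET: OperatorConstants.DEFAULT_QUEUE_START_OFFSET,
    OperatorConstants.READER_MAX_SLOT_SIZE: OperatorConstants.DEFAULT_QUEUE_MAX_SLOT_SIZE,
    OperatorConstants.READER_MAX_BYTES: OperatorConstants.DEFAULT_QUEUE_MAX_BYTES,
    OperatorConstants.READER_LOG_INTERVAL: OperatorConstants.DEFAULT_LOG_DETAIL_INTERVAL,
    OperatorConstants.READER_FORCE_CLEAR_WHEN_FULL_AND_ALL_CONSUMED:
        OperatorConstants.DEFAULT_FORCE_CLEAR,
}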
Empty file.
41 changes: 41 additions & 0 deletions training/python/streaming/operator/impl/batch_operator.py
@@ -0,0 +1,41 @@
from streaming.operator.base_operator import BaseOperator
from streaming.operator.reader.tf_operator_reader import TFOperatorReader


class BatchOperator(BaseOperator):
    """
    Batch operator
    """

    def __init__(self):
        self._reader = TFOperatorReader()

    def init(self, config):
        pass

    def run(self):
        pass

    def init_and_run(self, config):
        pass

    def destroy(self):
        pass

    def save_checkpoint(self, checkpoint_id, barrier_id=None):
        pass

    def save_checkpoint_async(self, checkpoint_id, barrier_id=None):
        pass

    def load_checkpoint(self, checkpoint_id):
        pass

    def process(self, msg):
        self._reader.put_data(msg)
74 changes: 74 additions & 0 deletions training/python/streaming/operator/impl/eval_function.py
@@ -0,0 +1,74 @@
import logging
import json
import threading
import ray
from ray.streaming.function import SinkFunction
from streaming.operator.reader.eval_reader import EvalReader

logger = logging.getLogger(__name__)


class EvalSinkFunction(SinkFunction):
    def __init__(self):
        pass

    def open(self, runtime_context):
        self.job_config = runtime_context.get_job_config()
        self._op_config = runtime_context.get_config()
        self.task_index = runtime_context.get_task_index()
        self.task_id = runtime_context.get_task_id()
        self.parallelism = runtime_context.get_parallelism()
        self.metric = runtime_context.get_metric()
        self.job_id = ray.get_runtime_context().actor_id.job_id.hex()
        logger.info(
            "Open eval function. op config : {}, job config : {}.".format(
                self._op_config, runtime_context.get_job_config()))
        self.optimize_config()
        logger.info("Initializing operator with config: {}".format(
            self._op_config))
        self._reader = EvalReader(self._op_config)
        self._evaluate_thread_started = False
        logger.info("Operator open finished.")

    def optimize_config(self):
        # Decode config values that look like JSON objects in place.
        for k, v in self._op_config.items():
            try:
                if isinstance(v, str) and "{" in v and "}" in v:
                    self._op_config[k] = json.loads(v)
            except Exception:
                pass

    def get_all_actor_names(self):
        task_index = 0
        global_index = self.task_id - self.task_index
        actor_names = [
            f"{self.job_id}-{self.job_config['StreamingOpName']}-" +
            f"{task_index + i}|{global_index + i}"
            for i in range(0, self.parallelism)
        ]
        return actor_names

    def sink(self, record):
        logger.debug(f"Sink record : {record}")
        self._reader.put_data(record)
        if not self._evaluate_thread_started:
            logger.info("Starting evaluating thread!")
            t = threading.Thread(target=self._evaluate)
            t.daemon = True
            t.start()
            logger.info("Evaluating thread started!")
            self._evaluate_thread_started = True

    def _evaluate(self):
        """
        Evaluators should override this method.
        :return:
        """
        raise NotImplementedError('function _evaluate not implemented!')

    def save_checkpoint(self, checkpoint_id):
        pass

    def load_checkpoint(self, checkpoint_id):
        pass
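The _evaluate hook is the extension point here: sink() buffers each record into the EvalReader and, on the first record, starts a daemon thread running _evaluate, which concrete evaluators must override. A rough sketch of such a subclass follows; the class name and loop body are illustrative, and the read-side API of EvalReader is defined elsewhere in this commit, so it is only referenced in a comment.

import logging
import time

from streaming.operator.impl.eval_function import EvalSinkFunction

logger = logging.getLogger(__name__)


class LoggingEvalFunction(EvalSinkFunction):
    """Illustrative evaluator; a real one would compute metrics on batches."""

    def _evaluate(self):
        # Runs on the daemon thread started by sink(); a concrete evaluator
        # would pull records from self._reader (see
        # streaming/operator/reader/eval_reader.py) and report metrics.
        while True:
            logger.info("evaluation tick")
            time.sleep(10)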
25 changes: 25 additions & 0 deletions training/python/streaming/operator/impl/eval_operator.py
@@ -0,0 +1,25 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging

import ray
import json

from streaming.operator.impl.training_independent_actor import TrainingIndependentActorInterface

logger = logging.getLogger(__name__)


@ray.remote(max_restarts=-1)
class MockEvalExecutor:
    def __init__(self, conf):
        logger.info(f"Mock eval execution init, config {conf}")
        conf_dict = json.loads(conf)
        logger.info(f"Decoded config : {conf_dict}")


class EvalActorInterface(TrainingIndependentActorInterface):
    def __init__(self, config=None):
        super().__init__(config)
25 changes: 25 additions & 0 deletions training/python/streaming/operator/impl/optimizer_operator.py
@@ -0,0 +1,25 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging

import ray
import json

from streaming.operator.impl.training_independent_actor import TrainingIndependentActorInterface

logger = logging.getLogger(__name__)


@ray.remote(max_restarts=-1)
class MockOptimizerExecutor:
    def __init__(self, conf):
        logger.info(f"Mock auto scale execution init, config {conf}")
        conf_dict = json.loads(conf)
        logger.info(f"Decoded config : {conf_dict}")


class OptimizerInterface(TrainingIndependentActorInterface):
    def __init__(self, config=None):
        super().__init__(config)
25 changes: 25 additions & 0 deletions training/python/streaming/operator/impl/ps_operator.py
@@ -0,0 +1,25 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging

import ray
import json

from streaming.operator.impl.training_independent_actor import TrainingIndependentActorInterface

logger = logging.getLogger(__name__)


@ray.remote(max_restarts=-1)
class MockPsExecutor:
    def __init__(self, conf):
        logger.info(f"Mock ps execution init, config {conf}")
        conf_dict = json.loads(conf)
        logger.info(f"Decoded config : {conf_dict}")


class PsActorInterface(TrainingIndependentActorInterface):
    def __init__(self, config=None):
        super().__init__(config)
