Skip to content

Commit

Permalink
[MIX]manylinux build fix (#63)
Browse files Browse the repository at this point in the history
* manylinux build fix

* replace source location

* aliyun source list instead

* fix typo of source list file name
  • Loading branch information
ashione authored Apr 4, 2024
1 parent 78bad38 commit 5ca25e1
Show file tree
Hide file tree
Showing 27 changed files with 771 additions and 543 deletions.
20 changes: 20 additions & 0 deletions .github/aliyun-source.list
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy main restricted
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy main restricted
deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates main restricted
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates main restricted
deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy universe
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy universe
deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates universe
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates universe
deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy multiverse
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy multiverse
deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates multiverse
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates multiverse
deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-backports main restricted universe multiverse
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-backports main restricted universe multiverse
deb http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security main restricted
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security main restricted
deb http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security universe
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security universe
deb http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security multiverse
deb-src http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security multiverse
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,17 @@ env:
jobs:
streaming-debian-manylinux-pipeline:
timeout-minutes: 120
runs-on: ubuntu-20.04
runs-on: ubuntu-22.04
container: quay.io/pypa/manylinux_2_24_x86_64

steps:
- uses: actions/checkout@v2

- name: Apt get update and Install bazel
run: |
cp /etc/apt/sources.list /etc/apt/sources.list.bak
cp .github/sources.list /etc/apt/sources.list
cp .github/ubuntu-source.list /etc/apt/sources.list
apt-get update
apt-get install -yq wget openjdk-8-jdk zlib1g-dev zip git
apt-get update
apt-get install -yq gcc g++
sh scripts/install-bazel.sh
Expand Down
8 changes: 0 additions & 8 deletions .github/sources.list

This file was deleted.

14 changes: 14 additions & 0 deletions .github/ubuntu-source.list
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
deb http://archive.ubuntu.com/ubuntu/ jammy main restricted universe multiverse
# deb-src http://archive.ubuntu.com/ubuntu/ jammy main restricted universe multiverse

deb http://archive.ubuntu.com/ubuntu/ jammy-updates main restricted universe multiverse
# deb-src http://archive.ubuntu.com/ubuntu/ jammy-updates main restricted universe multiverse

deb http://archive.ubuntu.com/ubuntu/ jammy-security main restricted universe multiverse
# deb-src http://archive.ubuntu.com/ubuntu/ jammy-security main restricted universe multiverse

deb http://archive.ubuntu.com/ubuntu/ jammy-backports main restricted universe multiverse
# deb-src http://archive.ubuntu.com/ubuntu/ jammy-backports main restricted universe multiverse

deb http://archive.canonical.com/ubuntu/ jammy partner
# deb-src http://archive.canonical.com/ubuntu/ jammy partner
1 change: 1 addition & 0 deletions scripts/suppress_output
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/bin/bash
# Run a command, suppressing output unless it hangs or crashes.
CURRENT_DIR=$(dirname "${BASH_SOURCE:-$0}")

TMPFILE="$(mktemp)"
PID=$$
Expand Down
96 changes: 49 additions & 47 deletions streaming/python/raystreaming/runtime/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ def __init__(self, channel_id_str: str):
channel_id_str: string representation of channel id
"""
self.channel_id_str = channel_id_str
self.object_qid = ray.ObjectRef(
channel_id_str_to_bytes(channel_id_str))
self.object_qid = ray.ObjectRef(channel_id_str_to_bytes(channel_id_str))

def __eq__(self, other):
if other is None:
Expand Down Expand Up @@ -100,7 +99,6 @@ def channel_bytes_to_str(id_bytes):


class Message(ABC):

@property
@abstractmethod
def body(self):
Expand Down Expand Up @@ -131,12 +129,7 @@ class DataMessage(Message):
DataMessage represents data between upstream and downstream operator.
"""

def __init__(self,
body,
timestamp,
message_id,
channel_id,
is_empty_message=False):
def __init__(self, body, timestamp, message_id, channel_id, is_empty_message=False):
self.__body = body
self.__timestamp = timestamp
self.__channel_id = channel_id
Expand Down Expand Up @@ -226,25 +219,29 @@ class ChannelCreationParametersBuilder:
"""

_java_reader_async_function_descriptor = JavaFunctionDescriptor(
"io.ray.streaming.runtime.worker.JobWorker", "onReaderMessage",
"([B)V")
"io.ray.streaming.runtime.worker.JobWorker", "onReaderMessage", "([B)V"
)
_java_reader_sync_function_descriptor = JavaFunctionDescriptor(
"io.ray.streaming.runtime.worker.JobWorker", "onReaderMessageSync",
"([B)[B")
"io.ray.streaming.runtime.worker.JobWorker", "onReaderMessageSync", "([B)[B"
)
_java_writer_async_function_descriptor = JavaFunctionDescriptor(
"io.ray.streaming.runtime.worker.JobWorker", "onWriterMessage",
"([B)V")
"io.ray.streaming.runtime.worker.JobWorker", "onWriterMessage", "([B)V"
)
_java_writer_sync_function_descriptor = JavaFunctionDescriptor(
"io.ray.streaming.runtime.worker.JobWorker", "onWriterMessageSync",
"([B)[B")
"io.ray.streaming.runtime.worker.JobWorker", "onWriterMessageSync", "([B)[B"
)
_python_reader_async_function_descriptor = PythonFunctionDescriptor(
"raystreaming.runtime.worker", "on_reader_message", "JobWorker")
"raystreaming.runtime.worker", "on_reader_message", "JobWorker"
)
_python_reader_sync_function_descriptor = PythonFunctionDescriptor(
"raystreaming.runtime.worker", "on_reader_message_sync", "JobWorker")
"raystreaming.runtime.worker", "on_reader_message_sync", "JobWorker"
)
_python_writer_async_function_descriptor = PythonFunctionDescriptor(
"raystreaming.runtime.worker", "on_writer_message", "JobWorker")
"raystreaming.runtime.worker", "on_writer_message", "JobWorker"
)
_python_writer_sync_function_descriptor = PythonFunctionDescriptor(
"raystreaming.runtime.worker", "on_writer_message_sync", "JobWorker")
"raystreaming.runtime.worker", "on_writer_message_sync", "JobWorker"
)

def get_parameters(self):
return self._parameters
Expand Down Expand Up @@ -272,41 +269,47 @@ def build_output_queue_parameters(self, to_actors):
)
return self

def build_parameters(self, actors, java_async_func, java_sync_func,
py_async_func, py_sync_func):
def build_parameters(
self, actors, java_async_func, java_sync_func, py_async_func, py_sync_func
):
for handle in actors:
parameter = None
if handle._ray_actor_language == Language.PYTHON:
parameter = _streaming.ChannelCreationParameter(
handle._ray_actor_id, py_async_func, py_sync_func)
handle._ray_actor_id, py_async_func, py_sync_func
)
else:
parameter = _streaming.ChannelCreationParameter(
handle._ray_actor_id, java_async_func, java_sync_func)
handle._ray_actor_id, java_async_func, java_sync_func
)
self._parameters.append(parameter)
return self

@staticmethod
def set_python_writer_function_descriptor(async_function, sync_function):
ChannelCreationParametersBuilder._python_writer_async_function_descriptor = (
async_function)
async_function
)
ChannelCreationParametersBuilder._python_writer_sync_function_descriptor = (
sync_function)
sync_function
)

@staticmethod
def set_python_reader_function_descriptor(async_function, sync_function):
ChannelCreationParametersBuilder._python_reader_async_function_descriptor = (
async_function)
async_function
)
ChannelCreationParametersBuilder._python_reader_sync_function_descriptor = (
sync_function)
sync_function
)


class DataWriter:
"""Data Writer is a wrapper of streaming c++ DataWriter, which sends data
to downstream workers
"""

def __init__(self, output_channels, to_actors: List[ActorHandle],
conf: dict):
def __init__(self, output_channels, to_actors: List[ActorHandle], conf: dict):
"""Get DataWriter of output channels
Args:
output_channels: output channels ids
Expand All @@ -320,8 +323,7 @@ def __init__(self, output_channels, to_actors: List[ActorHandle],
]
creation_parameters = ChannelCreationParametersBuilder()
creation_parameters.build_output_queue_parameters(to_actors)
channel_size = conf.get(Config.CHANNEL_SIZE,
Config.CHANNEL_SIZE_DEFAULT)
channel_size = conf.get(Config.CHANNEL_SIZE, Config.CHANNEL_SIZE_DEFAULT)
py_msg_ids = [0 for _ in range(len(output_channels))]
config_bytes = _to_native_conf(conf)
is_mock = conf[Config.CHANNEL_TYPE] == Config.MEMORY_CHANNEL
Expand Down Expand Up @@ -365,8 +367,8 @@ def get_output_checkpoints(self) -> List[int]:

def clear_checkpoint(self, checkpoint_id):
logger.info(
"producer start to clear checkpoint, checkpoint_id={}".format(
checkpoint_id))
"producer start to clear checkpoint, checkpoint_id={}".format(checkpoint_id)
)
self.writer.clear_checkpoint(checkpoint_id)

def stop(self):
Expand All @@ -384,8 +386,9 @@ class DataReader:
from channels of upstream workers
"""

def __init__(self, input_channels: List, from_actors: List[ActorHandle],
conf: dict):
def __init__(
self, input_channels: List, from_actors: List[ActorHandle], conf: dict
):
"""Get DataReader of input channels
Args:
input_channels: input channels
Expand Down Expand Up @@ -416,8 +419,11 @@ def __init__(self, input_channels: List, from_actors: List[ActorHandle],
self.__creation_status = {}
for q, status in queues_creation_status.items():
self.__creation_status[q] = ChannelCreationStatus(status)
logger.info("create DataReader succeed, creation_status={}".format(
self.__creation_status))
logger.info(
"create DataReader succeed, creation_status={}".format(
self.__creation_status
)
)

def read(self, timeout_millis):
"""Read data from channel
Expand Down Expand Up @@ -459,11 +465,11 @@ def _to_native_conf(conf):
config.op_name = conf[Config.STREAMING_OP_NAME]
# TODO set operator type
if Config.STREAMING_RING_BUFFER_CAPACITY in conf:
config.ring_buffer_capacity = int(
conf[Config.STREAMING_RING_BUFFER_CAPACITY])
config.ring_buffer_capacity = int(conf[Config.STREAMING_RING_BUFFER_CAPACITY])
if Config.STREAMING_EMPTY_MESSAGE_INTERVAL in conf:
config.empty_message_interval = int(
conf[Config.STREAMING_EMPTY_MESSAGE_INTERVAL])
conf[Config.STREAMING_EMPTY_MESSAGE_INTERVAL]
)
if Config.FLOW_CONTROL_TYPE in conf:
config.flow_control_type = int(conf[Config.FLOW_CONTROL_TYPE])
if Config.WRITER_CONSUMED_STEP in conf:
Expand All @@ -475,20 +481,17 @@ def _to_native_conf(conf):


class ChannelInitException(Exception):

def __init__(self, msg, abnormal_channels):
self.abnormal_channels = abnormal_channels
self.msg = msg


class ChannelInterruptException(Exception):

def __init__(self, msg=None):
self.msg = msg


class ChannelRecoverInfo:

def __init__(self, queue_creation_status_map=None):
if queue_creation_status_map is None:
queue_creation_status_map = {}
Expand All @@ -505,8 +508,7 @@ def get_data_lost_queues(self):
return data_lost_queues

def __str__(self):
return "QueueRecoverInfo [dataLostQueues=%s]" % (
self.get_data_lost_queues())
return "QueueRecoverInfo [dataLostQueues=%s]" % (self.get_data_lost_queues())


class ChannelCreationStatus(Enum):
Expand Down
Loading

0 comments on commit 5ca25e1

Please sign in to comment.