Skip to content

Commit

Permalink
Add streaming pause/unpause queue to support many simultaneous streams (
Browse files Browse the repository at this point in the history
  • Loading branch information
BryceBeagle authored Aug 5, 2021
1 parent 8c61b26 commit 209cb6f
Show file tree
Hide file tree
Showing 35 changed files with 1,029 additions and 762 deletions.
2 changes: 1 addition & 1 deletion brainframe_qt/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.28.0"
__version__ = "0.28.4"
39 changes: 36 additions & 3 deletions brainframe_qt/api_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,38 @@
import typing

from PyQt5.QtCore import QObject
from brainframe.api import BrainFrameAPI
from .streaming import StreamManager, StreamManagerAPI

# API instance that is later monkeypatched to be a singleton
api: BrainFrameAPI = StreamManagerAPI()
# Singleton API instance
api = BrainFrameAPI()

# Must come after api import
from .streaming import StreamManager

# Set using init_stream_manager before use
#
# Note: Creating singleton QObjects is a bit of a hassle because they need a parent
# object. Flutter has a really cool system called InheritedWidget/Provider that makes
# passing objects like this around really easily. I'd like to implement something
# similar, but for now this is the only singleton object we have so I think this
# should be begrudgingly "ok".
_stream_manager: StreamManager = typing.cast(StreamManager, None)


def init_stream_manager(*, parent: QObject) -> None:
global _stream_manager

if _stream_manager is not None:
raise RuntimeError("StreamManager has already been initialized")

_stream_manager = StreamManager(parent=parent)


def get_stream_manager() -> StreamManager:
if _stream_manager is None:
raise RuntimeError(
f"StreamManager has not been initialized yet. Call "
f"{init_stream_manager.__name__} first."
)

return _stream_manager
3 changes: 1 addition & 2 deletions brainframe_qt/api_utils/streaming/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
from .stream_manager import StreamManager, StreamManagerAPI
from .stream_manager import StreamManager
from .synced_reader import SyncedStreamReader
from .stream_listener import StreamListener
28 changes: 0 additions & 28 deletions brainframe_qt/api_utils/streaming/stream_listener.py

This file was deleted.

276 changes: 156 additions & 120 deletions brainframe_qt/api_utils/streaming/stream_manager.py
Original file line number Diff line number Diff line change
@@ -1,139 +1,175 @@
import logging
import typing
from threading import RLock
from typing import Dict, List, Optional

from brainframe.api import BrainFrameAPI, StatusReceiver
from brainframe.api.bf_codecs import StreamConfiguration
from gstly import gobject_init
from gstly.stream_reader import GstStreamReader, StreamReader
from PyQt5.QtCore import QObject

from brainframe.api.bf_codecs import StreamConfiguration

from brainframe_qt.api_utils import api
from .synced_reader import SyncedStreamReader


class StreamManager:
"""Keeps track of existing Stream objects, and creates new ones as
necessary.
"""
class StreamManager(QObject):
"""Keeps track of existing Stream objects, and creates new ones as necessary"""

REHOSTED_VIDEO_TYPES = [StreamConfiguration.ConnType.WEBCAM,
StreamConfiguration.ConnType.FILE]
"""These video types are re-hosted by the server."""
_MAX_ACTIVE_STREAMS = 5
"""Number of streams to run concurrently"""

def __init__(self, status_receiver: StatusReceiver):
self._stream_readers = {}
self._status_receiver = status_receiver
self._async_closing_streams = []
"""A list of StreamReader objects that are closing or may have finished
closing"""
def __init__(self, *, parent: QObject):
super().__init__(parent=parent)

def start_streaming(self,
stream_config: StreamConfiguration,
url: str) -> SyncedStreamReader:
"""Starts reading from the stream using the given information, or
returns an existing reader if we're already reading this stream.
self._stream_lock = RLock()

:param stream_config: The stream to connect to
:param url: The URL to stream on
:return: A Stream object
self._running_streams: List[int] = []
"""Currently running streams. Does not include stay-alive streams.
Max length should be _NUM_ACTIVE_STREAMS
"""
if not self.is_streaming(stream_config.id):
# pipeline will be None if not in the options
pipeline: str = stream_config.connection_options.get("pipeline")

latency = StreamReader.DEFAULT_LATENCY
if stream_config.connection_type in self.REHOSTED_VIDEO_TYPES:
latency = StreamReader.REHOSTED_LATENCY
gobject_init.start()

# Streams created with a premises are always proxied from that
# premises
proxied = stream_config.premises_id is not None

stream_reader = GstStreamReader(
url,
latency=latency,
runtime_options=stream_config.runtime_options,
pipeline_str=pipeline,
proxied=proxied)
synced_stream_reader = SyncedStreamReader(
stream_config.id,
stream_reader,
self._status_receiver)
self._stream_readers[stream_config.id] = synced_stream_reader

return self._stream_readers[stream_config.id]

def is_streaming(self, stream_id: int) -> bool:
"""Checks if the the manager has a stream reader for the given stream
id.
:param stream_id: The stream ID to check
:return: True if the stream manager has a stream reader, false
otherwise
"""
return stream_id in self._stream_readers
self._paused_streams: List[int] = []
"""Currently paused streams"""

def close_stream(self, stream_id: int) -> None:
"""Close a specific stream and remove the reference.
self.stream_readers: Dict[int, SyncedStreamReader] = {}
"""All StreamReaders currently instantiated, paused or unpaused"""

:param stream_id: The ID of the stream to delete
"""
stream = self.close_stream_async(stream_id)
stream.wait_until_closed()
self._init_signals()

def _init_signals(self) -> None:
self.destroyed.connect(self.close)

def close(self) -> None:
"""Close all streams and remove references"""
for stream_id in self._stream_readers.copy().keys():
self.close_stream_async(stream_id)
self._stream_readers = {}

for stream in self._async_closing_streams:
stream.wait_until_closed()
self._async_closing_streams.remove(stream)

def close_stream_async(self, stream_id: int) -> SyncedStreamReader:
stream = self._stream_readers.pop(stream_id)
self._async_closing_streams.append(stream)
stream.close()
return stream


class StreamManagerAPI(BrainFrameAPI):
"""Augments the API class to manage and provide a StreamManager."""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._stream_manager = None

def delete_stream_configuration(self, stream_id,
timeout=120):
super().delete_stream_configuration(stream_id, timeout)
if self._stream_manager is not None \
and self._stream_manager.is_streaming(stream_id):
self._stream_manager.close_stream_async(stream_id)

def get_stream_manager(self):
"""Returns a singleton StreamManager object"""
# Lazily import streaming code to avoid OpenCV dependencies unless
# necessary
from brainframe_qt.api_utils.streaming import StreamManager

if self._stream_manager is None:
self._stream_manager = StreamManager(self.get_status_receiver())
return self._stream_manager

def get_stream_reader(self, stream_config: StreamConfiguration):
"""Get the SyncedStreamReader for the given stream_configuration.
:param stream_config: The stream configuration to open.
:return: A SyncedStreamReader object
"""Request and wait for all streams to close"""
logging.info("Initiating StreamManager close")
self._close()
logging.info("StreamManager close finished")

def delete_stream(self, stream_id: int, timeout: int = 120) -> None:
"""[blocking API] Delete a stream through the API and initiate the closing of
its corresponding StreamReader
"""
api.delete_stream_configuration(stream_id, timeout=timeout)
self.stop_streaming(stream_id)

def pause_streaming(self, stream_id) -> None:
self._set_stream_paused(stream_id, True)
self._ensure_running_streams()

def resume_streaming(self, stream_id) -> None:
self._set_stream_paused(stream_id, False)
self._ensure_running_streams()

def start_streaming(
self,
stream_conf: StreamConfiguration,
url: str,
) -> SyncedStreamReader:
"""Starts reading from the stream using the given information, or returns an
existing reader if we're already reading this stream.
:param stream_conf: The stream to connect to
:param url: The URL to stream on
:return: A SyncedStreamReader for the stream
"""
url = self.get_stream_url(stream_config.id)
logging.info("API: Opening stream on url " + url)
return self._start_stream(stream_conf, url)

return self.get_stream_manager().start_streaming(stream_config, url)
def stop_streaming(self, stream_id: int) -> None:
"""Requests a stream to close asynchronously
def close(self):
super().close()
if self._stream_manager is not None:
self._stream_manager.close()
self._stream_manager = None
:param stream_id: The ID of the stream to delete
"""
self._stop_stream(stream_id)
self._ensure_running_streams()

def _close(self) -> None:
with self._stream_lock:
streams = self.stream_readers.copy()
for stream_id, stream_reader in streams.items():
self._stop_stream(stream_id)
stream_reader.wait_until_closed()

def _create_synced_reader(
self, stream_conf: StreamConfiguration, url: str
) -> SyncedStreamReader:

synced_stream_reader = SyncedStreamReader(
stream_conf,
url,
# No parent if moving to a different thread
parent=typing.cast(QObject, None),
)

return synced_stream_reader

def _ensure_running_streams(self) -> None:
"""Pause/unpause streams if over/under max"""
with self._stream_lock:
while (
len(self._running_streams) < self._MAX_ACTIVE_STREAMS
and self._paused_streams
):
self._set_stream_paused(self._paused_streams[0], False)

for _running_stream_id in self._running_streams[self._MAX_ACTIVE_STREAMS:]:
self._set_stream_paused(_running_stream_id, paused=True)

def _forget_stream(self, stream_id: int) -> None:
with self._stream_lock:
self._set_stream_paused(stream_id, paused=True)

self._paused_streams.remove(stream_id)
self.stream_readers.pop(stream_id)

def _get_stream_reader(
self,
stream_conf: StreamConfiguration,
url: str,
) -> Optional[SyncedStreamReader]:
with self._stream_lock:
if stream_conf.id in self.stream_readers:
# If it's paused, then run self._unpause_stream, otherwise, return it
stream_reader = self.stream_readers[stream_conf.id]
else:
stream_reader = self._create_synced_reader(stream_conf, url)
self.stream_readers[stream_conf.id] = stream_reader

return stream_reader

def _set_stream_paused(self, stream_id: int, paused: bool) -> None:
with self._stream_lock:
stream_reader = self.stream_readers[stream_id]

if paused:
stream_reader.pause_streaming()
destination_list = self._paused_streams
else:
stream_reader.resume_streaming()
destination_list = self._running_streams

if stream_id in self._paused_streams:
self._paused_streams.remove(stream_id)
if stream_id in self._running_streams:
self._running_streams.remove(stream_id)

destination_list.insert(0, stream_id)

def _start_stream(
self,
stream_conf: StreamConfiguration,
url: str,
) -> SyncedStreamReader:
with self._stream_lock:
stream_reader = self._get_stream_reader(stream_conf, url)

if stream_conf.id not in self._running_streams:
self._set_stream_paused(stream_conf.id, False)

self._ensure_running_streams()

return stream_reader

def _stop_stream(self, stream_id: int) -> None:
with self._stream_lock:
stream_reader = self.stream_readers[stream_id]
self._forget_stream(stream_id)
stream_reader.close()
Loading

0 comments on commit 209cb6f

Please sign in to comment.