From 974c6cdd5690dd7d1bb5a94b33fcce4e2eeec4f6 Mon Sep 17 00:00:00 2001 From: Ken Lauer <152229072+ken-lauer@users.noreply.github.com> Date: Mon, 6 Jan 2025 16:05:59 -0800 Subject: [PATCH] ENH: 'with SubprocessTao.timeout' support --- pytao/subproc.py | 111 +++++++++++++++++++++++++++++++++++- pytao/tests/test_subproc.py | 25 ++++++++ 2 files changed, 133 insertions(+), 3 deletions(-) diff --git a/pytao/subproc.py b/pytao/subproc.py index 4a4ddfb6..d7ee87ab 100644 --- a/pytao/subproc.py +++ b/pytao/subproc.py @@ -1,5 +1,6 @@ from __future__ import annotations +import contextlib import ctypes import dataclasses import io @@ -248,13 +249,17 @@ def alive(self) -> bool: # No exit code -> is still running return self._subproc.poll() is None - def close(self): + def close(self) -> None: """Close the pipe.""" try: self.send_receive("quit", "", raises=False) except TaoDisconnectedError: pass + def close_forcefully(self) -> None: + """Close the pipe.""" + self._subproc.terminate() + def _tao_subprocess(self): """Subprocess monitor thread. Cleans up after the subprocess ends.""" @@ -400,6 +405,76 @@ def send_receive_custom(self, func: Callable, kwargs: Dict[str, SupportedKwarg]) ) +@contextlib.contextmanager +def subprocess_timeout_context( + taos: List[SubprocessTao], + timeout: float, + *, + timeout_hook: Optional[Callable[[], None]] = None, +): + """ + Context manager to set a timeout for a block of SubprocessTao calls. + + Note that there is no possibility for a graceful timeout. In the event + of a timeout, all subprocesses will be terminated. + + Parameters + ---------- + taos : list of SubprocessTao + timeout : float + The timeout duration in seconds. + timeout_hook : callable, optional + An alternative hook to call when the timeouts occur. + This replaces the built-in subprocess-closing hook. + + Yields + ------ + None + Yields control back to the calling context. + + Raises + ------ + TimeoutError + If the block of code does not execute within `when` seconds. + The Tao subprocesses are forcefully terminated at this point. + """ + timed_out = False + + def monitor(): + if evt.wait(timeout): + return + + nonlocal timed_out + timed_out = True + + if timeout_hook is not None: + timeout_hook() + else: + for tao in taos: + try: + tao.close_subprocess(force=True) + except Exception: + logger.debug("Subprocess close fail", exc_info=True) + + evt = threading.Event() + monitor_thread = threading.Thread(daemon=True, target=monitor) + monitor_thread.start() + try: + yield + except TaoDisconnectedError: + if not timed_out: + # Tao disconnected, but not due to our timeout + raise + # Otherwise, the reason for disconnection was our timeout closure. + + evt.set() + monitor_thread.join() + if timed_out: + raise TimeoutError( + f"Operation timed out after {timeout} seconds. Closing Tao subprocesses." + ) + + class SubprocessTao(Tao): """ Subprocess helper for Tao. @@ -448,12 +523,42 @@ def subprocess_alive(self) -> bool: return False return self._subproc_pipe_.alive - def close_subprocess(self) -> None: + def close_subprocess(self, *, force: bool = False) -> None: """Close the Tao subprocess.""" if self._subproc_pipe_ is not None: - self._subproc_pipe_.close() + if force: + self._subproc_pipe_.close_forcefully() + else: + self._subproc_pipe_.close() self._subproc_pipe_ = None + @contextlib.contextmanager + def timeout(self, when: float): + """ + Context manager to set a timeout for a block of SubprocessTao calls. + + Note that there is no possibility for a graceful timeout. In the event + of a timeout, the subprocess will be terminated. + + Parameters + ---------- + when : float + The timeout duration in seconds. + + Yields + ------ + None + Yields control back to the calling context. + + Raises + ------ + TimeoutError + If the block of code does not execute within `when` seconds. + The Tao subprocess is forcefully terminated at this point. + """ + with subprocess_timeout_context([self], timeout=when): + yield + def __enter__(self): return self diff --git a/pytao/tests/test_subproc.py b/pytao/tests/test_subproc.py index 4922fdf4..656c7d41 100644 --- a/pytao/tests/test_subproc.py +++ b/pytao/tests/test_subproc.py @@ -133,3 +133,28 @@ def test_custom_command_exception(subproc_tao: SubprocessTao) -> None: with pytest.raises(TaoCommandError) as ex: subproc_tao.subprocess_call(failure_func, a=3) assert "ValueError: test got kwargs: {'a': 3}" in str(ex.value) + + +def tao_custom_command_sleep(tao: Tao, delay: float): + assert isinstance(tao, Tao) + print(f"Sleeping for {delay}") + time.sleep(delay) + print("Done") + return delay + + +def test_custom_command_timeout(subproc_tao: SubprocessTao) -> None: + with SubprocessTao( + init_file="$ACC_ROOT_DIR/regression_tests/pipe_test/csr_beam_tracking/tao.init", + noplot=True, + ) as tao: + with pytest.raises(TimeoutError): + with tao.timeout(0.1): + res = tao.subprocess_call(tao_custom_command_sleep, delay=10.0) + print("subproc result was", res) + + +def test_custom_command_timeout_success(subproc_tao: SubprocessTao) -> None: + with subproc_tao.timeout(10.0): + res = subproc_tao.subprocess_call(tao_custom_command, value=1) + assert res == 2