Skip to content

Commit

Permalink
Merge pull request #117 from ken-lauer/enh_subproc_timeout
Browse files Browse the repository at this point in the history
ENH: 'with SubprocessTao.timeout' support
  • Loading branch information
ChristopherMayes authored Jan 8, 2025
2 parents 61c4fb5 + 974c6cd commit 4c36f00
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 3 deletions.
111 changes: 108 additions & 3 deletions pytao/subproc.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import contextlib
import ctypes
import dataclasses
import io
Expand Down Expand Up @@ -250,13 +251,17 @@ def alive(self) -> bool:
# No exit code -> is still running
return self._subproc.poll() is None

def close(self):
def close(self) -> None:
"""Close the pipe."""
try:
self.send_receive("quit", "", raises=False)
except TaoDisconnectedError:
pass

def close_forcefully(self) -> None:
"""Close the pipe."""
self._subproc.terminate()

def _tao_subprocess(self):
"""Subprocess monitor thread. Cleans up after the subprocess ends."""

Expand Down Expand Up @@ -403,6 +408,76 @@ def send_receive_custom(self, func: Callable, kwargs: Dict[str, SupportedKwarg])
)


@contextlib.contextmanager
def subprocess_timeout_context(
taos: List[SubprocessTao],
timeout: float,
*,
timeout_hook: Optional[Callable[[], None]] = None,
):
"""
Context manager to set a timeout for a block of SubprocessTao calls.
Note that there is no possibility for a graceful timeout. In the event
of a timeout, all subprocesses will be terminated.
Parameters
----------
taos : list of SubprocessTao
timeout : float
The timeout duration in seconds.
timeout_hook : callable, optional
An alternative hook to call when the timeouts occur.
This replaces the built-in subprocess-closing hook.
Yields
------
None
Yields control back to the calling context.
Raises
------
TimeoutError
If the block of code does not execute within `when` seconds.
The Tao subprocesses are forcefully terminated at this point.
"""
timed_out = False

def monitor():
if evt.wait(timeout):
return

nonlocal timed_out
timed_out = True

if timeout_hook is not None:
timeout_hook()
else:
for tao in taos:
try:
tao.close_subprocess(force=True)
except Exception:
logger.debug("Subprocess close fail", exc_info=True)

evt = threading.Event()
monitor_thread = threading.Thread(daemon=True, target=monitor)
monitor_thread.start()
try:
yield
except TaoDisconnectedError:
if not timed_out:
# Tao disconnected, but not due to our timeout
raise
# Otherwise, the reason for disconnection was our timeout closure.

evt.set()
monitor_thread.join()
if timed_out:
raise TimeoutError(
f"Operation timed out after {timeout} seconds. Closing Tao subprocesses."
)


class SubprocessTao(Tao):
"""
Subprocess helper for Tao.
Expand Down Expand Up @@ -479,12 +554,42 @@ def subprocess_alive(self) -> bool:
return False
return self._subproc_pipe_.alive

def close_subprocess(self) -> None:
def close_subprocess(self, *, force: bool = False) -> None:
"""Close the Tao subprocess."""
if self._subproc_pipe_ is not None:
self._subproc_pipe_.close()
if force:
self._subproc_pipe_.close_forcefully()
else:
self._subproc_pipe_.close()
self._subproc_pipe_ = None

@contextlib.contextmanager
def timeout(self, when: float):
"""
Context manager to set a timeout for a block of SubprocessTao calls.
Note that there is no possibility for a graceful timeout. In the event
of a timeout, the subprocess will be terminated.
Parameters
----------
when : float
The timeout duration in seconds.
Yields
------
None
Yields control back to the calling context.
Raises
------
TimeoutError
If the block of code does not execute within `when` seconds.
The Tao subprocess is forcefully terminated at this point.
"""
with subprocess_timeout_context([self], timeout=when):
yield

def __enter__(self):
return self

Expand Down
25 changes: 25 additions & 0 deletions pytao/tests/test_subproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,28 @@ def test_custom_command_exception(subproc_tao: SubprocessTao) -> None:
with pytest.raises(TaoCommandError) as ex:
subproc_tao.subprocess_call(failure_func, a=3)
assert "ValueError: test got kwargs: {'a': 3}" in str(ex.value)


def tao_custom_command_sleep(tao: Tao, delay: float):
assert isinstance(tao, Tao)
print(f"Sleeping for {delay}")
time.sleep(delay)
print("Done")
return delay


def test_custom_command_timeout(subproc_tao: SubprocessTao) -> None:
with SubprocessTao(
init_file="$ACC_ROOT_DIR/regression_tests/pipe_test/csr_beam_tracking/tao.init",
noplot=True,
) as tao:
with pytest.raises(TimeoutError):
with tao.timeout(0.1):
res = tao.subprocess_call(tao_custom_command_sleep, delay=10.0)
print("subproc result was", res)


def test_custom_command_timeout_success(subproc_tao: SubprocessTao) -> None:
with subproc_tao.timeout(10.0):
res = subproc_tao.subprocess_call(tao_custom_command, value=1)
assert res == 2

0 comments on commit 4c36f00

Please sign in to comment.