From 36a0bb81a82ade8048b0997f0a3835950b93bf72 Mon Sep 17 00:00:00 2001 From: Akshay Agrawal Date: Wed, 24 Jan 2024 19:53:58 -0800 Subject: [PATCH] fix: truncation of standard streams (#653) * fix: truncation of standard streams This change fixes a race condition in the console output worker that sometimes caused outputs to not be sent to the frontend. * typos --- marimo/_messaging/console_output_worker.py | 32 ++++++++++++---------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/marimo/_messaging/console_output_worker.py b/marimo/_messaging/console_output_worker.py index b00a9374455..89c55ab9ece 100644 --- a/marimo/_messaging/console_output_worker.py +++ b/marimo/_messaging/console_output_worker.py @@ -75,34 +75,38 @@ def buffered_writer( was noticeably faster than the builtin queue.Queue in testing.) """ - # only have a non-None timeout when there's at least one output buffered - timeout: Optional[float] = None + # only have a non-None timer when there's at least one output buffered + # + # when the timer expires, all buffered outputs are flushed + timer: Optional[float] = None + outputs_buffered_per_cell: dict[CellId_t, list[ConsoleMsg]] = {} while True: with cv: - # We wait for messages until we've timed-out - while timeout is None or timeout > 0: + # We wait for messages until the timer (if any) expires + while timer is None or timer > 0: time_started_waiting = time.time() - # if we have at least one buffered output, wait for a finite - # amount of time; otherwise, timeout is None and we wait - # until we are notified. - cv.wait(timeout=timeout) + # if the timer is set or if the message queue is empty, wait; + # otherwise, no timer is set but we received a message, so + # process it + if timer is not None or not msg_queue: + cv.wait(timeout=timer) while msg_queue: _add_output_to_buffer( msg_queue.popleft(), outputs_buffered_per_cell ) - if outputs_buffered_per_cell and timeout is None: + if outputs_buffered_per_cell and timer is None: # start the timeout timer - timeout = TIMEOUT_S - elif timeout is not None: + timer = TIMEOUT_S + elif timer is not None: time_waited = time.time() - time_started_waiting - timeout -= time_waited + timer -= time_waited - # the timeout has expired + # the timer has expired: flush the outputs for cell_id, buffer in outputs_buffered_per_cell.items(): for output in buffer: _write_console_output( stream, output.stream, cell_id, output.data ) outputs_buffered_per_cell = {} - timeout = None + timer = None