Skip to content

Commit

Permalink
fix: truncation of standard streams (#653)
Browse files Browse the repository at this point in the history
* fix: truncation of standard streams

This change fixes a race condition in the console output worker
that sometimes caused outputs to not be sent to the frontend.

* typos
  • Loading branch information
akshayka authored Jan 25, 2024
1 parent caa23a5 commit 36a0bb8
Showing 1 changed file with 18 additions and 14 deletions.
32 changes: 18 additions & 14 deletions marimo/_messaging/console_output_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,34 +75,38 @@ def buffered_writer(
was noticeably faster than the builtin queue.Queue in testing.)
"""

# only have a non-None timeout when there's at least one output buffered
timeout: Optional[float] = None
# only have a non-None timer when there's at least one output buffered
#
# when the timer expires, all buffered outputs are flushed
timer: Optional[float] = None

outputs_buffered_per_cell: dict[CellId_t, list[ConsoleMsg]] = {}
while True:
with cv:
# We wait for messages until we've timed-out
while timeout is None or timeout > 0:
# We wait for messages until the timer (if any) expires
while timer is None or timer > 0:
time_started_waiting = time.time()
# if we have at least one buffered output, wait for a finite
# amount of time; otherwise, timeout is None and we wait
# until we are notified.
cv.wait(timeout=timeout)
# if the timer is set or if the message queue is empty, wait;
# otherwise, no timer is set but we received a message, so
# process it
if timer is not None or not msg_queue:
cv.wait(timeout=timer)
while msg_queue:
_add_output_to_buffer(
msg_queue.popleft(), outputs_buffered_per_cell
)
if outputs_buffered_per_cell and timeout is None:
if outputs_buffered_per_cell and timer is None:
# start the timeout timer
timeout = TIMEOUT_S
elif timeout is not None:
timer = TIMEOUT_S
elif timer is not None:
time_waited = time.time() - time_started_waiting
timeout -= time_waited
timer -= time_waited

# the timeout has expired
# the timer has expired: flush the outputs
for cell_id, buffer in outputs_buffered_per_cell.items():
for output in buffer:
_write_console_output(
stream, output.stream, cell_id, output.data
)
outputs_buffered_per_cell = {}
timeout = None
timer = None

0 comments on commit 36a0bb8

Please sign in to comment.