Skip to content

Commit f0187ea

Browse files
authored
fix: flush console output buffer before marking cell idle (#9164)
Console messages (stdout/stderr) are batched by a background thread with a 10ms buffer for performance. Without an explicit flush, buffered messages can arrive at the frontend after the cell has been marked idle and completed-run has been sent. A subsequent run then clears the console (console=[]) before the user sees the previous output.

Add a FlushMarker sentinel to the console message queue. When the buffered writer encounters it, it flushes immediately and signals the caller. Implement flush() on ThreadSafeStdout/ThreadSafeStderr (previously no-ops with a TODO) and add a _flush_console post-execution hook that runs just before _set_status_idle.

Closes #4821
1 parent 76cc73b commit f0187ea

5 files changed

Lines changed: 186 additions & 28 deletions

File tree

marimo/_messaging/console_output_worker.py

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
# Copyright 2026 Marimo. All rights reserved.
22
from __future__ import annotations
33

4+
import threading
45
import time
5-
from dataclasses import dataclass
6+
from dataclasses import dataclass, field
67
from typing import TYPE_CHECKING, Literal
78

89
from marimo._messaging.cell_output import CellChannel, CellOutput
@@ -29,6 +30,13 @@ class ConsoleMsg:
2930
mimetype: ConsoleMimeType
3031

3132

33+
@dataclass(eq=False)
class FlushMarker:
    """Sentinel that tells the buffered writer to flush immediately.

    A marker is placed on the console message queue like any other
    message. When the buffered writer pops it, the writer flushes whatever
    it has buffered and then sets ``done`` so the thread that enqueued the
    marker can stop waiting.

    ``eq=False`` keeps the default identity-based ``__eq__``/``__hash__``:
    a marker is a one-shot sentinel, so two markers are never
    interchangeable, and the instance stays hashable.
    """

    # Set by the buffered writer once the flush triggered by this marker
    # has finished; each marker gets its own Event.
    done: threading.Event = field(default_factory=threading.Event)
38+
39+
3240
def _write_console_output(
3341
stream: Stream,
3442
stream_type: StreamT,
@@ -70,8 +78,24 @@ def _add_output_to_buffer(
7078
outputs_buffered_per_cell[cell_id] = [console_output]
7179

7280

81+
def _flush_outputs(
    outputs_buffered_per_cell: dict[CellId_t, list[ConsoleMsg]],
    stream: Stream,
) -> None:
    """Write every buffered console message to ``stream`` and empty the buffer.

    Messages are written cell by cell, in the order they were buffered for
    each cell. The mapping is cleared in place, so the caller's dict is
    empty when this returns.
    """
    pending = (
        (cell_id, msg)
        for cell_id, msgs in outputs_buffered_per_cell.items()
        for msg in msgs
    )
    for cell_id, msg in pending:
        _write_console_output(
            stream,
            msg.stream,
            cell_id,
            msg.data,
            msg.mimetype,
        )
    # Clear in place rather than rebinding: the caller keeps using the
    # same dict object for the next batch.
    outputs_buffered_per_cell.clear()
95+
96+
7397
def buffered_writer(
74-
msg_queue: deque[ConsoleMsg | None],
98+
msg_queue: deque[ConsoleMsg | FlushMarker | None],
7599
stream: Stream,
76100
cv: Condition,
77101
) -> None:
@@ -84,6 +108,7 @@ def buffered_writer(
84108
was noticeably faster than the builtin queue.Queue in testing.)
85109
86110
A `None` passed to `msg_queue` signals the writer should terminate.
111+
A `FlushMarker` forces an immediate flush and signals the caller.
87112
"""
88113

89114
# only have a non-None timer when there's at least one output buffered
@@ -93,6 +118,7 @@ def buffered_writer(
93118

94119
outputs_buffered_per_cell: dict[CellId_t, list[ConsoleMsg]] = {}
95120
while True:
121+
flush_marker: FlushMarker | None = None
96122
with cv:
97123
# We wait for messages until the timer (if any) expires
98124
while timer is None or timer > 0:
@@ -106,23 +132,21 @@ def buffered_writer(
106132
msg = msg_queue.popleft()
107133
if msg is None:
108134
return
135+
if isinstance(msg, FlushMarker):
136+
flush_marker = msg
137+
break
109138
_add_output_to_buffer(msg, outputs_buffered_per_cell)
139+
if flush_marker is not None:
140+
break
110141
if outputs_buffered_per_cell and timer is None:
111142
# start the timeout timer
112143
timer = TIMEOUT_S
113144
elif timer is not None:
114145
time_waited = time.time() - time_started_waiting
115146
timer -= time_waited
116147

117-
# the timer has expired: flush the outputs
118-
for cell_id, buffer in outputs_buffered_per_cell.items():
119-
for output in buffer:
120-
_write_console_output(
121-
stream,
122-
output.stream,
123-
cell_id,
124-
output.data,
125-
output.mimetype,
126-
)
127-
outputs_buffered_per_cell = {}
148+
# the timer has expired (or a flush was requested): flush the outputs
149+
_flush_outputs(outputs_buffered_per_cell, stream)
150+
if flush_marker is not None:
151+
flush_marker.done.set()
128152
timer = None

marimo/_messaging/streams.py

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@
1414

1515
from marimo import _loggers
1616
from marimo._messaging.cell_output import CellChannel
17-
from marimo._messaging.console_output_worker import ConsoleMsg, buffered_writer
17+
from marimo._messaging.console_output_worker import (
18+
ConsoleMsg,
19+
FlushMarker,
20+
buffered_writer,
21+
)
1822
from marimo._messaging.mimetypes import ConsoleMimeType
1923
from marimo._messaging.types import (
2024
KernelMessage,
@@ -32,6 +36,12 @@
3236
LOGGER = _loggers.marimo_logger()
3337

3438

39+
# Maximum time to block waiting for the buffered console writer to flush.
40+
# The flush hook runs on the hot path between cell execution and idle, so
41+
# we bound the wait even though flushes normally complete in <10ms.
42+
_FLUSH_CONSOLE_TIMEOUT_S = 5.0
43+
44+
3545
# Byte limits on outputs exist for two reasons
3646
#
3747
# 1. We use a multiprocessing.Connection object to send outputs from
@@ -106,7 +116,9 @@ def __init__(
106116
if self.redirect_console:
107117
# Console outputs are buffered
108118
self.console_msg_cv = threading.Condition(threading.Lock())
109-
self.console_msg_queue: deque[ConsoleMsg | None] = deque()
119+
self.console_msg_queue: deque[ConsoleMsg | FlushMarker | None] = (
120+
deque()
121+
)
110122
self.buffered_console_thread = threading.Thread(
111123
target=buffered_writer,
112124
args=(self.console_msg_queue, self, self.console_msg_cv),
@@ -133,14 +145,41 @@ def write(self, data: KernelMessage) -> None:
133145
e,
134146
)
135147

148+
def flush_console(self) -> None:
    """Force the buffered console writer to flush immediately.

    Enqueues a ``FlushMarker`` and blocks until the writer thread signals
    that it has processed it, so that stdout/stderr output produced
    during cell execution is delivered before the cell is marked idle.

    Returns without waiting when:

    - console redirection is disabled;
    - the writer thread is not alive (nothing would drain the marker);
    - the caller *is* the writer thread (it cannot wait on itself).

    While waiting, the writer's liveness is re-checked periodically so a
    writer that exits after the marker was enqueued does not stall the
    caller. If the writer is alive but has not flushed within
    ``_FLUSH_CONSOLE_TIMEOUT_S`` seconds, a warning is logged and the
    call returns.
    """
    if not self.redirect_console:
        return
    writer = self.buffered_console_thread
    # If the buffered writer isn't alive (e.g., shutdown in progress),
    # an enqueued marker would never be drained. Bail out early.
    if not writer.is_alive():
        return
    # The writer thread is the only consumer of the queue; if it asked
    # for a flush and then waited, it would block until the timeout.
    if threading.current_thread() is writer:
        return

    marker = FlushMarker()
    with self.console_msg_cv:
        self.console_msg_queue.append(marker)
        self.console_msg_cv.notify()

    # Bounded wait, in short slices. Event.wait returns as soon as the
    # event is set, so slicing adds no latency to the normal path; it
    # only lets us notice a writer that exited without draining the
    # marker (e.g. stop() raced with this call).
    poll_interval_s = 0.1
    remaining_s = _FLUSH_CONSOLE_TIMEOUT_S
    while remaining_s > 0:
        step_s = min(poll_interval_s, remaining_s)
        if marker.done.wait(timeout=step_s):
            return
        if not writer.is_alive():
            return
        remaining_s -= step_s

    LOGGER.warning(
        "Timed out waiting for console flush after %ss",
        _FLUSH_CONSOLE_TIMEOUT_S,
    )
174+
136175
def stop(self) -> None:
137176
"""Teardown resources created by the stream."""
138177
# Sending `None` through the queue signals the console thread to exit.
139178
# We don't join the thread in case its processing outputs still; don't
140179
# want to block the entire program.
141180
if self.redirect_console:
142-
self.console_msg_queue.append(None)
143181
with self.console_msg_cv:
182+
self.console_msg_queue.append(None)
144183
self.console_msg_cv.notify()
145184

146185

@@ -263,8 +302,7 @@ def seekable(self) -> bool:
263302
return False
264303

265304
def flush(self) -> None:
266-
# TODO(akshayka): maybe force the buffered writer to write
267-
return
305+
self._stream.flush_console()
268306

269307
def _write_with_mimetype(
270308
self, data: str, mimetype: ConsoleMimeType
@@ -280,15 +318,15 @@ def _write_with_mimetype(
280318
"Warning: marimo truncated a very large console output.\n"
281319
)
282320
data = data[: int(max_bytes)] + " ... "
283-
self._stream.console_msg_queue.append(
284-
ConsoleMsg(
285-
stream=CellChannel.STDOUT,
286-
cell_id=self._stream.cell_id,
287-
data=data,
288-
mimetype=mimetype,
289-
)
290-
)
291321
with self._stream.console_msg_cv:
322+
self._stream.console_msg_queue.append(
323+
ConsoleMsg(
324+
stream=CellChannel.STDOUT,
325+
cell_id=self._stream.cell_id,
326+
data=data,
327+
mimetype=mimetype,
328+
)
329+
)
292330
self._stream.console_msg_cv.notify()
293331
return len(data)
294332

@@ -338,8 +376,7 @@ def seekable(self) -> bool:
338376
return False
339377

340378
def flush(self) -> None:
341-
# TODO(akshayka): maybe force the buffered writer to write
342-
return
379+
self._stream.flush_console()
343380

344381
def _write_with_mimetype(
345382
self, data: str, mimetype: ConsoleMimeType

marimo/_messaging/types.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ class Stream(abc.ABC):
2525
def write(self, data: KernelMessage) -> None:
2626
pass
2727

28+
def flush_console(self) -> None:
    """Flush buffered console output, if any.

    This default implementation does nothing and returns ``None``.
    """
    return None
31+
2832
def stop(self) -> None:
2933
"""Tear down resources, if any."""
3034
return

marimo/_runtime/runner/hooks_post_execution.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,27 @@ def _reset_matplotlib_context(
494494
exec("__marimo__._output.mpl.close_figures()", ctx.glbls)
495495

496496

497+
@kernel_tracer.start_as_current_span("flush_console")
def _flush_console(
    cell: CellImpl,
    ctx: PostExecutionHookContext,
    run_result: cell_runner.RunResult,
) -> None:
    """Post-execution hook: flush the current context's console stream.

    stdout/stderr messages are batched by a background thread for
    performance. Unless they are flushed explicitly, they may reach the
    frontend *after* the cell is marked idle and ``completed-run`` is
    sent; a later run would then clear the console (via ``console=[]``)
    before the user sees the output.

    The hook arguments are accepted only to satisfy the hook signature;
    the stream is taken from the current runtime context instead.
    """
    # None of the hook arguments are needed here.
    del cell, ctx, run_result
    get_context().stream.flush_console()
516+
517+
497518
POST_EXECUTION_HOOKS: list[PostExecutionHook] = [
498519
_set_imported_defs,
499520
_set_run_result_status,
@@ -506,6 +527,9 @@ def _reset_matplotlib_context(
506527
_broadcast_duckdb_datasource,
507528
_broadcast_outputs,
508529
_reset_matplotlib_context,
530+
# Flush buffered console output so that stderr/stdout arrives at the
531+
# frontend before the cell transitions to idle.
532+
_flush_console,
509533
# set status to idle after all post-processing is done, in case the
510534
# other hooks take a long time (broadcast outputs can take a long time
511535
# if a formatter is slow).

tests/_messaging/test_console_output_worker.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from marimo._messaging.console_output_worker import (
1010
TIMEOUT_S,
1111
ConsoleMsg,
12+
FlushMarker,
1213
_add_output_to_buffer,
1314
_can_merge_outputs,
1415
_write_console_output,
@@ -181,6 +182,74 @@ def test_buffered_writer_basic(self) -> None:
181182
cv.notify()
182183
thread.join(timeout=1.0)
183184

185+
def test_buffered_writer_flush_marker_flushes_immediately(self) -> None:
    # Queue one console message followed by a FlushMarker. The writer
    # must emit the message and then set the marker's `done` event; the
    # event is only set on the marker path, so a successful wait proves
    # the marker was honoured.
    stream = MockStream()
    cv = threading.Condition()
    msg_queue: deque[ConsoleMsg | FlushMarker | None] = deque()
    hello = ConsoleMsg(
        stream=CellChannel.STDOUT,
        cell_id="cell1",
        data="Hello",
        mimetype="text/plain",
    )
    marker = FlushMarker()

    writer = threading.Thread(
        target=buffered_writer,
        args=(msg_queue, stream, cv),
        daemon=True,
    )
    writer.start()

    try:
        with cv:
            msg_queue.extend([hello, marker])
            cv.notify()

        # Generous upper bound to absorb CI jitter.
        flushed = marker.done.wait(timeout=1.0)
        assert flushed, (
            "FlushMarker did not signal completion within timeout"
        )
        assert len(stream.operations) == 1
        assert stream.operations[0]["console"]["data"] == "Hello"
    finally:
        # Always tell the writer to terminate, even if an assert failed.
        with cv:
            msg_queue.append(None)
            cv.notify()
        writer.join(timeout=1.0)
226+
227+
def test_buffered_writer_flush_marker_on_empty_queue(self) -> None:
    # With nothing buffered, a FlushMarker must still be acknowledged
    # and must not cause anything to be written to the stream.
    stream = MockStream()
    cv = threading.Condition()
    msg_queue: deque[ConsoleMsg | FlushMarker | None] = deque()
    marker = FlushMarker()

    writer = threading.Thread(
        target=buffered_writer,
        args=(msg_queue, stream, cv),
        daemon=True,
    )
    writer.start()

    try:
        with cv:
            msg_queue.append(marker)
            cv.notify()

        acknowledged = marker.done.wait(timeout=1.0)
        assert acknowledged
        assert stream.operations == []
    finally:
        # Always tell the writer to terminate, even if an assert failed.
        with cv:
            msg_queue.append(None)
            cv.notify()
        writer.join(timeout=1.0)
252+
184253
def test_buffered_writer_multiple_messages(self) -> None:
185254
# Test buffered writer with multiple messages
186255
stream = MockStream()

0 commit comments

Comments
 (0)