@@ -376,7 +376,8 @@ def __init__(
376376 self ._flush_pending = False
377377 self ._subprocess_flush_pending = False
378378 self ._io_loop = pub_thread .io_loop
379- self ._new_buffer ()
379+ self ._buffer_lock = threading .RLock ()
380+ self ._buffer = StringIO ()
380381 self .echo = None
381382 self ._isatty = bool (isatty )
382383
@@ -528,7 +529,8 @@ def write(self, string: str) -> int:
528529
529530 is_child = (not self ._is_master_process ())
530531 # only touch the buffer in the IO thread to avoid races
531- self .pub_thread .schedule (lambda : self ._buffer .write (string ))
532+ with self ._buffer_lock :
533+ self ._buffer .write (string )
532534 if is_child :
533535 # mp.Pool cannot be trusted to flush promptly (or ever),
534536 # and this helps.
@@ -553,17 +555,15 @@ def writable(self):
553555 return True
554556
555557 def _flush_buffer (self ):
556- """clear the current buffer and return the current buffer data.
557-
558- This should only be called in the IO thread.
559- """
560- data = ''
561- if self ._buffer is not None :
562- buf = self ._buffer
563- self ._new_buffer ()
564- data = buf .getvalue ()
565- buf .close ()
558+ """clear the current buffer and return the current buffer data."""
559+ buf = self ._rotate_buffer ()
560+ data = buf .getvalue ()
561+ buf .close ()
566562 return data
567563
568- def _new_buffer (self ):
569- self ._buffer = StringIO ()
564+ def _rotate_buffer (self ):
565+ """Returns the current buffer and replaces it with an empty buffer."""
566+ with self ._buffer_lock :
567+ old_buffer = self ._buffer
568+ self ._buffer = StringIO ()
569+ return old_buffer
0 commit comments