Skip to content

Commit 059fd36

Browse files
fix: use unbounded stdout buffer to prevent deadlock with high-throughput syncs
Co-Authored-By: unknown <>
1 parent 3bc6d54 commit 059fd36

1 file changed

Lines changed: 1 addition & 9 deletions

File tree

airbyte_cdk/entrypoint.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -379,14 +379,6 @@ def launch(source: Source, args: List[str]) -> None:
379379
_buffered_write_to_stdout(source_entrypoint.run(parsed_args))
380380

381381

382-
_STDOUT_WRITER_QUEUE_SIZE = 50_000
383-
"""Upper bound on the number of serialised messages buffered between the
384-
main (generator) thread and the dedicated stdout-writer thread. This
385-
decouples the generator from OS-level stdout backpressure so that the
386-
CDK's internal record queue keeps draining even when the platform is
387-
slow to read from the pipe."""
388-
389-
390382
def _buffered_write_to_stdout(messages: Iterable[str]) -> None:
391383
"""Drain *messages* through a background writer thread.
392384
@@ -400,7 +392,7 @@ def _buffered_write_to_stdout(messages: Iterable[str]) -> None:
400392
re-raised in the main thread after the generator is exhausted.
401393
"""
402394
_SENTINEL = None # signals the writer to stop
403-
buffer: Queue[Optional[str]] = Queue(maxsize=_STDOUT_WRITER_QUEUE_SIZE)
395+
buffer: Queue[Optional[str]] = Queue()
404396
writer_error: List[Exception] = []
405397

406398
def _writer() -> None:

0 commit comments

Comments
 (0)