|
7 | 7 | import ipaddress |
8 | 8 | import json |
9 | 9 | import logging |
| 10 | +import os |
10 | 11 | import os.path |
| 12 | +import select |
11 | 13 | import socket |
12 | 14 | import sys |
13 | 15 | import tempfile |
| 16 | +import time |
14 | 17 | from collections import defaultdict |
15 | 18 | from functools import wraps |
16 | 19 | from typing import Any, DefaultDict, Iterable, List, Mapping, Optional |
@@ -374,13 +377,77 @@ def _emit_queued_messages(self, source: Source) -> Iterable[AirbyteMessage]: |
374 | 377 | def launch(source: Source, args: List[str]) -> None: |
375 | 378 | source_entrypoint = AirbyteEntrypoint(source) |
376 | 379 | parsed_args = source_entrypoint.parse_args(args) |
377 | | - # temporarily removes the PrintBuffer because we're seeing weird print behavior for concurrent syncs |
378 | | - # Refer to: https://github.com/airbytehq/oncall/issues/6235 |
379 | 380 | with PRINT_BUFFER: |
380 | | - for message in source_entrypoint.run(parsed_args): |
381 | | - # simply printing is creating issues for concurrent CDK as Python uses different two instructions to print: one for the message and |
382 | | - # the other for the break line. Adding `\n` to the message ensure that both are printed at the same time |
| 381 | + _nonblocking_write_to_stdout(source_entrypoint.run(parsed_args)) |
| 382 | + |
| 383 | + |
| 384 | +def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: |
| 385 | + """Write messages to stdout using non-blocking I/O to prevent deadlocks. |
| 386 | +
|
| 387 | + When the Airbyte platform pauses reading from the source container's |
| 388 | + stdout pipe, a blocking ``write()`` stalls the main thread. Since the |
| 389 | + main thread is also responsible for draining the internal record queue, |
| 390 | + this causes a cascading deadlock: the queue fills, worker threads block |
| 391 | + on ``queue.put()``, and the entire process hangs. |
| 392 | +
|
| 393 | + This function sets stdout to non-blocking mode so that ``os.write()`` |
| 394 | + raises ``BlockingIOError`` instead of blocking when the pipe buffer is |
| 395 | + full. It then uses ``select()`` to wait (with a timeout) until the fd |
| 396 | + is writable again. The main thread remains in a Python-level retry |
| 397 | + loop it controls, so it never gets stuck in a kernel-level syscall. |
| 398 | +
|
| 399 | + Memory stays bounded because the upstream record queue keeps its |
| 400 | + default bounded size (10,000 items). When stdout is blocked the main |
| 401 | + thread pauses here, the queue fills naturally, and worker threads |
| 402 | + block on ``queue.put()`` with their own timeouts. When the platform |
| 403 | + resumes reading, ``select()`` returns, the write completes, the main |
| 404 | + thread resumes draining the queue, and workers unblock automatically. |
| 405 | + """ |
| 406 | + stdout_fd = sys.stdout.fileno() |
| 407 | + original_blocking = os.get_blocking(stdout_fd) |
| 408 | + |
| 409 | + try: |
| 410 | + os.set_blocking(stdout_fd, False) |
| 411 | + except OSError: |
| 412 | + # Fallback: if we cannot set non-blocking (e.g. redirected to |
| 413 | + # a file or in a test environment), just write normally. |
| 414 | + for message in messages: |
383 | 415 | print(f"{message}\n", end="") |
| 416 | + return |
| 417 | + |
| 418 | + try: |
| 419 | + for message in messages: |
| 420 | + data = f"{message}\n".encode() |
| 421 | + _write_all_nonblocking(stdout_fd, data) |
| 422 | + finally: |
| 423 | + try: |
| 424 | + os.set_blocking(stdout_fd, original_blocking) |
| 425 | + except OSError: |
| 426 | + pass |
| 427 | + |
| 428 | + |
| 429 | +def _write_all_nonblocking(fd: int, data: bytes) -> None: |
| 430 | + """Write all bytes to a non-blocking fd, retrying with select on EAGAIN.""" |
| 431 | + total_written = 0 |
| 432 | + last_progress = time.monotonic() |
| 433 | + |
| 434 | + while total_written < len(data): |
| 435 | + try: |
| 436 | + written = os.write(fd, data[total_written:]) |
| 437 | + total_written += written |
| 438 | + last_progress = time.monotonic() |
| 439 | + except BlockingIOError: |
| 440 | + # Pipe buffer is full. Wait up to 1 second for it to become |
| 441 | + # writable, then retry. The short timeout keeps the main |
| 442 | + # thread responsive and allows periodic stall detection. |
| 443 | + _, writable, _ = select.select([], [fd], [], 1.0) |
| 444 | + if not writable: |
| 445 | + elapsed = time.monotonic() - last_progress |
| 446 | + if elapsed > 600: |
| 447 | + raise RuntimeError( |
| 448 | + f"stdout pipe blocked for {elapsed:.0f}s with no progress. " |
| 449 | + "The platform is not reading from the source container pipe." |
| 450 | + ) |
384 | 451 |
|
385 | 452 |
|
386 | 453 | def _init_internal_request_filter() -> None: |
|
0 commit comments