|
9 | 9 | import logging |
10 | 10 | import os |
11 | 11 | import os.path |
12 | | -import queue |
13 | 12 | import socket |
14 | 13 | import sys |
15 | 14 | import tempfile |
@@ -378,203 +377,63 @@ def _emit_queued_messages(self, source: Source) -> Iterable[AirbyteMessage]: |
378 | 377 | def launch(source: Source, args: List[str]) -> None: |
379 | 378 | source_entrypoint = AirbyteEntrypoint(source) |
380 | 379 | parsed_args = source_entrypoint.parse_args(args) |
381 | | - _nonblocking_write_to_stdout(source_entrypoint.run(parsed_args)) |
382 | 380 |
|
383 | | - |
384 | | -def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: |
385 | | - """Write messages to stdout via a dedicated writer thread to prevent deadlocks. |
386 | | -
|
387 | | - When the Airbyte platform pauses reading from the source container's |
388 | | - stdout pipe, a blocking ``write()`` in the main thread stalls message |
389 | | - processing. Since the main thread is also responsible for draining the |
390 | | - internal record queue, this causes a cascading deadlock: the queue |
391 | | - fills, worker threads block on ``queue.put()``, and the entire process |
392 | | - hangs. |
393 | | -
|
394 | | - This function decouples stdout writing from the main thread by routing |
395 | | - messages through a bounded internal queue to a dedicated writer thread. |
396 | | - The writer thread performs normal blocking ``os.write()`` calls; if the |
397 | | - pipe is full, only the writer thread stalls — the main thread continues |
398 | | - iterating the message generator (which drains the record queue). |
399 | | -
|
400 | | - When the internal write queue fills (because the writer is blocked on |
401 | | - the pipe), the main thread retries ``queue.put()`` with a 1-second |
402 | | - timeout. A watchdog detects if no message has been accepted for 600 |
403 | | - seconds and raises ``RuntimeError`` to terminate the process cleanly. |
404 | | - """ |
405 | | - # In test environments (pytest capsys) or wrappers like PRINT_BUFFER, |
406 | | - # sys.stdout may have been replaced. Detect this via fileno() and |
407 | | - # fall back to print() so output goes through the capture layer. |
408 | | - try: |
409 | | - current_fd = sys.stdout.fileno() |
410 | | - except (OSError, AttributeError, ValueError): |
411 | | - for message in messages: |
412 | | - print(f"{message}\n", end="") |
413 | | - return |
414 | | - |
415 | | - real_stdout = sys.__stdout__ |
416 | | - if real_stdout is None or not hasattr(real_stdout, "fileno"): |
417 | | - for message in messages: |
418 | | - print(f"{message}\n", end="") |
419 | | - return |
420 | | - |
421 | | - try: |
422 | | - real_fd = real_stdout.fileno() |
423 | | - except (OSError, AttributeError, ValueError): |
424 | | - for message in messages: |
425 | | - print(f"{message}\n", end="") |
426 | | - return |
427 | | - |
428 | | - if current_fd != real_fd: |
429 | | - for message in messages: |
430 | | - print(f"{message}\n", end="") |
431 | | - return |
432 | | - |
433 | | - # Bounded queue decouples the main thread from stdout I/O. |
434 | | - # The writer thread does blocking writes; if the pipe is full only the |
435 | | - # writer stalls — the main thread keeps draining the record queue. |
436 | | - _WRITE_QUEUE_SIZE = 1000 |
437 | | - _WATCHDOG_TIMEOUT_S = 600 |
438 | | - write_queue: queue.Queue[Optional[bytes]] = queue.Queue(maxsize=_WRITE_QUEUE_SIZE) |
439 | | - writer_error: List[Exception] = [] |
440 | | - |
441 | | - logger = logging.getLogger("airbyte_cdk.stdout_writer") |
442 | | - _BLOCK_LOG_THRESHOLD_S = 5.0 # log when a single write blocks longer than this |
443 | | - _HEARTBEAT_INTERVAL_S = 30.0 # emit a heartbeat to stderr every 30s |
| 381 | + # Heartbeat state — shared with the background heartbeat thread. |
| 382 | + _HEARTBEAT_INTERVAL_S = 30.0 |
444 | 383 | messages_written = 0 |
445 | 384 | bytes_written = 0 |
446 | | - last_write_ts = time.monotonic() |
447 | | - total_blocked_s = 0.0 |
448 | | - block_count = 0 |
449 | | - pipe_blocked = False # True while os.write() is in progress |
450 | | - pipe_blocked_since = 0.0 # monotonic timestamp when current os.write() started |
| 385 | + print_blocked = False |
| 386 | + print_blocked_since = 0.0 |
451 | 387 | heartbeat_stop = threading.Event() |
452 | 388 |
|
453 | 389 | def _heartbeat() -> None: |
454 | | - """Emit periodic status to stderr so we can prove pipe-blocking in Cloud logs. |
| 390 | + """Emit periodic status to stderr to diagnose stdout pipe blocking. |
455 | 391 |
|
456 | | - This thread writes directly to fd 2 (stderr) which is collected by the |
457 | | - Kubernetes container runtime independently of the orchestrator that reads |
458 | | - stdout. Even when the orchestrator stops reading stdout, these heartbeat |
459 | | - lines should still appear in the Cloud job logs. |
| 392 | + Writes directly to fd 2 (stderr) which the Kubernetes container |
| 393 | + runtime collects independently of the orchestrator reading stdout. |
460 | 394 | """ |
461 | 395 | start = time.monotonic() |
462 | | - stderr_fd = 2 # write directly to fd 2, bypassing Python buffering |
| 396 | + stderr_fd = 2 |
463 | 397 | while not heartbeat_stop.wait(timeout=_HEARTBEAT_INTERVAL_S): |
464 | 398 | now = time.monotonic() |
465 | | - elapsed_total = now - start |
466 | | - blocked_str = "YES" if pipe_blocked else "NO" |
467 | | - blocked_dur = f" blocked_since={now - pipe_blocked_since:.0f}s" if pipe_blocked else "" |
| 399 | + elapsed = now - start |
| 400 | + blocked_str = "YES" if print_blocked else "NO" |
| 401 | + blocked_dur = ( |
| 402 | + f" blocked_since={now - print_blocked_since:.0f}s" |
| 403 | + if print_blocked |
| 404 | + else "" |
| 405 | + ) |
468 | 406 | line = ( |
469 | | - f"STDOUT_WRITER_HEARTBEAT: t={elapsed_total:.0f}s " |
| 407 | + f"STDOUT_HEARTBEAT: t={elapsed:.0f}s " |
470 | 408 | f"msgs={messages_written} bytes={bytes_written} " |
471 | | - f"pipe_blocked={blocked_str}{blocked_dur} " |
472 | | - f"queue={write_queue.qsize()}/{_WRITE_QUEUE_SIZE}\n" |
| 409 | + f"print_blocked={blocked_str}{blocked_dur}\n" |
473 | 410 | ) |
474 | 411 | try: |
475 | 412 | os.write(stderr_fd, line.encode()) |
476 | 413 | except OSError: |
477 | | - pass # stderr itself might be broken; nothing we can do |
| 414 | + pass |
478 | 415 |
|
479 | 416 | heartbeat_thread = threading.Thread( |
480 | | - target=_heartbeat, name="stdout-writer-heartbeat", daemon=True |
| 417 | + target=_heartbeat, name="stdout-heartbeat", daemon=True |
481 | 418 | ) |
482 | 419 | heartbeat_thread.start() |
483 | 420 |
|
484 | | - def _stdout_writer() -> None: |
485 | | - """Dedicated thread that writes queued messages to stdout.""" |
486 | | - nonlocal messages_written, bytes_written, last_write_ts, total_blocked_s, block_count |
487 | | - nonlocal pipe_blocked, pipe_blocked_since |
488 | | - try: |
489 | | - while True: |
490 | | - data = write_queue.get() |
491 | | - if data is None: |
492 | | - break |
493 | | - total = 0 |
494 | | - while total < len(data): |
495 | | - pipe_blocked = True |
496 | | - pipe_blocked_since = time.monotonic() |
497 | | - before = pipe_blocked_since |
498 | | - written = os.write(real_fd, data[total:]) |
499 | | - elapsed = time.monotonic() - before |
500 | | - pipe_blocked = False |
501 | | - total += written |
502 | | - if elapsed >= _BLOCK_LOG_THRESHOLD_S: |
503 | | - block_count += 1 |
504 | | - total_blocked_s += elapsed |
505 | | - logger.warning( |
506 | | - "STDOUT_WRITER: os.write() blocked for %.1fs " |
507 | | - "(wrote %d bytes). block_count=%d total_blocked=%.1fs " |
508 | | - "messages_written=%d bytes_written=%d queue_size=%d", |
509 | | - elapsed, |
510 | | - written, |
511 | | - block_count, |
512 | | - total_blocked_s, |
513 | | - messages_written, |
514 | | - bytes_written, |
515 | | - write_queue.qsize(), |
516 | | - ) |
| 421 | + # Route all output through PRINT_BUFFER so that messages from concurrent syncs are flushed atomically
| 422 | + # instead of interleaving. Refer to: https://github.com/airbytehq/oncall/issues/6235
| 423 | + try: |
| 424 | + with PRINT_BUFFER: |
| 425 | + for message in source_entrypoint.run(parsed_args): |
| 426 | + # A plain print(message) breaks under the concurrent CDK because Python performs two separate
| 427 | + # writes: one for the message and one for the line break. Embedding `\n` in the message ensures both are written in a single call.
| 428 | + data = f"{message}\n" |
| 429 | + print_blocked = True |
| 430 | + print_blocked_since = time.monotonic() |
| 431 | + print(data, end="") |
| 432 | + print_blocked = False |
517 | 433 | messages_written += 1 |
518 | 434 | bytes_written += len(data) |
519 | | - last_write_ts = time.monotonic() |
520 | | - except (KeyboardInterrupt, SystemExit): |
521 | | - raise |
522 | | - except Exception as exc: |
523 | | - writer_error.append(exc) |
524 | | - |
525 | | - writer = threading.Thread(target=_stdout_writer, name="stdout-writer", daemon=True) |
526 | | - writer.start() |
527 | | - logger.info( |
528 | | - "STDOUT_WRITER: started writer_thread fd=%d queue_size=%d watchdog=%ds heartbeat=%ds", |
529 | | - real_fd, |
530 | | - _WRITE_QUEUE_SIZE, |
531 | | - _WATCHDOG_TIMEOUT_S, |
532 | | - int(_HEARTBEAT_INTERVAL_S), |
533 | | - ) |
534 | | - |
535 | | - try: |
536 | | - last_progress = time.monotonic() |
537 | | - for message in messages: |
538 | | - if writer_error: |
539 | | - raise writer_error[0] |
540 | | - data = f"{message}\n".encode() |
541 | | - while True: |
542 | | - try: |
543 | | - write_queue.put(data, timeout=1.0) |
544 | | - last_progress = time.monotonic() |
545 | | - break |
546 | | - except queue.Full: |
547 | | - if writer_error: |
548 | | - raise writer_error[0] |
549 | | - elapsed = time.monotonic() - last_progress |
550 | | - if int(elapsed) % 30 == 0 and int(elapsed) > 0: |
551 | | - logger.warning( |
552 | | - "STDOUT_WRITER: write_queue full for %.0fs. " |
553 | | - "writer_messages=%d writer_bytes=%d " |
554 | | - "writer_blocks=%d writer_total_blocked=%.1fs " |
555 | | - "writer_last_write=%.1fs_ago queue_size=%d", |
556 | | - elapsed, |
557 | | - messages_written, |
558 | | - bytes_written, |
559 | | - block_count, |
560 | | - total_blocked_s, |
561 | | - time.monotonic() - last_write_ts, |
562 | | - write_queue.qsize(), |
563 | | - ) |
564 | | - if elapsed > _WATCHDOG_TIMEOUT_S: |
565 | | - raise RuntimeError( |
566 | | - f"stdout pipe blocked for {elapsed:.0f}s with no progress " |
567 | | - f"(watchdog timeout={_WATCHDOG_TIMEOUT_S}s). " |
568 | | - f"Writer stats: messages={messages_written} bytes={bytes_written} " |
569 | | - f"blocks={block_count} total_blocked={total_blocked_s:.1f}s " |
570 | | - f"last_write={time.monotonic() - last_write_ts:.1f}s ago. " |
571 | | - "Terminating process to prevent indefinite hang." |
572 | | - ) |
573 | 435 | finally: |
574 | 436 | heartbeat_stop.set() |
575 | | - # Signal writer to drain remaining messages and exit. |
576 | | - write_queue.put(None) |
577 | | - writer.join(timeout=30) |
578 | 437 |
|
579 | 438 |
|
580 | 439 | def _init_internal_request_filter() -> None: |
|
0 commit comments