@@ -377,8 +377,7 @@ def _emit_queued_messages(self, source: Source) -> Iterable[AirbyteMessage]:
377377def launch (source : Source , args : List [str ]) -> None :
378378 source_entrypoint = AirbyteEntrypoint (source )
379379 parsed_args = source_entrypoint .parse_args (args )
380- with PRINT_BUFFER :
381- _nonblocking_write_to_stdout (source_entrypoint .run (parsed_args ))
380+ _nonblocking_write_to_stdout (source_entrypoint .run (parsed_args ))
382381
383382
384383def _nonblocking_write_to_stdout (messages : Iterable [str ]) -> None :
@@ -403,20 +402,42 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None:
403402 resumes reading, ``select()`` returns, the write completes, the main
404403 thread resumes draining the queue, and workers unblock automatically.
405404 """
406- # Only use non-blocking I/O when stdout is the real file descriptor.
407- # In test environments (pytest capsys) or when PRINT_BUFFER is active,
408- # sys.stdout is replaced with a wrapper. Writing to sys.__stdout__
409- # via os.write() would bypass the capture, so fall back to print().
405+ # We need to write to the *real* stdout fd for non-blocking I/O.
406+ # However, in test environments (pytest capsys) or other wrappers,
407+ # sys.stdout may have been replaced. If sys.stdout.fileno() fails
408+ # or doesn't match sys.__stdout__.fileno(), something is capturing
409+ # output and we must fall back to print() so it goes through the
410+ # capture layer.
411+ try :
412+ current_fd = sys .stdout .fileno ()
413+ except (OSError , AttributeError , ValueError ):
414+ # capsys, PRINT_BUFFER, or other wrapper — no real fd available.
415+ for message in messages :
416+ print (f"{ message } \n " , end = "" )
417+ return
418+
410419 real_stdout = sys .__stdout__
411- if real_stdout is None or not hasattr (real_stdout , "fileno" ) or sys .stdout is not real_stdout :
420+ if real_stdout is None or not hasattr (real_stdout , "fileno" ):
421+ for message in messages :
422+ print (f"{ message } \n " , end = "" )
423+ return
424+
425+ try :
426+ real_fd = real_stdout .fileno ()
427+ except (OSError , AttributeError , ValueError ):
428+ for message in messages :
429+ print (f"{ message } \n " , end = "" )
430+ return
431+
432+ if current_fd != real_fd :
433+ # stdout has been redirected; fall back to print().
412434 for message in messages :
413435 print (f"{ message } \n " , end = "" )
414436 return
415437
416438 try :
417- stdout_fd = real_stdout .fileno ()
418- original_blocking = os .get_blocking (stdout_fd )
419- os .set_blocking (stdout_fd , False )
439+ original_blocking = os .get_blocking (real_fd )
440+ os .set_blocking (real_fd , False )
420441 except OSError :
421442 # Fallback: if we cannot set non-blocking (e.g. the fd does not
422443 # support non-blocking mode), just write normally.
@@ -427,10 +448,10 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None:
427448 try :
428449 for message in messages :
429450 data = f"{ message } \n " .encode ()
430- _write_all_nonblocking (stdout_fd , data )
451+ _write_all_nonblocking (real_fd , data )
431452 finally :
432453 try :
433- os .set_blocking (stdout_fd , original_blocking )
454+ os .set_blocking (real_fd , original_blocking )
434455 except OSError :
435456 logger .debug ("Failed to restore stdout blocking mode" , exc_info = True )
436457
0 commit comments