File tree Expand file tree Collapse file tree 1 file changed +6
-0
lines changed
airbyte_cdk/sources/concurrent_source Expand file tree Collapse file tree 1 file changed +6
-0
lines changed Original file line number Diff line number Diff line change @@ -370,6 +370,12 @@ def is_done(self) -> bool:
370370 internal_message = f"Streams { stuck_stream_names } remained in the partition generation queue after all streams were marked done." ,
371371 failure_type = FailureType .system_error ,
372372 )
373+ if is_done and self ._active_groups :
374+ raise AirbyteTracedException (
375+ message = "Active stream groups are not empty after all streams completed." ,
376+ internal_message = f"Groups { dict (self ._active_groups )} still active after all streams were marked done." ,
377+ failure_type = FailureType .system_error ,
378+ )
373379 if is_done and self ._exceptions_per_stream_name :
374380 error_message = generate_failed_streams_error_message (self ._exceptions_per_stream_name )
375381 self ._logger .info (error_message )
You can’t perform that action at this time.
0 commit comments