Skip to content

Commit 866ad48

Browse files
fix: prevent deadlock by returning messages directly instead of queueing
Changes the Cursor ABC and all implementations so that ensure_at_least_one_state_emitted() returns Iterable[AirbyteMessage] instead of putting messages on the shared queue. The main thread (consumer) yields these messages directly, eliminating the deadlock where it would block on queue.put() into its own full queue. Also changes on_partition() to yield slice log messages directly instead of emitting through the message repository. Modified files: - cursor.py: ABC + FinalStateCursor + ConcurrentCursor - concurrent_partition_cursor.py: Added _create_state_message() - file_based_concurrent_cursor.py - file_based_final_state_cursor.py - abstract_concurrent_file_based_cursor.py - concurrent_read_processor.py: on_partition() and _on_stream_is_done() - concurrent_source.py: yield from on_partition() - substream_partition_router.py: consume returned iterator Co-Authored-By: gl_anatolii.yatsuk <gl_anatolii.yatsuk@airbyte.io>
1 parent 0b94cbe commit 866ad48

10 files changed

Lines changed: 101 additions & 36 deletions

File tree

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -140,20 +140,19 @@ def on_partition_generation_completed(
140140
if status_message:
141141
yield status_message
142142

143-
def on_partition(self, partition: Partition) -> None:
143+
def on_partition(self, partition: Partition) -> Iterable[AirbyteMessage]:
144144
"""
145145
This method is called when a partition is generated.
146146
1. Add the partition to the set of partitions for the stream
147-
2. Log the slice if necessary
147+
2. Log the slice if necessary — yield the log message directly instead of
148+
putting it on the shared queue (prevents deadlock when queue is full)
148149
3. Submit the partition to the thread pool manager
149150
"""
150151
stream_name = partition.stream_name()
151152
self._streams_to_running_partitions[stream_name].add(partition)
152153
cursor = self._stream_name_to_instance[stream_name].cursor
153154
if self._slice_logger.should_log_slice_message(self._logger):
154-
self._message_repository.emit_message(
155-
self._slice_logger.create_slice_log_message(partition.to_slice())
156-
)
155+
yield self._slice_logger.create_slice_log_message(partition.to_slice())
157156
self._thread_pool_manager.submit(
158157
self._partition_reader.process_partition, partition, cursor
159158
)
@@ -426,7 +425,7 @@ def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
426425
)
427426
self._logger.info(f"Marking stream {stream_name} as STOPPED")
428427
stream = self._stream_name_to_instance[stream_name]
429-
stream.cursor.ensure_at_least_one_state_emitted()
428+
yield from stream.cursor.ensure_at_least_one_state_emitted()
430429
yield from self._message_repository.consume_queue()
431430
self._logger.info(f"Finished syncing {stream.name}")
432431
self._streams_done.add(stream_name)

airbyte_cdk/sources/concurrent_source/concurrent_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ def _handle_item(
166166
elif isinstance(queue_item, PartitionGenerationCompletedSentinel):
167167
yield from concurrent_stream_processor.on_partition_generation_completed(queue_item)
168168
elif isinstance(queue_item, Partition):
169-
concurrent_stream_processor.on_partition(queue_item)
169+
yield from concurrent_stream_processor.on_partition(queue_item)
170170
elif isinstance(queue_item, PartitionCompleteSentinel):
171171
yield from concurrent_stream_processor.on_partition_complete_sentinel(queue_item)
172172
elif isinstance(queue_item, Record):

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, TypeVar
1414

1515
from airbyte_cdk.models import (
16+
AirbyteMessage,
1617
AirbyteStateBlob,
1718
AirbyteStateMessage,
1819
AirbyteStateType,
@@ -268,10 +269,11 @@ def _check_and_update_parent_state(self) -> None:
268269
if last_closed_state is not None:
269270
self._parent_state = last_closed_state
270271

271-
def ensure_at_least_one_state_emitted(self) -> None:
272+
def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]:
272273
"""
273274
The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
274275
called.
276+
Returns the state message directly instead of putting it on the shared queue.
275277
"""
276278
if not any(
277279
semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items()
@@ -281,7 +283,7 @@ def ensure_at_least_one_state_emitted(self) -> None:
281283
self._global_cursor = self._new_global_cursor
282284
self._lookback_window = self._timer.finish()
283285
self._parent_state = self._partition_router.get_stream_state()
284-
self._emit_state_message(throttle=False)
286+
yield from self._create_state_message(throttle=False)
285287

286288
def _throttle_state_message(self) -> Optional[float]:
287289
"""
@@ -292,7 +294,33 @@ def _throttle_state_message(self) -> Optional[float]:
292294
return None
293295
return current_time
294296

297+
def _create_state_message(self, throttle: bool = True) -> Iterable[AirbyteMessage]:
298+
"""
299+
Build and return the state message directly instead of emitting through the message repository.
300+
Used by ensure_at_least_one_state_emitted() to avoid deadlock when the main thread
301+
would otherwise call queue.put() on a full queue.
302+
"""
303+
if throttle:
304+
current_time = self._throttle_state_message()
305+
if current_time is None:
306+
return
307+
self._last_emission_time = current_time
308+
# Skip state emit for global cursor if parent state is empty
309+
if self._use_global_cursor and not self._parent_state:
310+
return
311+
312+
self._connector_state_manager.update_state_for_stream(
313+
self._stream_name,
314+
self._stream_namespace,
315+
self.state,
316+
)
317+
state_message = self._connector_state_manager.create_state_message(
318+
self._stream_name, self._stream_namespace
319+
)
320+
yield state_message
321+
295322
def _emit_state_message(self, throttle: bool = True) -> None:
323+
"""Emit state message via message repository. Used by close_partition() on worker threads."""
296324
if throttle:
297325
current_time = self._throttle_state_message()
298326
if current_time is None:

airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,14 @@ def stream_slices(self) -> Iterable[StreamSlice]:
256256
if is_last_record_in_slice:
257257
parent_stream.cursor.close_partition(partition)
258258
if is_last_slice:
259-
parent_stream.cursor.ensure_at_least_one_state_emitted()
259+
# ensure_at_least_one_state_emitted now returns messages directly.
260+
# On this worker thread we need to consume the returned iterator
261+
# so the cursor's internal state updates happen, but the messages
262+
# themselves are discarded — the parent cursor's close_partition()
263+
# above already emitted state through the queue. This call just
264+
# ensures internal bookkeeping is finalized.
265+
for _msg in parent_stream.cursor.ensure_at_least_one_state_emitted():
266+
pass
260267

261268
if emit_slice:
262269
yield StreamSlice(

airbyte_cdk/sources/file_based/stream/concurrent/cursor/abstract_concurrent_file_based_cursor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from datetime import datetime
88
from typing import TYPE_CHECKING, Any, Iterable, List, MutableMapping
99

10+
from airbyte_cdk.models import AirbyteMessage
1011
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
1112
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
1213
from airbyte_cdk.sources.file_based.types import StreamState
@@ -56,4 +57,4 @@ def get_start_time(self) -> datetime: ...
5657
def emit_state_message(self) -> None: ...
5758

5859
@abstractmethod
59-
def ensure_at_least_one_state_emitted(self) -> None: ...
60+
def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]: ...

airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,8 +309,19 @@ def get_state(self) -> MutableMapping[str, Any]:
309309
def set_initial_state(self, value: StreamState) -> None:
310310
pass
311311

312-
def ensure_at_least_one_state_emitted(self) -> None:
313-
self.emit_state_message()
312+
def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]:
313+
"""Return the state message directly instead of putting it on the shared queue."""
314+
with self._state_lock:
315+
new_state = self.get_state()
316+
self._connector_state_manager.update_state_for_stream(
317+
self._stream_name,
318+
self._stream_namespace,
319+
new_state,
320+
)
321+
state_message = self._connector_state_manager.create_state_message(
322+
self._stream_name, self._stream_namespace
323+
)
324+
yield state_message
314325

315326
def should_be_synced(self, record: Record) -> bool:
316327
return True

airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from datetime import datetime
77
from typing import TYPE_CHECKING, Any, Iterable, List, MutableMapping, Optional
88

9+
from airbyte_cdk.models import AirbyteMessage
910
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
1011
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
1112
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
@@ -73,14 +74,15 @@ def get_start_time(self) -> datetime:
7374
def emit_state_message(self) -> None:
7475
pass
7576

76-
def ensure_at_least_one_state_emitted(self) -> None:
77+
def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]:
78+
"""Return the state message directly instead of putting it on the shared queue."""
7779
self._connector_state_manager.update_state_for_stream(
7880
self._stream_name, self._stream_namespace, self.state
7981
)
8082
state_message = self._connector_state_manager.create_state_message(
8183
self._stream_name, self._stream_namespace
8284
)
83-
self._message_repository.emit_message(state_message)
85+
yield state_message
8486

8587
def should_be_synced(self, record: Record) -> bool:
8688
return True

airbyte_cdk/sources/streams/concurrent/cursor.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
Union,
2020
)
2121

22+
from airbyte_cdk.models import AirbyteMessage
2223
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
2324
from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
2425
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
@@ -72,10 +73,13 @@ def close_partition(self, partition: Partition) -> None:
7273
raise NotImplementedError()
7374

7475
@abstractmethod
75-
def ensure_at_least_one_state_emitted(self) -> None:
76+
def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]:
7677
"""
7778
State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
7879
stream. Hence, if no partitions are generated, this method needs to be called.
80+
81+
Returns the state messages directly instead of putting them on the shared queue,
82+
so the caller (main thread) can yield them without risk of deadlock.
7983
"""
8084
raise NotImplementedError()
8185

@@ -140,9 +144,10 @@ def observe(self, record: Record) -> None:
140144
def close_partition(self, partition: Partition) -> None:
141145
pass
142146

143-
def ensure_at_least_one_state_emitted(self) -> None:
147+
def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]:
144148
"""
145-
Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
149+
Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync.
150+
Returns the state message directly instead of putting it on the shared queue.
146151
"""
147152

148153
self._connector_state_manager.update_state_for_stream(
@@ -151,7 +156,7 @@ def ensure_at_least_one_state_emitted(self) -> None:
151156
state_message = self._connector_state_manager.create_state_message(
152157
self._stream_name, self._stream_namespace
153158
)
154-
self._message_repository.emit_message(state_message)
159+
yield state_message
155160

156161
def should_be_synced(self, record: Record) -> bool:
157162
return True
@@ -397,12 +402,21 @@ def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType
397402
f"Partition is expected to have key `{key}` but could not be found"
398403
) from exception
399404

400-
def ensure_at_least_one_state_emitted(self) -> None:
405+
def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]:
401406
"""
402407
The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
403408
called.
409+
Returns the state message directly instead of putting it on the shared queue.
404410
"""
405-
self._emit_state_message()
411+
self._connector_state_manager.update_state_for_stream(
412+
self._stream_name,
413+
self._stream_namespace,
414+
self.state,
415+
)
416+
state_message = self._connector_state_manager.create_state_message(
417+
self._stream_name, self._stream_namespace
418+
)
419+
yield state_message
406420

407421
def stream_slices(self) -> Iterable[StreamSlice]:
408422
"""

unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,15 @@ def setUp(self):
7979
json_schema={},
8080
supported_sync_modes=[SyncMode.full_refresh],
8181
)
82+
self._stream.cursor.ensure_at_least_one_state_emitted.side_effect = lambda: iter([])
8283
self._another_stream = Mock(spec=AbstractStream)
8384
self._another_stream.name = _ANOTHER_STREAM_NAME
8485
self._another_stream.as_airbyte_stream.return_value = AirbyteStream(
8586
name=_ANOTHER_STREAM_NAME,
8687
json_schema={},
8788
supported_sync_modes=[SyncMode.full_refresh],
8889
)
90+
self._another_stream.cursor.ensure_at_least_one_state_emitted.side_effect = lambda: iter([])
8991

9092
self._record_data = {"id": 1, "value": "A"}
9193
self._partition = Mock(spec=Partition)
@@ -122,7 +124,7 @@ def test_handle_partition_done_no_other_streams_to_generate_partitions_for(self)
122124
self._partition_reader,
123125
)
124126
handler.start_next_partition_generator()
125-
handler.on_partition(self._an_open_partition)
127+
list(handler.on_partition(self._an_open_partition))
126128

127129
sentinel = PartitionGenerationCompletedSentinel(self._stream)
128130
messages = list(handler.on_partition_generation_completed(sentinel))
@@ -186,7 +188,7 @@ def test_handle_partition(self):
186188

187189
expected_cursor = handler._stream_name_to_instance[_ANOTHER_STREAM_NAME].cursor
188190

189-
handler.on_partition(self._a_closed_partition)
191+
list(handler.on_partition(self._a_closed_partition))
190192

191193
self._thread_pool_manager.submit.assert_called_with(
192194
self._partition_reader.process_partition, self._a_closed_partition, expected_cursor
@@ -213,12 +215,13 @@ def test_handle_partition_emits_log_message_if_it_should_be_logged(self):
213215

214216
expected_cursor = handler._stream_name_to_instance[_STREAM_NAME].cursor
215217

216-
handler.on_partition(self._an_open_partition)
218+
messages = list(handler.on_partition(self._an_open_partition))
217219

218220
self._thread_pool_manager.submit.assert_called_with(
219221
self._partition_reader.process_partition, self._an_open_partition, expected_cursor
220222
)
221-
self._message_repository.emit_message.assert_called_with(self._log_message)
223+
# Log message is now yielded directly instead of emitted through the repository
224+
assert self._log_message in messages
222225

223226
assert self._an_open_partition in handler._streams_to_running_partitions[_STREAM_NAME]
224227

@@ -240,7 +243,7 @@ def test_handle_on_partition_complete_sentinel_with_messages_from_repository(sel
240243
self._partition_reader,
241244
)
242245
handler.start_next_partition_generator()
243-
handler.on_partition(partition)
246+
list(handler.on_partition(partition))
244247

245248
sentinel = PartitionCompleteSentinel(partition)
246249

@@ -285,7 +288,7 @@ def test_handle_on_partition_complete_sentinel_yields_status_message_if_the_stre
285288
self._partition_reader,
286289
)
287290
handler.start_next_partition_generator()
288-
handler.on_partition(self._a_closed_partition)
291+
list(handler.on_partition(self._a_closed_partition))
289292
list(
290293
handler.on_partition_generation_completed(
291294
PartitionGenerationCompletedSentinel(self._another_stream)
@@ -560,7 +563,7 @@ def test_on_exception_return_trace_message_and_on_stream_complete_return_stream_
560563
)
561564

562565
handler.start_next_partition_generator()
563-
handler.on_partition(self._an_open_partition)
566+
list(handler.on_partition(self._an_open_partition))
564567
list(
565568
handler.on_partition_generation_completed(
566569
PartitionGenerationCompletedSentinel(self._stream)
@@ -627,7 +630,7 @@ def test_given_underlying_exception_is_traced_exception_on_exception_return_trac
627630
)
628631

629632
handler.start_next_partition_generator()
630-
handler.on_partition(self._an_open_partition)
633+
list(handler.on_partition(self._an_open_partition))
631634
list(
632635
handler.on_partition_generation_completed(
633636
PartitionGenerationCompletedSentinel(self._stream)
@@ -688,7 +691,7 @@ def test_given_partition_completion_is_not_success_then_do_not_close_partition(s
688691
)
689692

690693
handler.start_next_partition_generator()
691-
handler.on_partition(self._an_open_partition)
694+
list(handler.on_partition(self._an_open_partition))
692695
list(
693696
handler.on_partition_generation_completed(
694697
PartitionGenerationCompletedSentinel(self._stream)
@@ -749,7 +752,7 @@ def test_is_done_is_false_if_all_partitions_are_not_closed(self):
749752
)
750753

751754
handler.start_next_partition_generator()
752-
handler.on_partition(self._an_open_partition)
755+
list(handler.on_partition(self._an_open_partition))
753756
handler.on_partition_generation_completed(
754757
PartitionGenerationCompletedSentinel(self._stream)
755758
)
@@ -789,7 +792,7 @@ def test_on_exception_non_ate_uses_templated_message_with_correct_failure_type(s
789792
)
790793

791794
handler.start_next_partition_generator()
792-
handler.on_partition(self._an_open_partition)
795+
list(handler.on_partition(self._an_open_partition))
793796
list(
794797
handler.on_partition_generation_completed(
795798
PartitionGenerationCompletedSentinel(self._stream)
@@ -874,7 +877,7 @@ def _create_mock_stream(self, name: str, block_simultaneous_read: str = ""):
874877
json_schema={},
875878
supported_sync_modes=[SyncMode.full_refresh],
876879
)
877-
stream.cursor.ensure_at_least_one_state_emitted = Mock()
880+
stream.cursor.ensure_at_least_one_state_emitted.side_effect = lambda: iter([])
878881
return stream
879882

880883
def _create_mock_stream_with_parent(
@@ -889,7 +892,7 @@ def _create_mock_stream_with_parent(
889892
json_schema={},
890893
supported_sync_modes=[SyncMode.full_refresh],
891894
)
892-
stream.cursor.ensure_at_least_one_state_emitted = Mock()
895+
stream.cursor.ensure_at_least_one_state_emitted.side_effect = lambda: iter([])
893896

894897
mock_partition_router = Mock(spec=SubstreamPartitionRouter)
895898
mock_parent_config = Mock()

unit_tests/sources/streams/test_stream_read.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,8 @@ def close_partition(self, partition: Partition) -> None:
145145
)
146146
)
147147

148-
def ensure_at_least_one_state_emitted(self) -> None:
149-
pass
148+
def ensure_at_least_one_state_emitted(self):
149+
yield from []
150150

151151
def should_be_synced(self, record: Record) -> bool:
152152
return True

0 commit comments

Comments
 (0)