Skip to content

Commit 7099a0a

Browse files
fix: Add thread safety to exclusive partition processing
- Add threading.Lock to protect shared state for exclusive streams
- Wrap access to _pending_partitions_per_exclusive_stream and _exclusive_stream_partition_in_progress with lock
- Set in_progress flag BEFORE popping from queue to prevent race condition
- Add docstring note about lock requirement for _submit_next_exclusive_partition

Addresses Copilot review comments about thread safety concerns.

Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
1 parent 38e1579 commit 7099a0a

1 file changed

Lines changed: 15 additions & 6 deletions

File tree

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#
44
import logging
55
import os
6+
import threading
67
from collections import deque
78
from typing import Deque, Dict, Iterable, List, Optional, Set
89

@@ -75,6 +76,7 @@ def __init__(
7576
self._exclusive_stream_partition_in_progress: Dict[str, bool] = {
7677
stream_name: False for stream_name in self._exclusive_streams
7778
}
79+
self._exclusive_partition_lock = threading.Lock()
7880

7981
def on_partition_generation_completed(
8082
self, sentinel: PartitionGenerationCompletedSentinel
@@ -114,9 +116,10 @@ def on_partition(self, partition: Partition) -> None:
114116
)
115117

116118
if stream_name in self._exclusive_streams:
117-
self._pending_partitions_per_exclusive_stream[stream_name].append(partition)
118-
if not self._exclusive_stream_partition_in_progress[stream_name]:
119-
self._submit_next_exclusive_partition(stream_name)
119+
with self._exclusive_partition_lock:
120+
self._pending_partitions_per_exclusive_stream[stream_name].append(partition)
121+
if not self._exclusive_stream_partition_in_progress[stream_name]:
122+
self._submit_next_exclusive_partition(stream_name)
120123
else:
121124
self._thread_pool_manager.submit(
122125
self._partition_reader.process_partition, partition, cursor
@@ -140,8 +143,9 @@ def on_partition_complete_sentinel(
140143
partitions_running.remove(partition)
141144

142145
if stream_name in self._exclusive_streams:
143-
self._exclusive_stream_partition_in_progress[stream_name] = False
144-
self._submit_next_exclusive_partition(stream_name)
146+
with self._exclusive_partition_lock:
147+
self._exclusive_stream_partition_in_progress[stream_name] = False
148+
self._submit_next_exclusive_partition(stream_name)
145149

146150
# If all partitions were generated and this was the last one, the stream is done
147151
if (
@@ -276,12 +280,17 @@ def _submit_next_exclusive_partition(self, stream_name: str) -> None:
276280
Submit the next pending partition for an exclusive stream.
277281
This ensures that only one partition is processed at a time for streams
278282
that have use_exclusive_concurrency=True.
283+
284+
Note: This method must be called while holding self._exclusive_partition_lock
285+
to ensure thread safety.
279286
"""
280287
pending_partitions = self._pending_partitions_per_exclusive_stream[stream_name]
281288
if pending_partitions:
289+
# Set the flag BEFORE popping to prevent race conditions where another thread
290+
# could see in_progress=False and submit another partition concurrently
291+
self._exclusive_stream_partition_in_progress[stream_name] = True
282292
partition = pending_partitions.popleft()
283293
cursor = self._stream_name_to_instance[stream_name].cursor
284-
self._exclusive_stream_partition_in_progress[stream_name] = True
285294
self._thread_pool_manager.submit(
286295
self._partition_reader.process_partition, partition, cursor
287296
)

0 commit comments

Comments
 (0)