Skip to content

Commit 9d70362

Browse files
feat: make record queue unlimited (maxsize=0) to prevent deadlock
Co-Authored-By: unknown <>
1 parent a13be8a commit 9d70362

3 files changed

Lines changed: 3 additions & 3 deletions

File tree

airbyte_cdk/sources/concurrent_source/concurrent_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ def __init__(
9999
# threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating
100100
# partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
101101
# information and might even need to be configurable depending on the source
102-
self._queue = queue or Queue(maxsize=10_000)
102+
self._queue = queue or Queue(maxsize=0)
103103

104104
def read(
105105
self,

airbyte_cdk/sources/concurrent_source/thread_pool_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class ThreadPoolManager:
1212
Wrapper to abstract away the threadpool and the logic to wait for pending tasks to be completed.
1313
"""
1414

15-
DEFAULT_MAX_QUEUE_SIZE = 50_000
15+
DEFAULT_MAX_QUEUE_SIZE = 0 # 0 means unlimited
1616

1717
def __init__(
1818
self,

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ def __init__(
157157
# threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating
158158
# partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
159159
# information and might even need to be configurable depending on the source
160-
queue: Queue[QueueItem] = Queue(maxsize=10_000)
160+
queue: Queue[QueueItem] = Queue(maxsize=0)
161161
message_repository = InMemoryMessageRepository(
162162
Level.DEBUG if emit_connector_builder_messages else Level.INFO
163163
)

0 commit comments

Comments
 (0)