44import concurrent
55import logging
66from queue import Queue
7- from typing import Iterable , Iterator , List , Optional
7+ from typing import Iterable , Iterator , List
88
99from airbyte_cdk .models import AirbyteMessage
1010from airbyte_cdk .sources .concurrent_source .concurrent_read_processor import ConcurrentReadProcessor
1616from airbyte_cdk .sources .message import InMemoryMessageRepository , MessageRepository
1717from airbyte_cdk .sources .streams .concurrent .abstract_stream import AbstractStream
1818from airbyte_cdk .sources .streams .concurrent .partition_enqueuer import PartitionEnqueuer
19- from airbyte_cdk .sources .streams .concurrent .partition_reader import PartitionLogger , PartitionReader
19+ from airbyte_cdk .sources .streams .concurrent .partition_reader import PartitionReader
2020from airbyte_cdk .sources .streams .concurrent .partitions .partition import Partition
2121from airbyte_cdk .sources .streams .concurrent .partitions .types import (
2222 PartitionCompleteSentinel ,
@@ -43,7 +43,6 @@ def create(
4343 logger : logging .Logger ,
4444 slice_logger : SliceLogger ,
4545 message_repository : MessageRepository ,
46- queue : Optional [Queue [QueueItem ]] = None ,
4746 timeout_seconds : int = DEFAULT_TIMEOUT_SECONDS ,
4847 ) -> "ConcurrentSource" :
4948 is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
@@ -60,21 +59,19 @@ def create(
6059 logger ,
6160 )
6261 return ConcurrentSource (
63- threadpool = threadpool ,
64- logger = logger ,
65- slice_logger = slice_logger ,
66- queue = queue ,
67- message_repository = message_repository ,
68- initial_number_partitions_to_generate = initial_number_of_partitions_to_generate ,
69- timeout_seconds = timeout_seconds ,
62+ threadpool ,
63+ logger ,
64+ slice_logger ,
65+ message_repository ,
66+ initial_number_of_partitions_to_generate ,
67+ timeout_seconds ,
7068 )
7169
7270 def __init__ (
7371 self ,
7472 threadpool : ThreadPoolManager ,
7573 logger : logging .Logger ,
7674 slice_logger : SliceLogger = DebugSliceLogger (),
77- queue : Optional [Queue [QueueItem ]] = None ,
7875 message_repository : MessageRepository = InMemoryMessageRepository (),
7976 initial_number_partitions_to_generate : int = 1 ,
8077 timeout_seconds : int = DEFAULT_TIMEOUT_SECONDS ,
@@ -94,36 +91,33 @@ def __init__(
9491 self ._initial_number_partitions_to_generate = initial_number_partitions_to_generate
9592 self ._timeout_seconds = timeout_seconds
9693
97- # We set a maxsize to for the main thread to process record items when the queue size grows. This assumes that there are less
98- # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating
99- # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
100- # information and might even need to be configurable depending on the source
101- self ._queue = queue or Queue (maxsize = 10_000 )
102-
10394 def read (
10495 self ,
10596 streams : List [AbstractStream ],
10697 ) -> Iterator [AirbyteMessage ]:
10798 self ._logger .info ("Starting syncing" )
99+
100+ # We set a maxsize to force the main thread to process record items when the queue size grows. This assumes that there are fewer
101+ # threads generating partitions than the max number of workers. If that weren't the case, we could have threads only generating
102+ # partitions, which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
103+ # information and might even need to be configurable depending on the source
104+ queue : Queue [QueueItem ] = Queue (maxsize = 10_000 )
108105 concurrent_stream_processor = ConcurrentReadProcessor (
109106 streams ,
110- PartitionEnqueuer (self . _queue , self ._threadpool ),
107+ PartitionEnqueuer (queue , self ._threadpool ),
111108 self ._threadpool ,
112109 self ._logger ,
113110 self ._slice_logger ,
114111 self ._message_repository ,
115- PartitionReader (
116- self ._queue ,
117- PartitionLogger (self ._slice_logger , self ._logger , self ._message_repository ),
118- ),
112+ PartitionReader (queue ),
119113 )
120114
121115 # Enqueue initial partition generation tasks
122116 yield from self ._submit_initial_partition_generators (concurrent_stream_processor )
123117
124118 # Read from the queue until all partitions were generated and read
125119 yield from self ._consume_from_queue (
126- self . _queue ,
120+ queue ,
127121 concurrent_stream_processor ,
128122 )
129123 self ._threadpool .check_for_errors_and_shutdown ()
@@ -147,10 +141,7 @@ def _consume_from_queue(
147141 airbyte_message_or_record_or_exception ,
148142 concurrent_stream_processor ,
149143 )
150- # In the event that a partition raises an exception, anything remaining in
151- # the queue will be missed because is_done() can raise an exception and exit
152- # out of this loop before remaining items are consumed
153- if queue .empty () and concurrent_stream_processor .is_done ():
144+ if concurrent_stream_processor .is_done () and queue .empty ():
154145 # all partitions were generated and processed. we're done here
155146 break
156147
@@ -170,7 +161,5 @@ def _handle_item(
170161 yield from concurrent_stream_processor .on_partition_complete_sentinel (queue_item )
171162 elif isinstance (queue_item , Record ):
172163 yield from concurrent_stream_processor .on_record (queue_item )
173- elif isinstance (queue_item , AirbyteMessage ):
174- yield queue_item
175164 else :
176165 raise ValueError (f"Unknown queue item type: { type (queue_item )} " )
0 commit comments