44import concurrent
55import logging
66from queue import Queue
7- from typing import Iterable , Iterator , List
7+ from typing import Iterable , Iterator , List , Optional
88
99from airbyte_cdk .models import AirbyteMessage
1010from airbyte_cdk .sources .concurrent_source .concurrent_read_processor import ConcurrentReadProcessor
1616from airbyte_cdk .sources .message import InMemoryMessageRepository , MessageRepository
1717from airbyte_cdk .sources .streams .concurrent .abstract_stream import AbstractStream
1818from airbyte_cdk .sources .streams .concurrent .partition_enqueuer import PartitionEnqueuer
19- from airbyte_cdk .sources .streams .concurrent .partition_reader import PartitionReader
19+ from airbyte_cdk .sources .streams .concurrent .partition_reader import PartitionLogger , PartitionReader
2020from airbyte_cdk .sources .streams .concurrent .partitions .partition import Partition
2121from airbyte_cdk .sources .streams .concurrent .partitions .types import (
2222 PartitionCompleteSentinel ,
@@ -43,6 +43,7 @@ def create(
4343 logger : logging .Logger ,
4444 slice_logger : SliceLogger ,
4545 message_repository : MessageRepository ,
46+ queue : Optional [Queue [QueueItem ]] = None ,
4647 timeout_seconds : int = DEFAULT_TIMEOUT_SECONDS ,
4748 ) -> "ConcurrentSource" :
4849 is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
@@ -59,19 +60,21 @@ def create(
5960 logger ,
6061 )
6162 return ConcurrentSource (
62- threadpool ,
63- logger ,
64- slice_logger ,
65- message_repository ,
66- initial_number_of_partitions_to_generate ,
67- timeout_seconds ,
63+ threadpool = threadpool ,
64+ logger = logger ,
65+ slice_logger = slice_logger ,
66+ queue = queue ,
67+ message_repository = message_repository ,
68+ initial_number_partitions_to_generate = initial_number_of_partitions_to_generate ,
69+ timeout_seconds = timeout_seconds ,
6870 )
6971
7072 def __init__ (
7173 self ,
7274 threadpool : ThreadPoolManager ,
7375 logger : logging .Logger ,
7476 slice_logger : SliceLogger = DebugSliceLogger (),
77+ queue : Optional [Queue [QueueItem ]] = None ,
7578 message_repository : MessageRepository = InMemoryMessageRepository (),
7679 initial_number_partitions_to_generate : int = 1 ,
7780 timeout_seconds : int = DEFAULT_TIMEOUT_SECONDS ,
@@ -91,33 +94,36 @@ def __init__(
9194 self ._initial_number_partitions_to_generate = initial_number_partitions_to_generate
9295 self ._timeout_seconds = timeout_seconds
9396
97+ # We set a maxsize to force the main thread to process record items when the queue size grows. This assumes that there are fewer
98+ # threads generating partitions than the max number of workers. If that weren't the case, we could have threads only generating
99+ # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
100+ # information and might even need to be configurable depending on the source
101+ self ._queue = queue or Queue (maxsize = 10_000 )
102+
94103 def read (
95104 self ,
96105 streams : List [AbstractStream ],
97106 ) -> Iterator [AirbyteMessage ]:
98107 self ._logger .info ("Starting syncing" )
99-
100- # We set a maxsize to for the main thread to process record items when the queue size grows. This assumes that there are less
101- # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating
102- # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
103- # information and might even need to be configurable depending on the source
104- queue : Queue [QueueItem ] = Queue (maxsize = 10_000 )
105108 concurrent_stream_processor = ConcurrentReadProcessor (
106109 streams ,
107- PartitionEnqueuer (queue , self ._threadpool ),
110+ PartitionEnqueuer (self . _queue , self ._threadpool ),
108111 self ._threadpool ,
109112 self ._logger ,
110113 self ._slice_logger ,
111114 self ._message_repository ,
112- PartitionReader (queue ),
115+ PartitionReader (
116+ self ._queue ,
117+ PartitionLogger (self ._slice_logger , self ._logger , self ._message_repository ),
118+ ),
113119 )
114120
115121 # Enqueue initial partition generation tasks
116122 yield from self ._submit_initial_partition_generators (concurrent_stream_processor )
117123
118124 # Read from the queue until all partitions were generated and read
119125 yield from self ._consume_from_queue (
120- queue ,
126+ self . _queue ,
121127 concurrent_stream_processor ,
122128 )
123129 self ._threadpool .check_for_errors_and_shutdown ()
@@ -141,7 +147,10 @@ def _consume_from_queue(
141147 airbyte_message_or_record_or_exception ,
142148 concurrent_stream_processor ,
143149 )
144- if concurrent_stream_processor .is_done () and queue .empty ():
150+ # In the event that a partition raises an exception, anything remaining in
151+ # the queue will be missed because is_done() can raise an exception and exit
152+ # out of this loop before remaining items are consumed
153+ if queue .empty () and concurrent_stream_processor .is_done ():
145154 # all partitions were generated and processed. we're done here
146155 break
147156
@@ -161,5 +170,7 @@ def _handle_item(
161170 yield from concurrent_stream_processor .on_partition_complete_sentinel (queue_item )
162171 elif isinstance (queue_item , Record ):
163172 yield from concurrent_stream_processor .on_record (queue_item )
173+ elif isinstance (queue_item , AirbyteMessage ):
174+ yield queue_item
164175 else :
165176 raise ValueError (f"Unknown queue item type: { type (queue_item )} " )
0 commit comments