-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathpartition_reader.py
More file actions
92 lines (76 loc) · 3.76 KB
/
partition_reader.py
File metadata and controls
92 lines (76 loc) · 3.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
import logging
from queue import Queue
from typing import Optional
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.message.repository import MessageRepository
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.types import (
PartitionCompleteSentinel,
QueueItem,
)
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
# Module-level logger for all partition readers. The original used an f-string
# with no placeholders (ruff F541); a plain string literal is the correct form.
LOGGER = logging.getLogger("airbyte.PartitionReader")
# Since moving all the connector builder workflow to the concurrent CDK which required correct ordering
# of grouping log messages onto the main write thread using the ConcurrentMessageRepository, this
# separate flow and class that was used to log slices onto this partition's message_repository
# should just be replaced by emitting messages directly onto the repository instead of an intermediary.
class PartitionLogger:
    """
    Helper class that provides a mechanism for passing a log message onto the current
    partitions message repository
    """

    def __init__(
        self,
        slice_logger: SliceLogger,
        logger: logging.Logger,
        message_repository: MessageRepository,
    ):
        # Only hold references here; nothing is emitted until log() is called.
        self._slice_logger = slice_logger
        self._logger = logger
        self._message_repository = message_repository

    def log(self, partition: Partition) -> None:
        """Emit a slice log message for ``partition`` when slice logging is enabled."""
        # Guard clause: skip entirely if the slice logger says not to log.
        if not self._slice_logger.should_log_slice_message(self._logger):
            return
        slice_message = self._slice_logger.create_slice_log_message(partition.to_slice())
        self._message_repository.emit_message(slice_message)
class PartitionReader:
    """
    Generates records from a partition and puts them in a queue.
    """

    _IS_SUCCESSFUL = True

    def __init__(
        self,
        queue: Queue[QueueItem],
        partition_logger: Optional[PartitionLogger] = None,
    ) -> None:
        """
        :param queue: The queue to put the records in.
        :param partition_logger: Optional helper used to emit a slice log message
            onto the message repository before the partition is read.
        """
        self._queue = queue
        self._partition_logger = partition_logger

    def process_partition(self, partition: Partition, cursor: Cursor) -> None:
        """
        Process a partition and put the records in the output queue.

        When all the partitions are added to the queue, a sentinel is added to the queue to indicate that all the partitions have been generated.

        If an exception is encountered, the exception will be caught and put in the queue. This is very important because if we don't, the
        main thread will have no way to know that something went wrong and will wait until the timeout is reached.

        This method is meant to be called from a thread.

        :param partition: The partition to read data from
        :param cursor: Cursor that observes every record and is told when the partition closes
        :return: None
        """
        try:
            # Lazy %-style logging args: the message is only built if INFO is enabled.
            LOGGER.info(
                "Starting to read from stream %s and partition %s",
                partition.stream_name(),
                partition.to_slice(),
            )
            if self._partition_logger:
                self._partition_logger.log(partition)

            for record in partition.read():
                self._queue.put(record)
                cursor.observe(record)
            cursor.close_partition(partition)
            LOGGER.info(
                "Reading complete for stream %s and partition %s",
                partition.stream_name(),
                partition.to_slice(),
            )
            self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
        except Exception as e:
            # Log at exception level (includes the traceback) instead of info so
            # failures are actually visible in the logs; the exception itself is
            # forwarded to the main thread via the queue so it does not hang
            # waiting for a timeout.
            LOGGER.exception(
                "Error while reading from %s and partition %s",
                partition.stream_name(),
                partition.to_slice(),
            )
            self._queue.put(StreamThreadException(e, partition.stream_name()))
            self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))