-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathavailability_strategy.py
More file actions
85 lines (72 loc) · 3.18 KB
/
availability_strategy.py
File metadata and controls
85 lines (72 loc) · 3.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
import typing
from abc import ABC, abstractmethod
from typing import Any, Mapping, Optional, Tuple
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.core import Stream, StreamData
if typing.TYPE_CHECKING:
from airbyte_cdk.sources import Source
# FIXME this
class AvailabilityStrategy(ABC):
    """
    Interface for strategies that decide whether a stream can be read.

    Subclasses implement `check_availability`. The static helpers on this class
    pull the first slice of a stream and the first record of a slice.
    """

    @abstractmethod
    def check_availability(
        self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None
    ) -> Tuple[bool, Optional[str]]:
        """
        Determine whether the given stream is available.

        :param stream: stream
        :param logger: source logger
        :param source: (optional) source
        :return: A tuple of (boolean, str). A true boolean means the stream is
          available and no str is needed. A false boolean means the stream is
          unavailable; the str should then explain what went wrong and, where
          possible, how to resolve it.
        """

    @staticmethod
    def get_first_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]:
        """
        Return the first item produced by the stream's `stream_slices`.

        The slices are requested in full-refresh mode using the stream's own
        cursor field.

        :param stream: stream
        :raises StopIteration: if `stream_slices` produces nothing
        :return: the first stream slice (`None` is a valid stream slice)
        """
        produced_slices = stream.stream_slices(
            cursor_field=stream.cursor_field,  # type: ignore[arg-type]
            sync_mode=SyncMode.full_refresh,
        )
        # stream_slices() may hand back a plain iterable (e.g. a list or tuple)
        # rather than an iterator, so go through iter() before calling next().
        slice_iterator = iter(produced_slices)
        return next(slice_iterator)

    @staticmethod
    def get_first_record_for_slice(
        stream: Stream, stream_slice: Optional[Mapping[str, Any]]
    ) -> StreamData:
        """
        Return the first record that the stream yields for one slice.

        While the record is being read, `stream.exit_on_rate_limit` is forced to
        True; its previous value is put back afterwards, whether or not the read
        succeeded.

        :param stream: stream instance from which to read records
        :param stream_slice: stream_slice parameters for slicing the stream
        :raises StopIteration: if `read_records` produces nothing for the slice
        :return: StreamData containing the first record in the slice
        """
        # Remember the caller's setting so it can be reinstated in `finally`.
        saved_exit_on_rate_limit = stream.exit_on_rate_limit
        try:
            stream.exit_on_rate_limit = True
            produced_records = stream.read_records(
                sync_mode=SyncMode.full_refresh, stream_slice=stream_slice
            )
            # read_records() may hand back a plain iterable (e.g. a list or
            # tuple) rather than an iterator, so go through iter() first.
            record_iterator = iter(produced_records)
            return next(record_iterator)
        finally:
            stream.exit_on_rate_limit = saved_exit_on_rate_limit