Skip to content

Commit 8c4b8b7

Browse files
committed
fix typing issues
1 parent 71112d6 commit 8c4b8b7

13 files changed

Lines changed: 66 additions & 27 deletions

File tree

airbyte_cdk/sources/declarative/async_job/timer.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
from datetime import datetime, timedelta, timezone
33
from typing import Optional
44

5+
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
6+
57

68
class Timer:
79
def __init__(self, timeout: timedelta) -> None:
@@ -36,4 +38,4 @@ def has_timed_out(self) -> bool:
3638

3739
@staticmethod
3840
def _now() -> datetime:
39-
return datetime.now(tz=timezone.utc)
41+
return ab_datetime_now()

airbyte_cdk/sources/declarative/auth/jwt.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
1515
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
1616
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
17+
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
1718

1819

1920
class JwtAlgorithm(str):
@@ -127,7 +128,7 @@ def _get_jwt_payload(self) -> dict[str, Any]:
127128
"""
128129
Builds and returns the payload used when signing the JWT.
129130
"""
130-
now = int(datetime.now().timestamp())
131+
now = ab_datetime_now().timestamp()
131132
exp = now + self._token_duration if isinstance(self._token_duration, int) else now
132133
nbf = now
133134

airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@
2121
)
2222
from airbyte_cdk.sources.message import MessageRepository
2323
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
24-
from airbyte_cdk.utils.datetime_helpers import ab_datetime_format, ab_datetime_parse
24+
from airbyte_cdk.utils.datetime_helpers import (
25+
ab_datetime_format,
26+
ab_datetime_now,
27+
ab_datetime_parse,
28+
)
2529
from airbyte_cdk.utils.mapping_helpers import _validate_component_request_option_paths
2630

2731

@@ -90,7 +94,6 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
9094
None if not self.end_datetime else MinMaxDatetime.create(self.end_datetime, parameters)
9195
)
9296

93-
self._timezone = datetime.timezone.utc
9497
self._interpolation = JinjaInterpolation()
9598

9699
self._step = (
@@ -240,10 +243,10 @@ def select_best_end_datetime(self) -> datetime.datetime:
240243
241244
:return datetime.datetime: The best end datetime, which is either the current datetime or the pre-configured end datetime, whichever is earlier.
242245
"""
243-
now = datetime.datetime.now(tz=self._timezone)
246+
now = ab_datetime_now()
244247
if not self._end_datetime:
245248
return now
246-
return min(self._end_datetime.get_datetime(self.config), now)
249+
return min(self._end_datetime.get_datetime(self.config), now.to_datetime())
247250

248251
def _calculate_cursor_datetime_from_state(
249252
self, stream_state: Mapping[str, Any]

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,7 @@
546546
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
547547
from airbyte_cdk.sources.types import Config
548548
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
549+
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
549550

550551
ComponentDefinition = Mapping[str, Any]
551552

@@ -3532,7 +3533,7 @@ def create_fixed_window_call_rate_policy(
35323533
# Set the initial reset timestamp to 10 days from now.
35333534
# This value will be updated by the first request.
35343535
return FixedWindowCallRatePolicy(
3535-
next_reset_ts=datetime.datetime.now() + datetime.timedelta(days=10),
3536+
next_reset_ts=ab_datetime_now() + datetime.timedelta(days=10),
35363537
period=parse_duration(model.period),
35373538
call_limit=model.call_limit,
35383539
matchers=matchers,

airbyte_cdk/sources/file_based/file_types/unstructured_parser.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
3838
from airbyte_cdk.sources.file_based.schema_helpers import SchemaType
3939
from airbyte_cdk.utils import is_cloud_environment
40+
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
4041
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
4142

4243
unstructured_partition_pdf = None
@@ -300,7 +301,7 @@ def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[st
300301
format_config.processing,
301302
FileType.MD,
302303
"auto",
303-
RemoteFile(uri="test", last_modified=datetime.now()),
304+
RemoteFile(uri="test", last_modified=ab_datetime_now()),
304305
)
305306
except Exception:
306307
return False, "".join(traceback.format_exc())

airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@
2020
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
2121
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
2222
from airbyte_cdk.sources.types import Record
23-
from airbyte_cdk.utils.datetime_helpers import ab_datetime_format, ab_datetime_parse
23+
from airbyte_cdk.utils.datetime_helpers import (
24+
ab_datetime_format,
25+
ab_datetime_now,
26+
ab_datetime_parse,
27+
)
2428

2529
if TYPE_CHECKING:
2630
from airbyte_cdk.sources.file_based.stream.concurrent.adapters import FileBasedStreamPartition
@@ -310,7 +314,7 @@ def _compute_start_time(self) -> datetime:
310314
disallow_other_formats=False,
311315
)
312316
if self._is_history_full():
313-
time_window = datetime.now() - self._time_window_if_history_is_full
317+
time_window = ab_datetime_now() - self._time_window_if_history_is_full
314318
earliest_dt = min(earliest_dt, time_window)
315319
return earliest_dt
316320

airbyte_cdk/sources/file_based/stream/cursor/default_file_based_cursor.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@
1212
AbstractFileBasedCursor,
1313
)
1414
from airbyte_cdk.sources.file_based.types import StreamState
15-
from airbyte_cdk.utils.datetime_helpers import ab_datetime_format, ab_datetime_parse
15+
from airbyte_cdk.utils.datetime_helpers import (
16+
ab_datetime_format,
17+
ab_datetime_now,
18+
ab_datetime_parse,
19+
)
1620

1721

1822
class DefaultFileBasedCursor(AbstractFileBasedCursor):
@@ -156,6 +160,6 @@ def _compute_start_time(self) -> datetime:
156160
disallow_other_formats=False,
157161
)
158162
if self._is_history_full():
159-
time_window = datetime.now() - self._time_window_if_history_is_full
163+
time_window = ab_datetime_now() - self._time_window_if_history_is_full
160164
earliest_dt = min(earliest_dt, time_window)
161165
return earliest_dt

airbyte_cdk/sources/streams/call_rate.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from pyrate_limiter import Rate as PyRateRate
2020
from pyrate_limiter.exceptions import BucketFullException
2121

22-
from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime
22+
from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now
2323

2424
# prevents mypy from complaining about missing session attributes in LimiterMixin
2525
if TYPE_CHECKING:
@@ -282,7 +282,7 @@ class UnlimitedCallRatePolicy(BaseCallRatePolicy):
282282
),
283283
FixedWindowCallRatePolicy(
284284
matchers=[HttpRequestMatcher(url="/some/method")],
285-
next_reset_ts=datetime.now(),
285+
next_reset_ts=ab_datetime_now(),
286286
period=timedelta(hours=1)
287287
call_limit=1000,
288288
),
@@ -387,7 +387,7 @@ def update(
387387
self._next_reset_ts = call_reset_ts
388388

389389
def _update_current_window(self) -> None:
390-
now = datetime.datetime.now()
390+
now = ab_datetime_now()
391391
if now > self._next_reset_ts:
392392
logger.debug("started new window, %s calls available now", self._call_limit)
393393
self._next_reset_ts = self._next_reset_ts + self._offset

airbyte_cdk/utils/datetime_helpers.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,9 @@
8181
```
8282
"""
8383

84+
import decimal
8485
from datetime import datetime, timedelta, timezone
85-
from typing import Any, Optional, Union, overload
86+
from typing import Any, Optional, Union, cast, overload
8687

8788
from dateutil import parser
8889
from typing_extensions import Never
@@ -360,8 +361,8 @@ def ab_datetime_now() -> AirbyteDateTime:
360361

361362

362363
def ab_datetime_parse(
363-
dt_str: str | int,
364-
formats: list[str | None] | None = None,
364+
dt_str: str | int | float,
365+
formats: list[str | None] | list[str] | None = None,
365366
disallow_other_formats: bool = False,
366367
) -> AirbyteDateTime:
367368
"""Parses a datetime string or timestamp into an AirbyteDateTime with timezone awareness.
@@ -404,6 +405,27 @@ def ab_datetime_parse(
404405
# Remove None values from formats list, and coalesce to None if empty
405406
formats = [f for f in formats or [] if f] or None
406407

408+
if isinstance(dt_str, str):
409+
if dt_str.startswith("-"):
410+
raise ValueError("Timestamp cannot be negative: " + dt_str)
411+
412+
if dt_str[1:].replace(".", "").isdigit():
413+
# Handle floats and ints as strings
414+
if "." in dt_str:
415+
dt_str = float(dt_str)
416+
else:
417+
dt_str = int(dt_str)
418+
419+
if isinstance(dt_str, float):
420+
# Handle float values as Unix timestamps (UTC)
421+
if dt_str < 0:
422+
raise ValueError("Timestamp cannot be negative")
423+
if len(str(abs(int(dt_str)))) > 10:
424+
raise ValueError("Timestamp value too large")
425+
426+
instant = Instant.from_timestamp(dt_str)
427+
return AirbyteDateTime.from_datetime(instant.py_datetime())
428+
407429
# Handle numeric values as Unix timestamps (UTC)
408430
if isinstance(dt_str, int) or (
409431
isinstance(dt_str, str)

airbyte_cdk/utils/stream_status_utils.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
TraceType,
1818
)
1919
from airbyte_cdk.models import Type as MessageType
20+
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
2021

2122

2223
def as_airbyte_message(
@@ -28,7 +29,7 @@ def as_airbyte_message(
2829
Builds an AirbyteStreamStatusTraceMessage for the provided stream
2930
"""
3031

31-
now_millis = datetime.now().timestamp() * 1000.0
32+
now_millis = ab_datetime_now().to_epoch_millis()
3233

3334
trace_message = AirbyteTraceMessage(
3435
type=TraceType.STREAM_STATUS,

0 commit comments

Comments
 (0)