Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ repos:
- id: check-toml

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.8.3
rev: v0.11.5
hooks:
# Run the linter with repo-defined settings
- id: ruff
Expand Down
9 changes: 5 additions & 4 deletions airbyte_cdk/config_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

import time
from copy import copy
from typing import Any, List, MutableMapping
from typing import Any, List
from collections.abc import MutableMapping

import orjson

Expand Down Expand Up @@ -38,7 +39,7 @@ def __init__(
non_observed_mapping[item] = ObservedDict(value, observer)

# Observe nested list of dicts
if isinstance(value, List):
if isinstance(value, list):
for i, sub_value in enumerate(value):
if isinstance(sub_value, MutableMapping):
value[i] = ObservedDict(sub_value, observer)
Expand All @@ -52,11 +53,11 @@ def __setitem__(self, item: Any, value: Any) -> None:
previous_value = self.get(item)
if isinstance(value, MutableMapping):
value = ObservedDict(value, self.observer)
if isinstance(value, List):
if isinstance(value, list):
for i, sub_value in enumerate(value):
if isinstance(sub_value, MutableMapping):
value[i] = ObservedDict(sub_value, self.observer)
super(ObservedDict, self).__setitem__(item, value)
super().__setitem__(item, value)
if self.update_on_unchanged_value or value != previous_value:
self.observer.update()

Expand Down
7 changes: 4 additions & 3 deletions airbyte_cdk/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import os
import pkgutil
from abc import ABC, abstractmethod
from typing import Any, Generic, Mapping, Optional, Protocol, TypeVar
from typing import Any, Generic, Optional, Protocol, TypeVar
from collections.abc import Mapping

import yaml

Expand All @@ -19,7 +20,7 @@
)


def load_optional_package_file(package: str, filename: str) -> Optional[bytes]:
def load_optional_package_file(package: str, filename: str) -> bytes | None:
"""Gets a resource from a package, returning None if it does not exist"""
try:
return pkgutil.get_data(package, filename)
Expand Down Expand Up @@ -52,7 +53,7 @@ def read_config(config_path: str) -> Mapping[str, Any]:

@staticmethod
def _read_json_file(file_path: str) -> Any:
with open(file_path, "r") as file:
with open(file_path) as file:
contents = file.read()

try:
Expand Down
7 changes: 4 additions & 3 deletions airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@


from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Mapping
from typing import Any, Dict, List
from collections.abc import Mapping

from airbyte_cdk.connector_builder.test_reader import TestReader
from airbyte_cdk.models import (
Expand Down Expand Up @@ -74,7 +75,7 @@ def read_stream(
source: DeclarativeSource,
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
state: List[AirbyteStateMessage],
state: list[AirbyteStateMessage],
limits: TestLimits,
) -> AirbyteMessage:
try:
Expand Down Expand Up @@ -128,7 +129,7 @@ def full_resolve_manifest(source: ManifestDeclarativeSource, limits: TestLimits)
for stream in streams:
stream["dynamic_stream_name"] = None

mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
mapped_streams: dict[str, list[dict[str, Any]]] = {}
for stream in source.dynamic_streams:
generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])

Expand Down
19 changes: 10 additions & 9 deletions airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@


import sys
from typing import Any, List, Mapping, Optional, Tuple
from typing import Any, List, Optional, Tuple
from collections.abc import Mapping

import orjson

Expand All @@ -31,8 +32,8 @@


def get_config_and_catalog_from_args(
args: List[str],
) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]:
args: list[str],
) -> tuple[str, Mapping[str, Any], ConfiguredAirbyteCatalog | None, Any]:
# TODO: Add functionality for the `debug` logger.
# Currently, no one `debug` level log will be displayed during `read` a stream for a connector created through `connector-builder`.
parsed_args = AirbyteEntrypoint.parse_args(args)
Expand Down Expand Up @@ -71,24 +72,24 @@ def handle_connector_builder_request(
source: ManifestDeclarativeSource,
command: str,
config: Mapping[str, Any],
catalog: Optional[ConfiguredAirbyteCatalog],
state: List[AirbyteStateMessage],
catalog: ConfiguredAirbyteCatalog | None,
state: list[AirbyteStateMessage],
limits: TestLimits,
) -> AirbyteMessage:
if command == "resolve_manifest":
return resolve_manifest(source)
elif command == "test_read":
assert (
catalog is not None
), "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
assert catalog is not None, (
"`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
)
return read_stream(source, config, catalog, state, limits)
elif command == "full_resolve_manifest":
return full_resolve_manifest(source, limits)
else:
raise ValueError(f"Unrecognized command {command}.")


def handle_request(args: List[str]) -> str:
def handle_request(args: list[str]) -> str:
command, config, catalog, state = get_config_and_catalog_from_args(args)
limits = get_limits(config)
source = create_source(config, limits)
Expand Down
48 changes: 24 additions & 24 deletions airbyte_cdk/connector_builder/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,24 @@
@dataclass
class HttpResponse:
status: int
body: Optional[str] = None
headers: Optional[Dict[str, Any]] = None
body: str | None = None
headers: dict[str, Any] | None = None


@dataclass
class HttpRequest:
url: str
headers: Optional[Dict[str, Any]]
headers: dict[str, Any] | None
http_method: str
body: Optional[str] = None
body: str | None = None


@dataclass
class LogMessage:
message: str
level: str
internal_message: Optional[str] = None
stacktrace: Optional[str] = None
internal_message: str | None = None
stacktrace: str | None = None


@dataclass
Expand All @@ -40,34 +40,34 @@ class AuxiliaryRequest:

@dataclass
class StreamReadPages:
records: List[object]
request: Optional[HttpRequest] = None
response: Optional[HttpResponse] = None
records: list[object]
request: HttpRequest | None = None
response: HttpResponse | None = None


@dataclass
class StreamReadSlices:
pages: List[StreamReadPages]
slice_descriptor: Optional[Dict[str, Any]]
state: Optional[List[Dict[str, Any]]] = None
auxiliary_requests: Optional[List[AuxiliaryRequest]] = None
pages: list[StreamReadPages]
slice_descriptor: dict[str, Any] | None
state: list[dict[str, Any]] | None = None
auxiliary_requests: list[AuxiliaryRequest] | None = None


@dataclass
class StreamRead(object):
logs: List[LogMessage]
slices: List[StreamReadSlices]
class StreamRead:
logs: list[LogMessage]
slices: list[StreamReadSlices]
test_read_limit_reached: bool
auxiliary_requests: List[AuxiliaryRequest]
inferred_schema: Optional[Dict[str, Any]]
inferred_datetime_formats: Optional[Dict[str, str]]
latest_config_update: Optional[Dict[str, Any]]
auxiliary_requests: list[AuxiliaryRequest]
inferred_schema: dict[str, Any] | None
inferred_datetime_formats: dict[str, str] | None
latest_config_update: dict[str, Any] | None


@dataclass
class StreamReadRequestBody:
manifest: Dict[str, Any]
manifest: dict[str, Any]
stream: str
config: Dict[str, Any]
state: Optional[Dict[str, Any]]
record_limit: Optional[int]
config: dict[str, Any]
state: dict[str, Any] | None
record_limit: int | None
Loading
Loading