🚨🚨[DON'T MERGE] all CDK changes for google analytics migration🚨🚨#554
🚨🚨[DON'T MERGE] all CDK changes for google analytics migration🚨🚨#554Serhii Lazebnyi (lazebnyi) wants to merge 3 commits intomainfrom
Conversation
📝 WalkthroughWalkthroughThis update introduces two new record extractors— Changes
Sequence Diagram(s)sequenceDiagram
participant HTTPResponse as HTTP Response
participant KeyExtractor as keys_extractor
participant ValueExtractor as values_extractor
participant KeyValueExtractor
participant Caller
Caller->>KeyValueExtractor: extract_records(response)
KeyValueExtractor->>KeyExtractor: extract_records(response)
KeyValueExtractor->>ValueExtractor: extract_records(response)
KeyValueExtractor->>KeyValueExtractor: Zip keys and values
KeyValueExtractor-->>Caller: Yield key-value mapped records
sequenceDiagram
participant HTTPResponse as HTTP Response
participant Extractor1
participant Extractor2
participant CombinedExtractor
participant Caller
Caller->>CombinedExtractor: extract_records(response)
CombinedExtractor->>Extractor1: extract_records(response)
CombinedExtractor->>Extractor2: extract_records(response)
CombinedExtractor->>CombinedExtractor: Zip and merge records
CombinedExtractor-->>Caller: Yield merged records
sequenceDiagram
participant DynamicSchemaLoader
participant RecordFilter
participant SchemaTransformer
participant Caller
Caller->>DynamicSchemaLoader: get_json_schema()
DynamicSchemaLoader->>DynamicSchemaLoader: Extract properties
alt schema_filter exists
DynamicSchemaLoader->>RecordFilter: filter(properties)
RecordFilter-->>DynamicSchemaLoader: filtered properties
end
DynamicSchemaLoader->>SchemaTransformer: transform(filtered properties)
SchemaTransformer-->>DynamicSchemaLoader: transformed schema
DynamicSchemaLoader-->>Caller: Return schema
Would you like to see a diagram for the multi-stream config merging process as well, or is this sufficient for your needs? Wdyt? Note ⚡️ AI Code Reviews for VS Code, Cursor, WindsurfCodeRabbit now has a plugin for VS Code, Cursor and Windsurf. This brings AI code reviews directly in the code editor. Each commit is reviewed immediately, finding bugs before the PR is raised. Seamless context handoff to your AI code agent ensures that you can easily incorporate review feedback. Note ⚡️ Faster reviews with cachingCodeRabbit now supports caching for code and dependencies, helping speed up reviews. This means quicker feedback, reduced wait times, and a smoother review experience overall. Cached data is encrypted and stored securely. This feature will be automatically enabled for all accounts on May 16th. To opt out, configure ✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
|
/autofix
|
There was a problem hiding this comment.
Actionable comments posted: 5
🔭 Outside diff range comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
3698-3723: 🛠️ Refactor suggestion
stream_configcan beNone– guard against it before iteratingWhen
stream_configis missing the current logic constructs[None], and the subsequent_create_component_from_model(...)call will crash type-checking (and possibly at runtime).- model_stream_configs = ( - model.stream_config if isinstance(model.stream_config, list) else [model.stream_config] - ) + if model.stream_config is None: + model_stream_configs = [] + elif isinstance(model.stream_config, list): + model_stream_configs = model.stream_config + else: + model_stream_configs = [model.stream_config]This preserves the new list-normalisation behaviour while keeping the code safe when the field is omitted.
Would you like to adopt this guard?
🧹 Nitpick comments (9)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (7)
1744-1769: Could we improve the KeyValueExtractor metadata?
The currentdescriptionis just a placeholder and theenumhas an extra space ([ KeyValueExtractor ]). To make this self‐documenting, perhaps we could update to something like:title: Key Value Extractor description: Extracts records as key/value pairs by applying separate extractors for keys and values. enum: [KeyValueExtractor]and replace the placeholder descriptions under
keys_extractorandvalues_extractorwith meaningful text or examples. wdyt?
1770-1791: Could we refine the CombinedExtractor schema?
Thedescriptionis currently incomplete ("Record extractor that extract with .") and theextractorsproperty lacks guidance. Maybe we can update to:title: Combined Extractor description: Sequentially applies multiple extractors and merges their outputs into a single record list. properties: extractors: description: List of extractor components to run in order; results are concatenated.This will help users understand when to use it. wdyt?
2369-2374: Could we flesh out the DynamicSchemaLoaderschema_filter?
Thedescription: placeholderdoesn’t convey its purpose. How about:title: Schema Filter description: Filter applied to schema properties after extraction; only fields matching this filter will be retained.This aligns with the new filtering feature. wdyt?
3372-3373: Shall we update the RecordSelector docs to include new extractors?
We addedCombinedExtractorandKeyValueExtractorunderextractor.anyOf—should we also adjust thedescriptionhere to mention these new options for clarity? wdyt?
4053-4056: Can we add metadata for thecreate_or_updateflag?
The new boolean has notitleordescription. Perhaps:title: Create or Update description: If true, existing component mappings are updated instead of only creating new ones.Adding this will improve discoverability. wdyt?
4107-4110: Could we complete the StreamConfigdefault_valuesdefinition?
Thedescription: placeholderand missingitemsschema make it unclear. For example:title: Default Values description: Fallback values injected into each stream config when no explicit value is provided. type: array items: anyOf: - type: string - type: numberThis will make usage obvious. wdyt?
4122-4127: Shall we clarify that ConfigComponentsResolver accepts single or multiple configs?
Thestream_config.anyOfnow allows an array or singleStreamConfig. Could we update the surrounding description to:description: Accepts one or many StreamConfig objects to resolve components from config.to explicitly call this out? wdyt?
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
1847-1856: Improve docstrings forKeyValueExtractorBoth
keys_extractorandvalues_extractorcurrently say"placeholder", which surfaces in generated docs.
Shall we add a brief description & example so connector authors understand how to use the component? wdyt?
1858-1864: Guard the self-referential annotationThe recursive union references
CombinedExtractordirectly. Thanks tofrom __future__ import annotationsthis works today, but removing that import later would raise aNameError.
Would quoting the inner reference ("CombinedExtractor") add a bit more safety for future refactors?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting
📒 Files selected for processing (10)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml(6 hunks)airbyte_cdk/sources/declarative/extractors/__init__.py(2 hunks)airbyte_cdk/sources/declarative/extractors/combined_extractor.py(1 hunks)airbyte_cdk/sources/declarative/extractors/key_value_extractor.py(1 hunks)airbyte_cdk/sources/declarative/manifest_declarative_source.py(1 hunks)airbyte_cdk/sources/declarative/models/declarative_component_schema.py(7 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py(9 hunks)airbyte_cdk/sources/declarative/resolvers/components_resolver.py(2 hunks)airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py(6 hunks)airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py(4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (3)
airbyte_cdk/sources/declarative/extractors/__init__.py (4)
airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1)
CombinedExtractor(19-44)airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)
CombinedExtractor(1858-1863)DpathExtractor(696-709)KeyValueExtractor(1847-1855)airbyte_cdk/sources/declarative/extractors/http_selector.py (1)
HttpSelector(13-37)airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1)
KeyValueExtractor(19-46)
airbyte_cdk/sources/declarative/extractors/combined_extractor.py (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (17)
Config(134-135)Config(148-149)Config(162-163)Config(176-177)Config(194-195)Config(208-209)Config(222-223)Config(236-237)Config(250-251)Config(264-265)Config(278-279)Config(292-293)Config(308-309)Config(322-323)Config(336-337)Config(370-371)CombinedExtractor(1858-1863)airbyte_cdk/sources/declarative/extractors/record_extractor.py (1)
RecordExtractor(12-27)airbyte_cdk/sources/declarative/interpolation/interpolated_string.py (1)
InterpolatedString(13-79)airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1)
extract_records(38-46)
airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (1)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
StreamConfig(1485-1494)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
[error] 183-183: Need type annotation for "filtered_properties" (hint: "filtered_properties: dict[, ] = ...") [var-annotated]
airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py
[error] 97-97: Function is missing a return type annotation [no-untyped-def]
[error] 98-98: Function is missing a type annotation [no-untyped-def]
[error] 103-103: Function is missing a type annotation [no-untyped-def]
[error] 106-106: Function is missing a return type annotation [no-untyped-def]
[error] 108-108: Call to untyped function "resolve_path" in typed context [no-untyped-call]
[error] 110-110: Call to untyped function "normalize_configs" in typed context [no-untyped-call]
[error] 112-112: Unsupported left operand type for + ("object") [operator]
[error] 113-113: Need type annotation for "item" [var-annotated]
[error] 113-113: Argument 1 to "enumerate" has incompatible type "object"; expected "Iterable[Never]" [arg-type]
[error] 115-115: Function is missing a type annotation [no-untyped-def]
[error] 124-124: Call to untyped function "prepare_streams" in typed context [no-untyped-call]
[error] 125-125: Call to untyped function "merge_combination" in typed context [no-untyped-call]
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 2476-2476: Argument 1 to "_create_component_from_model" of "ModelToComponentFactory" has incompatible type "RecordFilter | CustomRecordFilter | None"; expected "BaseModel" [arg-type]
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
[error] 1493-1493: Missing type parameters for generic type "List" [type-arg]
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
🔇 Additional comments (18)
airbyte_cdk/sources/declarative/extractors/__init__.py (2)
5-8: Excellent addition of new extractor classes!I see you've added imports for two new extractor classes (
CombinedExtractorandKeyValueExtractor) that provide flexible record extraction strategies. The imports are appropriately placed alongside other extractors.
23-24: Good job updating the all listThe new extractor classes have been properly added to the
__all__list to make them part of the public API.airbyte_cdk/sources/declarative/resolvers/components_resolver.py (2)
25-25: Good addition of create_or_update flagAdding the
create_or_updateflag toComponentMappingDefinitionwith a default value ofFalseprovides nice flexibility for component resolution. This will support the broader enhancement of conditional creation/updating of configuration paths.
38-38: Consistency maintained between related dataclassesThe same
create_or_updateflag has been properly added toResolvedComponentMappingDefinitionas well, maintaining consistency between these related dataclasses.airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
265-265: Nice refactoring to use the property accessorYou've replaced the direct method call with the property accessor, which is cleaner and more maintainable. The
dynamic_streamsproperty properly encapsulates the implementation details.airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (3)
13-13: Good addition of RecordFilter importThis import supports the new filtering capability you're adding to the schema loader.
130-130: Well-designed optional schema_filter parameterAdding the optional schema filter parameter extends the DynamicSchemaLoader's capabilities while maintaining backward compatibility.
156-156: Good implementation of the filtering workflowYou've modified the property transformation flow to apply filtering before transformation, which is a logical sequence.
airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (2)
5-15: Trim the unused imports to calm the linters, wdyt?
dpath,Decoder,JsonDecoder,InterpolatedString,InitVar,field(and evenMapping,List,Union) are not referenced anywhere in the file. Keeping them around will trigger static-analysis noise and slightly lengthen import time.-from dataclasses import InitVar, dataclass, field -from itertools import islice -from typing import Any, Iterable, List, Mapping, MutableMapping, Union - -import dpath -import requests - -from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder -from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor -from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString -from airbyte_cdk.sources.types import Config +from dataclasses import dataclass +from itertools import islice +from typing import Any, Iterable, MutableMapping + +import requests + +from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractorThis should silence the “unused import” warnings without altering behaviour.
[ suggest_nitpick ]
38-46: Guard against “no-keys” & type-mismatch corner cases, wdyt?
- If
keys_extractoryields an empty list, we enter an infinitewhile Trueloop that breaks immediately but still performs a useless iteration. A short-circuit test would be clearer.- We assume each item yielded by
keys_extractoris hashable and represents a single key. If a downstream user returns a dict (a common pattern for other extractors) we’ll end up with unintuitivedict_keysobjects inzip. A tiny validation layer could prevent surprises.keys = list(self.keys_extractor.extract_records(response)) - values = self.values_extractor.extract_records(response) - - while True: - chunk = list(islice(values, len(keys))) - if not chunk: - break - yield dict(zip(keys, chunk)) + if not keys: + return # nothing to pair – early exit + + if any(isinstance(k, MutableMapping) for k in keys): + raise ValueError("keys_extractor must yield hashable scalar values, got mapping(s)") + + values = self.values_extractor.extract_records(response) + + while True: + chunk = list(islice(values, len(keys))) + if not chunk: + break + yield dict(zip(keys, chunk))Early-exit + validation should make the extractor more robust while keeping the algorithm intact.
[ suggest_optional_refactor ]airbyte_cdk/sources/declarative/extractors/combined_extractor.py (3)
5-15: Pare back unused imports for clarity, wdyt?As in the sibling file,
dpath,Decoder,JsonDecoder,InterpolatedString,InitVar,field,Mapping,Union,Listare never used and can be removed to stop linter complaints.[ suggest_nitpick ]
18-33: Doc-string talks about keys/values but this extractor merges arbitrary recordsThe current documentation is a copy-paste from
KeyValueExtractor. It can mislead future maintainers into thinking the class requires exactly two extractors that return parallel key/value arrays. Could we replace it with something like:""" Merges the outputs of N extractors record-by-record. Each extractor is invoked with the same HTTP response; the first record from each extractor is merged together (later extractors override keys set by earlier ones), then the second record from each extractor and so on until the shortest extractor is exhausted. """Updating the narrative now will avoid confusion later, wdyt?
[ suggest_nitpick ]
37-44: Handle uneven extractor lengths or preferzip_longest, wdyt?Using
zip(*iterables)stops at the shortest iterable, silently discarding surplus records from longer extractors. Would adoptingitertools.zip_longest(with e.g.{}as the fill value) make the behaviour clearer and safer?-from records in zip(*extractors_records): +from records in itertools.zip_longest(*extractors_records, fillvalue={})You may also want to skip or log when a filler value is encountered to surface data-alignment issues.
[ suggest_optional_refactor ]airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (3)
97-125: Missing type hints are breaking CI – extract helpers & annotate them, wdyt?
_stream_configdefines several nested functions without annotations; mypy/ruff are flagging them (no-untyped-def,arg-type, etc.). CI is currently red. Splitting them into private methods and adding signatures will both appease the linters and improve readability.Example sketch:
- def _stream_config(self): - def resolve_path(pointer): + def _stream_config(self) -> List[Dict[str, Any]]: + def _resolve_path(pointer: List[InterpolatedString | str]) -> List[str]: ... - - def normalize_configs(configs): + def _normalize_configs(configs: Any) -> List[Any]: ... - - def prepare_streams(): + def _prepare_streams() -> List[List[tuple[int, Any]]]: ... - - def merge_combination(combo): + def _merge_combination(combo: tuple[tuple[int, Any], ...]) -> Dict[str, Any]: ... - - all_indexed_streams = list(prepare_streams()) - return [merge_combination(combo) for combo in product(*all_indexed_streams)] + all_indexed_streams = list(_prepare_streams()) + return [_merge_combination(combo) for combo in product(*all_indexed_streams)]This change should resolve the reported linter errors (97-125) and make the intent of each helper explicit.
[ raise_critical_issue ]🧰 Tools
🪛 GitHub Actions: Linters
[error] 97-97: Function is missing a return type annotation [no-untyped-def]
[error] 98-98: Function is missing a type annotation [no-untyped-def]
[error] 103-103: Function is missing a type annotation [no-untyped-def]
[error] 106-106: Function is missing a return type annotation [no-untyped-def]
[error] 108-108: Call to untyped function "resolve_path" in typed context [no-untyped-call]
[error] 110-110: Call to untyped function "normalize_configs" in typed context [no-untyped-call]
[error] 112-112: Unsupported left operand type for + ("object") [operator]
[error] 113-113: Need type annotation for "item" [var-annotated]
[error] 113-113: Argument 1 to "enumerate" has incompatible type "object"; expected "Iterable[Never]" [arg-type]
[error] 115-115: Function is missing a type annotation [no-untyped-def]
[error] 124-124: Call to untyped function "prepare_streams" in typed context [no-untyped-call]
[error] 125-125: Call to untyped function "merge_combination" in typed context [no-untyped-call]
111-113:stream_configs += default_valuesmutates the original listBecause
dpath.getmay return a direct reference to a sub-list insideself.config, in-place+=could unintentionally mutate the user-supplied configuration. Would making a shallow copy before appending be safer?- stream_configs = dpath.get(dict(self.config), path, default=[]) - stream_configs = normalize_configs(stream_configs) - if stream_config.default_values: - stream_configs += stream_config.default_values + raw = dpath.get(dict(self.config), path, default=[]) + stream_configs = normalize_configs(list(raw)) # copy + if stream_config.default_values: + stream_configs.extend(stream_config.default_values)This avoids side-effects on
self.config, wdyt?
[ suggest_optional_refactor ]🧰 Tools
🪛 GitHub Actions: Linters
[error] 112-112: Unsupported left operand type for + ("object") [operator]
[error] 113-113: Need type annotation for "item" [var-annotated]
[error] 113-113: Argument 1 to "enumerate" has incompatible type "object"; expected "Iterable[Never]" [arg-type]
153-159: Graceful path creation: shoulddpath.newrespectcreate_or_update=False?Right now we attempt
dpath.set, and if it fails we always fall back todpath.newprovidedcreate_or_updateis true. All good. Ifcreate_or_updateis false the value is silently dropped. Would logging or raising a descriptive error help users debug why a mapping hasn’t been applied, wdyt?
[ suggest_nitpick ]airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
1499-1500: Potential naming mismatch withstream_configsElsewhere in the PR the corresponding resolver expects
stream_configs; here the field isstream_config.
Would accepting the plural form via an alias (or renaming the attribute) help avoid runtime KeyErrors? wdyt?- stream_config: Union[List[StreamConfig], StreamConfig] + stream_config: Union[List[StreamConfig], StreamConfig] = Field( + ..., + alias="stream_configs", + )
2436-2438: Clarify execution order betweenschema_filterandschema_transformationsWhen both are provided, which one runs first? A short note in the field description—or a
root_validatorenforcing the order—could prevent subtle bugs. What do you think?
|
/autofix
|
🚨🚨DON'T MERGE THIS PR🚨🚨
This PR contains changes to the CDK that are intended for pre-release. It is used to run CI tests and avoid blocking the migration while CDK changes are under review.
USED IN airbytehq/airbyte#60342
Summary by CodeRabbit
New Features
Enhancements