feat: Add InferredSchemaLoader for runtime schema inference #831
base: main
Changes from 12 commits
```python
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from collections.abc import Mapping as ABCMapping
from collections.abc import Sequence
from dataclasses import InitVar, dataclass
from typing import Any, Mapping

from airbyte_cdk.models import AirbyteRecordMessage
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.types import Config
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer


def _to_builtin_types(value: Any) -> Any:
    """
    Recursively convert Mapping-like and Sequence-like objects to plain Python types.

    This is necessary because genson's schema inference doesn't handle custom Mapping
    or Sequence implementations properly. We need to convert everything to plain dicts,
    lists, and primitive types.

    Args:
        value: The value to convert

    Returns:
        The value converted to plain Python types
    """
    if isinstance(value, ABCMapping):
        return {k: _to_builtin_types(v) for k, v in value.items()}
    elif isinstance(value, (list, tuple)):
        return [_to_builtin_types(item) for item in value]
    elif isinstance(value, Sequence) and not isinstance(value, (str, bytes)):
        return [_to_builtin_types(item) for item in value]
    else:
        return value


@dataclass
class InferredSchemaLoader(SchemaLoader):
    """
    Infers a JSON Schema by reading a sample of records from the stream at discover time.

    This schema loader reads up to `record_sample_size` records from the stream and uses
    the SchemaInferrer to generate a JSON schema based on the structure of those records.
    This is useful for streams where the schema is not known in advance or changes dynamically.

    Attributes:
        retriever (Retriever): The retriever used to fetch records from the stream
        config (Config): The user-provided configuration as specified by the source's spec
        parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
        record_sample_size (int): The maximum number of records to read for schema inference. Defaults to 100.
        stream_name (str): The name of the stream for which to infer the schema
    """

    retriever: Retriever
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    record_sample_size: int = 100
    stream_name: str = ""

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters
        if not self.stream_name:
            self.stream_name = parameters.get("name", "")

        if not self.stream_name:
            raise ValueError(
                "stream_name must be provided either directly or via the 'name' parameter"
            )

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Infers and returns a JSON schema by reading a sample of records from the stream.

        This method reads up to `record_sample_size` records from the stream and uses
        the SchemaInferrer to generate a JSON schema. If no records are available,
        it returns an empty schema.

        Returns:
            A mapping representing the inferred JSON schema for the stream
        """
        schema_inferrer = SchemaInferrer()

        record_count = 0
        for stream_slice in self.retriever.stream_slices():
```
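The `_to_builtin_types` helper above can be exercised in isolation. Below is a self-contained sketch that reproduces the same normalization logic outside the CDK, with a toy custom `Mapping` (`FrozenRecord` is a hypothetical stand-in for the CDK's record types, not part of the PR):

```python
from collections.abc import Mapping as ABCMapping
from collections.abc import Sequence
from typing import Any


def to_builtin_types(value: Any) -> Any:
    # Same logic as _to_builtin_types in the diff, reproduced for a standalone demo.
    if isinstance(value, ABCMapping):
        return {k: to_builtin_types(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_builtin_types(item) for item in value]
    if isinstance(value, Sequence) and not isinstance(value, (str, bytes)):
        return [to_builtin_types(item) for item in value]
    return value


class FrozenRecord(ABCMapping):
    """A toy read-only Mapping, standing in for a custom record type."""

    def __init__(self, data: dict):
        self._data = data

    def __getitem__(self, key):
        return self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)


record = FrozenRecord({"id": 1, "tags": ("a", "b"), "meta": FrozenRecord({"ok": True})})
plain = to_builtin_types(record)
print(plain)  # {'id': 1, 'tags': ['a', 'b'], 'meta': {'ok': True}}
assert type(plain) is dict and type(plain["meta"]) is dict
```

Note that tuples are normalized to lists and nested custom mappings become plain dicts, which is the shape genson's builder expects.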
**Contributor:** Does that mean that during a read, we will read some records twice?

**Contributor:** Good question! The InferredSchemaLoader reads records during discover time (when […]). So yes, some records may be read twice: once during discover and once during read. This is intentional and consistent with how schema inference works. The alternative would be to cache records from discover and replay them during read, but that would add complexity and memory overhead. This behavior is similar to how […]. Is this acceptable, or would you prefer a different approach (e.g., caching records from discover)?

**Contributor:** Isn't the schema loader called during read here, which would mean that the HTTP requests are done once for […]?

**Contributor:** Excellent catch! You're absolutely right - I checked other schema loaders: […]

Proposed solution: […] This will ensure schema inference happens only once per sync job, not once per partition. I'll implement this fix now. Does this approach sound good?

**Contributor:** I've implemented the caching solution! 🎉 Changes made: […] Result: The implementation follows standard thread-safe memoization patterns and ensures the cached schema is returned on all subsequent calls.
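The caching approach discussed in this thread can be sketched with standard double-checked locking. This is an illustrative pattern only, not the PR's actual implementation; the class name and constructor are hypothetical:

```python
import threading
from typing import Any, Callable, Mapping, Optional


class MemoizedSchemaLoader:
    """Sketch: run expensive schema inference once, then serve the cached copy."""

    def __init__(self, infer_fn: Callable[[], Mapping[str, Any]]):
        self._infer_fn = infer_fn  # expensive schema-inference callable
        self._cached_schema: Optional[Mapping[str, Any]] = None
        self._lock = threading.Lock()

    def get_json_schema(self) -> Mapping[str, Any]:
        # Fast path: schema already inferred, no lock needed.
        if self._cached_schema is not None:
            return self._cached_schema
        with self._lock:
            # Re-check under the lock so concurrent callers infer only once.
            if self._cached_schema is None:
                self._cached_schema = self._infer_fn()
        return self._cached_schema


calls = []
loader = MemoizedSchemaLoader(lambda: calls.append(1) or {"type": "object"})
loader.get_json_schema()
loader.get_json_schema()
print(len(calls))  # 1
```

The fast path avoids lock contention on every partition's call, while the re-check under the lock guarantees the inference runs at most once per loader instance.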
```python
            for record in self.retriever.read_records(records_schema={}, stream_slice=stream_slice):
                if record_count >= self.record_sample_size:
                    break

                # Convert all Mapping-like and Sequence-like objects to plain Python types.
                # This is necessary because genson doesn't handle custom implementations properly.
                record = _to_builtin_types(record)

                airbyte_record = AirbyteRecordMessage(
                    stream=self.stream_name,
                    data=record,  # type: ignore[arg-type]
                    emitted_at=0,
                )

                schema_inferrer.accumulate(airbyte_record)
                record_count += 1

            if record_count >= self.record_sample_size:
                break

        inferred_schema: Mapping[str, Any] | None = schema_inferrer.get_stream_schema(
            self.stream_name
        )

        return inferred_schema if inferred_schema else {}
```
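The nested double-break above caps the sample at `record_sample_size`. An equivalent stdlib pattern flattens the slices into one record stream and lets `itertools.islice` enforce the cap. This is a sketch against a fake retriever, not CDK code:

```python
from itertools import islice


def iter_records(retriever):
    # Flatten all stream slices into a single record stream.
    for stream_slice in retriever.stream_slices():
        yield from retriever.read_records(records_schema={}, stream_slice=stream_slice)


class FakeRetriever:
    """Hypothetical stand-in: two slices of three records each."""

    def stream_slices(self):
        return [{"page": 1}, {"page": 2}]

    def read_records(self, records_schema, stream_slice):
        return [{"n": i} for i in range(3)]


# islice stops pulling records as soon as the sample is full,
# so no extra HTTP pages beyond the cap would be requested.
sample = list(islice(iter_records(FakeRetriever()), 4))
print(len(sample))  # 4
```

Because generators are lazy, this keeps the "stop early" property of the double-break while removing the duplicated count check.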