Skip to content

Commit 0b5cf79

Browse files
feat: add JsonItemsDecoder for streaming large JSON responses
Adds a new declarative decoder, JsonItemsDecoder, that streams elements of a nested array out of a single JSON document one at a time using the ijson library. This lets manifest-only connectors decode multi-GB JSON responses (e.g. Amazon Seller Partner Brand Analytics reports) without loading the full document into memory. - New `JsonItemsParser` in composite_raw_decoder.py (wraps ijson.items) - New `JsonItemsDecoder` schema entry, wired into GzipDecoder / ZipfileDecoder / top-level decoder unions so it composes with the existing decoder hierarchy - Pydantic models regenerated from schema - Factory: create_json_items_decoder + JsonItemsDecoderModel handling in _get_parser - Drop ijson from deptry DEP002 ignore list now that the CDK imports it directly; update pyproject.toml comment to reflect first-class use - Unit tests covering top-level, nested, empty, encoding, gzip composition, missing path validation, and lazy streaming behavior
1 parent 9852599 commit 0b5cf79

6 files changed

Lines changed: 415 additions & 156 deletions

File tree

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2628,6 +2628,40 @@ definitions:
26282628
type:
26292629
type: string
26302630
enum: [JsonDecoder]
2631+
JsonItemsDecoder:
2632+
title: JSON Items (Streaming)
2633+
description: >-
2634+
Select 'JSON Items (Streaming)' to stream-decode a single JSON document
2635+
by yielding each element of a nested array, one at a time. Use this for
2636+
very large single-document JSON responses (e.g. a wrapping object
2637+
containing a multi-GB array) where buffering the whole document into
2638+
memory would cause out-of-memory errors. Powered by the `ijson`
2639+
streaming parser.
2640+
type: object
2641+
required:
2642+
- type
2643+
- items_path
2644+
properties:
2645+
type:
2646+
type: string
2647+
enum: [JsonItemsDecoder]
2648+
items_path:
2649+
title: Items Path
2650+
description: >-
2651+
Dot-separated path to the JSON array whose elements should be
2652+
yielded as records. Uses `ijson` path syntax (e.g. `data.users`),
2653+
not JSONPath syntax — do not include leading `$.` or trailing
2654+
`[*]`.
2655+
type: string
2656+
examples:
2657+
- dataByDepartmentAndSearchTerm
2658+
- dataByAsin
2659+
- data.users
2660+
encoding:
2661+
title: Encoding
2662+
description: Text encoding used to decode the streamed bytes before JSON parsing.
2663+
type: string
2664+
default: utf-8
26312665
JsonlDecoder:
26322666
title: JSON Lines
26332667
description: Select 'JSON Lines' if the response consists of JSON objects separated by new lines ('\n') in JSONL format.
@@ -2869,6 +2903,7 @@ definitions:
28692903
- "$ref": "#/definitions/CsvDecoder"
28702904
- "$ref": "#/definitions/GzipDecoder"
28712905
- "$ref": "#/definitions/JsonDecoder"
2906+
- "$ref": "#/definitions/JsonItemsDecoder"
28722907
- "$ref": "#/definitions/JsonlDecoder"
28732908
ListPartitionRouter:
28742909
title: List Partition Router
@@ -3909,6 +3944,7 @@ definitions:
39093944
description: Component decoding the response so records can be extracted.
39103945
anyOf:
39113946
- "$ref": "#/definitions/JsonDecoder"
3947+
- "$ref": "#/definitions/JsonItemsDecoder"
39123948
- "$ref": "#/definitions/XmlDecoder"
39133949
- "$ref": "#/definitions/CsvDecoder"
39143950
- "$ref": "#/definitions/JsonlDecoder"
@@ -3997,6 +4033,7 @@ definitions:
39974033
- "$ref": "#/definitions/CsvDecoder"
39984034
- "$ref": "#/definitions/GzipDecoder"
39994035
- "$ref": "#/definitions/JsonDecoder"
4036+
- "$ref": "#/definitions/JsonItemsDecoder"
40004037
- "$ref": "#/definitions/JsonlDecoder"
40014038
CsvDecoder:
40024039
title: CSV
@@ -4163,6 +4200,7 @@ definitions:
41634200
- "$ref": "#/definitions/CsvDecoder"
41644201
- "$ref": "#/definitions/GzipDecoder"
41654202
- "$ref": "#/definitions/JsonDecoder"
4203+
- "$ref": "#/definitions/JsonItemsDecoder"
41664204
- "$ref": "#/definitions/JsonlDecoder"
41674205
- "$ref": "#/definitions/IterableDecoder"
41684206
- "$ref": "#/definitions/XmlDecoder"
@@ -4175,6 +4213,7 @@ definitions:
41754213
- "$ref": "#/definitions/CsvDecoder"
41764214
- "$ref": "#/definitions/GzipDecoder"
41774215
- "$ref": "#/definitions/JsonDecoder"
4216+
- "$ref": "#/definitions/JsonItemsDecoder"
41784217
- "$ref": "#/definitions/JsonlDecoder"
41794218
- "$ref": "#/definitions/IterableDecoder"
41804219
- "$ref": "#/definitions/XmlDecoder"

airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from io import BufferedIOBase, TextIOWrapper
1212
from typing import Any, List, Optional
1313

14+
import ijson
1415
import orjson
1516
import requests
1617

@@ -98,6 +99,33 @@ def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
9899
logger.warning(f"Cannot decode/parse line {line!r} as JSON, error: {e}")
99100

100101

102+
@dataclass
103+
class JsonItemsParser(Parser):
104+
"""Streaming JSON parser that yields each element of a nested array.
105+
106+
Use this for very large single-document JSON responses where the records
107+
of interest live under a nested array (e.g. `dataByDepartmentAndSearchTerm`,
108+
`data.users`). Powered by `ijson`, this parser does not materialize the
109+
full document — peak memory is bounded by a single record plus ijson's
110+
internal parse buffers, regardless of document size.
111+
112+
`items_path` uses `ijson` dotted path syntax (e.g. `data.users`), not
113+
JSONPath syntax (`$.data.users[*]`). Internally we append `.item`, which
114+
is the `ijson` convention for "iterate elements of this array".
115+
"""
116+
117+
items_path: str = ""
118+
encoding: Optional[str] = "utf-8"
119+
120+
def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
121+
if not self.items_path:
122+
raise ValueError("JsonItemsParser requires a non-empty items_path.")
123+
# ijson auto-selects the best available backend (yajl2_c when present)
124+
# and reads from `data` lazily — it does not call `.read()` on the
125+
# whole stream up front.
126+
yield from ijson.items(data, f"{self.items_path}.item")
127+
128+
101129
@dataclass
102130
class CsvParser(Parser):
103131
# TODO: migrate implementation to re-use file-base classes

0 commit comments

Comments
 (0)