Skip to content

Commit c0e2647

Browse files
tolik0devin-ai-integration[bot]claude
committed
feat: add JsonItemsDecoder for streaming large JSON responses
Adds a streaming JSON decoder for very large single-document JSON responses where the records live under a nested array. JsonItemsParser yields each array element via ijson, so peak memory is bounded by a single record rather than the whole document. Composes with the existing CompositeRawDecoder hierarchy (gzip/zip) and is wired into the decoder unions + factory. Adds ijson as a first-class CDK dependency. JsonItemsParser also honors a configured non-UTF-8 encoding by transcoding to UTF-8 bytes via a lazy streaming recoder, keeping ijson on its native byte backend. Adopts and supersedes #1026. Co-Authored-By: devin-ai-integration[bot] <devin-ai-integration[bot]@users.noreply.github.com> Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent fd95ecf commit c0e2647

7 files changed

Lines changed: 643 additions & 282 deletions

File tree

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2650,6 +2650,40 @@ definitions:
26502650
type:
26512651
type: string
26522652
enum: [JsonDecoder]
2653+
JsonItemsDecoder:
2654+
title: JSON Items (Streaming)
2655+
description: >-
2656+
Select 'JSON Items (Streaming)' to stream-decode a single JSON document
2657+
by yielding each element of a nested array, one at a time. Use this for
2658+
very large single-document JSON responses (e.g. a wrapping object
2659+
containing a multi-GB array) where buffering the whole document into
2660+
memory would cause out-of-memory errors. Powered by the `ijson`
2661+
streaming parser.
2662+
type: object
2663+
required:
2664+
- type
2665+
- items_path
2666+
properties:
2667+
type:
2668+
type: string
2669+
enum: [JsonItemsDecoder]
2670+
items_path:
2671+
title: Items Path
2672+
description: >-
2673+
Dot-separated path to the JSON array whose elements should be
2674+
yielded as records. Uses `ijson` path syntax (e.g. `data.users`),
2675+
not JSONPath syntax — do not include leading `$.` or trailing
2676+
`[*]`.
2677+
type: string
2678+
examples:
2679+
- dataByDepartmentAndSearchTerm
2680+
- dataByAsin
2681+
- data.users
2682+
encoding:
2683+
title: Encoding
2684+
description: Text encoding used to decode the streamed bytes before JSON parsing.
2685+
type: string
2686+
default: utf-8
26532687
JsonlDecoder:
26542688
title: JSON Lines
26552689
description: Select 'JSON Lines' if the response consists of JSON objects separated by new lines ('\n') in JSONL format.
@@ -2891,6 +2925,7 @@ definitions:
28912925
- "$ref": "#/definitions/CsvDecoder"
28922926
- "$ref": "#/definitions/GzipDecoder"
28932927
- "$ref": "#/definitions/JsonDecoder"
2928+
- "$ref": "#/definitions/JsonItemsDecoder"
28942929
- "$ref": "#/definitions/JsonlDecoder"
28952930
ListPartitionRouter:
28962931
title: List Partition Router
@@ -3931,6 +3966,7 @@ definitions:
39313966
description: Component decoding the response so records can be extracted.
39323967
anyOf:
39333968
- "$ref": "#/definitions/JsonDecoder"
3969+
- "$ref": "#/definitions/JsonItemsDecoder"
39343970
- "$ref": "#/definitions/XmlDecoder"
39353971
- "$ref": "#/definitions/CsvDecoder"
39363972
- "$ref": "#/definitions/JsonlDecoder"
@@ -4019,6 +4055,7 @@ definitions:
40194055
- "$ref": "#/definitions/CsvDecoder"
40204056
- "$ref": "#/definitions/GzipDecoder"
40214057
- "$ref": "#/definitions/JsonDecoder"
4058+
- "$ref": "#/definitions/JsonItemsDecoder"
40224059
- "$ref": "#/definitions/JsonlDecoder"
40234060
CsvDecoder:
40244061
title: CSV
@@ -4185,6 +4222,7 @@ definitions:
41854222
- "$ref": "#/definitions/CsvDecoder"
41864223
- "$ref": "#/definitions/GzipDecoder"
41874224
- "$ref": "#/definitions/JsonDecoder"
4225+
- "$ref": "#/definitions/JsonItemsDecoder"
41884226
- "$ref": "#/definitions/JsonlDecoder"
41894227
- "$ref": "#/definitions/IterableDecoder"
41904228
- "$ref": "#/definitions/XmlDecoder"
@@ -4197,6 +4235,7 @@ definitions:
41974235
- "$ref": "#/definitions/CsvDecoder"
41984236
- "$ref": "#/definitions/GzipDecoder"
41994237
- "$ref": "#/definitions/JsonDecoder"
4238+
- "$ref": "#/definitions/JsonItemsDecoder"
42004239
- "$ref": "#/definitions/JsonlDecoder"
42014240
- "$ref": "#/definitions/IterableDecoder"
42024241
- "$ref": "#/definitions/XmlDecoder"

airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
44

5+
import codecs
56
import csv
67
import gzip
78
import io
@@ -11,6 +12,7 @@
1112
from io import BufferedIOBase, TextIOWrapper
1213
from typing import Any, List, Optional
1314

15+
import ijson
1416
import orjson
1517
import requests
1618

@@ -98,6 +100,58 @@ def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
98100
logger.warning(f"Cannot decode/parse line {line!r} as JSON, error: {e}")
99101

100102

103+
class _Utf8Recoder:
104+
"""Lazily transcode a byte stream from a source encoding into UTF-8 bytes.
105+
106+
Lets a non-UTF-8 byte stream be fed to ijson while keeping it on the native byte
107+
backend (ijson deprecates text-mode inputs). Bytes are read from the underlying
108+
stream and decoded incrementally, so multi-byte characters split across read
109+
boundaries are handled and memory stays bounded regardless of document size.
110+
"""
111+
112+
def __init__(self, stream: BufferedIOBase, encoding: str) -> None:
113+
self._stream = stream
114+
self._decoder = codecs.getincrementaldecoder(encoding)()
115+
116+
def read(self, size: int = -1) -> bytes:
117+
chunk = self._stream.read(size)
118+
# `final` once the underlying stream is exhausted so a trailing partial sequence flushes.
119+
return self._decoder.decode(chunk, final=not chunk).encode("utf-8")
120+
121+
122+
@dataclass
123+
class JsonItemsParser(Parser):
124+
"""Streaming JSON parser that yields each element of a nested array.
125+
126+
Use this for very large single-document JSON responses where the records
127+
of interest live under a nested array (e.g. `dataByDepartmentAndSearchTerm`,
128+
`data.users`). Powered by `ijson`, this parser does not materialize the
129+
full document — peak memory is bounded by a single record plus ijson's
130+
internal parse buffers, regardless of document size.
131+
132+
`items_path` uses `ijson` dotted path syntax (e.g. `data.users`), not
133+
JSONPath syntax (`$.data.users[*]`). Internally we append `.item`, which
134+
is the `ijson` convention for "iterate elements of this array".
135+
"""
136+
137+
items_path: str = ""
138+
encoding: Optional[str] = "utf-8"
139+
140+
def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
141+
if not self.items_path:
142+
raise ValueError("JsonItemsParser requires a non-empty items_path.")
143+
if self.encoding and codecs.lookup(self.encoding).name != "utf-8":
144+
# ijson reads bytes natively (auto-detecting UTF-8/16/32). For an explicitly
145+
# configured non-UTF-8 encoding (e.g. iso-8859-1) we transcode to UTF-8 bytes
146+
# so ijson keeps using its fast byte backend rather than a (deprecated) text
147+
# stream. The recoder decodes lazily in chunks, preserving bounded memory.
148+
data = _Utf8Recoder(data, self.encoding) # type: ignore[assignment]
149+
# ijson auto-selects the best available backend (yajl2_c when present)
150+
# and reads from `data` lazily — it does not call `.read()` on the
151+
# whole stream up front.
152+
yield from ijson.items(data, f"{self.items_path}.item")
153+
154+
101155
@dataclass
102156
class CsvParser(Parser):
103157
# TODO: migrate implementation to re-use file-base classes

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,20 @@ class JsonDecoder(BaseModel):
674674
type: Literal["JsonDecoder"]
675675

676676

677+
class JsonItemsDecoder(BaseModel):
678+
type: Literal["JsonItemsDecoder"]
679+
items_path: str = Field(
680+
...,
681+
description="Dot-separated path to the JSON array whose elements should be yielded as records. Uses `ijson` path syntax (e.g. `data.users`), not JSONPath syntax \u2014 do not include leading `$.` or trailing `[*]`.",
682+
title="Items Path",
683+
)
684+
encoding: Optional[str] = Field(
685+
"utf-8",
686+
description="The character encoding of the JSON data. Defaults to UTF-8.",
687+
title="Encoding",
688+
)
689+
690+
677691
class JsonlDecoder(BaseModel):
678692
type: Literal["JsonlDecoder"]
679693

@@ -2201,7 +2215,7 @@ class PaginationReset(BaseModel):
22012215

22022216
class GzipDecoder(BaseModel):
22032217
type: Literal["GzipDecoder"]
2204-
decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder]
2218+
decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonItemsDecoder, JsonlDecoder]
22052219

22062220

22072221
class RequestBodyGraphQL(BaseModel):
@@ -2339,7 +2353,7 @@ class Config:
23392353
extra = Extra.allow
23402354

23412355
type: Literal["ZipfileDecoder"]
2342-
decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder] = Field(
2356+
decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonItemsDecoder, JsonlDecoder] = Field(
23432357
...,
23442358
description="Parser to parse the decompressed data from the zipfile(s).",
23452359
title="Parser",
@@ -3011,6 +3025,7 @@ class SimpleRetriever(BaseModel):
30113025
decoder: Optional[
30123026
Union[
30133027
JsonDecoder,
3028+
JsonItemsDecoder,
30143029
XmlDecoder,
30153030
CsvDecoder,
30163031
JsonlDecoder,
@@ -3144,6 +3159,7 @@ class AsyncRetriever(BaseModel):
31443159
CsvDecoder,
31453160
GzipDecoder,
31463161
JsonDecoder,
3162+
JsonItemsDecoder,
31473163
JsonlDecoder,
31483164
IterableDecoder,
31493165
XmlDecoder,
@@ -3160,6 +3176,7 @@ class AsyncRetriever(BaseModel):
31603176
CsvDecoder,
31613177
GzipDecoder,
31623178
JsonDecoder,
3179+
JsonItemsDecoder,
31633180
JsonlDecoder,
31643181
IterableDecoder,
31653182
XmlDecoder,

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
CompositeRawDecoder,
9898
CsvParser,
9999
GzipParser,
100+
JsonItemsParser,
100101
JsonLineParser,
101102
JsonParser,
102103
Parser,
@@ -321,6 +322,9 @@
321322
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
322323
JsonFileSchemaLoader as JsonFileSchemaLoaderModel,
323324
)
325+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
326+
JsonItemsDecoder as JsonItemsDecoderModel,
327+
)
324328
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
325329
JsonlDecoder as JsonlDecoderModel,
326330
)
@@ -763,6 +767,7 @@ def _init_mappings(self) -> None:
763767
HttpResponseFilterModel: self.create_http_response_filter,
764768
InlineSchemaLoaderModel: self.create_inline_schema_loader,
765769
JsonDecoderModel: self.create_json_decoder,
770+
JsonItemsDecoderModel: self.create_json_items_decoder,
766771
JsonlDecoderModel: self.create_jsonl_decoder,
767772
JsonSchemaPropertySelectorModel: self.create_json_schema_property_selector,
768773
GzipDecoderModel: self.create_gzip_decoder,
@@ -2671,6 +2676,14 @@ def create_jsonl_decoder(
26712676
stream_response=False if self._emit_connector_builder_messages else True,
26722677
)
26732678

2679+
def create_json_items_decoder(
2680+
self, model: JsonItemsDecoderModel, config: Config, **kwargs: Any
2681+
) -> Decoder:
2682+
return CompositeRawDecoder(
2683+
parser=ModelToComponentFactory._get_parser(model, config),
2684+
stream_response=False if self._emit_connector_builder_messages else True,
2685+
)
2686+
26742687
def create_gzip_decoder(
26752688
self, model: GzipDecoderModel, config: Config, **kwargs: Any
26762689
) -> Decoder:
@@ -2719,6 +2732,11 @@ def _get_parser(model: BaseModel, config: Config) -> Parser:
27192732
if isinstance(model, JsonDecoderModel):
27202733
# Note that the logic is a bit different from the JsonDecoder as there is some legacy that is maintained to return {} on error cases
27212734
return JsonParser()
2735+
elif isinstance(model, JsonItemsDecoderModel):
2736+
return JsonItemsParser(
2737+
items_path=model.items_path,
2738+
encoding=model.encoding or "utf-8",
2739+
)
27222740
elif isinstance(model, JsonlDecoderModel):
27232741
return JsonLineParser()
27242742
elif isinstance(model, CsvDecoderModel):

0 commit comments

Comments
 (0)