Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions airbyte_cdk/manifest_server/command_processor/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,7 @@ def build_source(
page_limit: Optional[int] = None,
slice_limit: Optional[int] = None,
) -> ConcurrentDeclarativeSource:
# We enforce a concurrency level of 1 so that the stream is processed on a single thread
# to retain ordering for the grouping of the builder message responses.
definition = copy.deepcopy(manifest)
if "concurrency_level" in definition:
definition["concurrency_level"]["default_concurrency"] = 1
else:
definition["concurrency_level"] = {
"type": "ConcurrencyLevel",
"default_concurrency": 1,
}

should_normalize = should_normalize_manifest(manifest)
if should_normalize:
Expand Down
13 changes: 12 additions & 1 deletion airbyte_cdk/manifest_server/routers/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,19 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse:
"md5": hashlib.md5(request.custom_components_code.encode()).hexdigest()
}

# We enforce a concurrency level of 1 so that the stream is processed on a single thread
# to retain ordering for the grouping of the builder message responses.
manifest = request.manifest.model_dump()
if "concurrency_level" in manifest:
manifest["concurrency_level"]["default_concurrency"] = 1
else:
manifest["concurrency_level"] = {
"type": "ConcurrencyLevel",
"default_concurrency": 1,
}

source = safe_build_source(
request.manifest.model_dump(),
manifest,
config_dict,
catalog,
converted_state,
Expand Down
Loading