Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions airbyte_cdk/manifest_server/command_processor/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,7 @@ def build_source(
page_limit: Optional[int] = None,
slice_limit: Optional[int] = None,
) -> ConcurrentDeclarativeSource:
# We enforce a concurrency level of 1 so that the stream is processed on a single thread
# to retain ordering for the grouping of the builder message responses.
definition = copy.deepcopy(manifest)
if "concurrency_level" in definition:
definition["concurrency_level"]["default_concurrency"] = 1
else:
definition["concurrency_level"] = {
"type": "ConcurrencyLevel",
"default_concurrency": 1,
}

should_normalize = should_normalize_manifest(manifest)
if should_normalize:
Expand Down
13 changes: 12 additions & 1 deletion airbyte_cdk/manifest_server/routers/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,19 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse:
"md5": hashlib.md5(request.custom_components_code.encode()).hexdigest()
}

# We enforce a concurrency level of 1 so that the stream is processed on a single thread
# to retain ordering for the grouping of the builder message responses.
manifest = request.manifest.model_dump()
if "concurrency_level" in manifest:
manifest["concurrency_level"]["default_concurrency"] = 1
else:
manifest["concurrency_level"] = {
"type": "ConcurrencyLevel",
"default_concurrency": 1,
}

source = safe_build_source(
request.manifest.model_dump(),
manifest,
config_dict,
catalog,
converted_state,
Expand Down
55 changes: 0 additions & 55 deletions unit_tests/manifest_server/command_processor/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,61 +31,6 @@ def test_build_catalog_creates_correct_structure(self):
assert configured_stream.sync_mode == SyncMode.incremental
assert configured_stream.destination_sync_mode == DestinationSyncMode.overwrite

@patch("airbyte_cdk.manifest_server.command_processor.utils.ConcurrentDeclarativeSource")
def test_build_source_creates_source(self, mock_source_class):
"""Test that build_source creates a ConcurrentDeclarativeSource with correct parameters."""
# Setup mocks
mock_source = Mock()
mock_source_class.return_value = mock_source

# Test with complex manifest and config structures
manifest = {
"version": "0.1.0",
"definitions": {"selector": {"extractor": {"field_path": ["data"]}}},
"streams": [
{
"name": "users",
"primary_key": "id",
"retriever": {
"requester": {
"url_base": "https://api.example.com",
"path": "/users",
}
},
}
],
"check": {"stream_names": ["users"]},
}

config = {
"api_key": "sk-test-123",
"base_url": "https://api.example.com",
"timeout": 30,
}

# Call build_source with additional parameters
catalog = build_catalog("test_stream")
state = []
result = build_source(manifest, catalog, config, state)

# Verify ConcurrentDeclarativeSource was created with correct parameters
expected_source_config = {
**manifest,
"concurrency_level": {"type": "ConcurrencyLevel", "default_concurrency": 1},
}
mock_source_class.assert_called_once_with(
catalog=catalog,
state=state,
source_config=expected_source_config,
config=config,
normalize_manifest=False, # Default when flag not set
migrate_manifest=False, # Default when flag not set
emit_connector_builder_messages=True,
limits=mock_source_class.call_args[1]["limits"],
)

assert result == mock_source

@patch("airbyte_cdk.manifest_server.command_processor.utils.ConcurrentDeclarativeSource")
def test_build_source_with_normalize_flag(self, mock_source_class):
"""Test build_source when normalize flag is set."""
Expand Down
13 changes: 11 additions & 2 deletions unit_tests/manifest_server/routers/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,12 @@ def test_test_read_endpoint_success(

assert response.status_code == 200
# Verify build_source was called with correct arguments
expected_source_config = {
**sample_manifest,
"concurrency_level": {"type": "ConcurrencyLevel", "default_concurrency": 1},
}
mock_build_source.assert_called_once_with(
sample_manifest,
expected_source_config,
mock_build_catalog.return_value,
sample_config,
[],
Expand Down Expand Up @@ -169,8 +173,13 @@ def test_test_read_with_custom_components(
assert config_arg["__injected_components_py_checksums"]["md5"] == expected_checksum

# Verify other arguments
# Verify build_source was called with correct arguments
expected_source_config = {
**sample_manifest,
"concurrency_level": {"type": "ConcurrencyLevel", "default_concurrency": 1},
}
mock_build_source.assert_called_once_with(
sample_manifest,
expected_source_config,
mock_build_catalog.return_value,
config_arg,
[],
Expand Down
Loading