-
Notifications
You must be signed in to change notification settings - Fork 42
Expand file tree
/
Copy pathtest_utils.py
More file actions
68 lines (52 loc) · 2.64 KB
/
test_utils.py
File metadata and controls
68 lines (52 loc) · 2.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from unittest.mock import Mock, patch
from airbyte_cdk.manifest_server.command_processor.utils import (
SHOULD_MIGRATE_KEY,
SHOULD_NORMALIZE_KEY,
build_catalog,
build_source,
)
class TestManifestUtils:
    """Unit tests for the manifest-server command-processor ``utils`` helpers."""

    def test_build_catalog_creates_correct_structure(self):
        """build_catalog yields a catalog holding one configured stream with the given name."""
        # Protocol enums are only needed by this test, so they are imported locally.
        from airbyte_cdk.models.airbyte_protocol import DestinationSyncMode, SyncMode

        expected_name = "test_stream"

        streams = build_catalog(expected_name).streams

        # Shape of the catalog: exactly one stream, named as requested, empty schema.
        assert len(streams) == 1
        only_stream = streams[0]
        assert only_stream.stream.name == expected_name
        assert only_stream.stream.json_schema == {}

        # Both sync modes must be advertised as supported by the stream.
        advertised_modes = only_stream.stream.supported_sync_modes
        for required_mode in (SyncMode.full_refresh, SyncMode.incremental):
            assert required_mode in advertised_modes

        # The configured stream itself is incremental / overwrite.
        assert only_stream.sync_mode == SyncMode.incremental
        assert only_stream.destination_sync_mode == DestinationSyncMode.overwrite

    @patch("airbyte_cdk.manifest_server.command_processor.utils.ConcurrentDeclarativeSource")
    def test_build_source_with_normalize_flag(self, mock_source_class):
        """A manifest carrying SHOULD_NORMALIZE_KEY requests normalization but not migration."""
        mock_source_class.return_value = Mock()

        flagged_manifest = {"streams": [{"name": "test_stream"}], SHOULD_NORMALIZE_KEY: True}
        source_config = {"api_key": "test_key"}
        configured_catalog = build_catalog("test_stream")
        empty_state = []

        build_source(flagged_manifest, configured_catalog, source_config, empty_state)

        # Inspect the keyword arguments the patched source class was constructed with.
        ctor_kwargs = mock_source_class.call_args.kwargs
        assert ctor_kwargs["normalize_manifest"] is True
        assert ctor_kwargs["migrate_manifest"] is False

    @patch("airbyte_cdk.manifest_server.command_processor.utils.ConcurrentDeclarativeSource")
    def test_build_source_with_migrate_flag(self, mock_source_class):
        """A manifest carrying SHOULD_MIGRATE_KEY requests migration but not normalization."""
        mock_source_class.return_value = Mock()

        flagged_manifest = {"streams": [{"name": "test_stream"}], SHOULD_MIGRATE_KEY: True}
        source_config = {"api_key": "test_key"}
        configured_catalog = build_catalog("test_stream")
        empty_state = []

        build_source(flagged_manifest, configured_catalog, source_config, empty_state)

        # Inspect the keyword arguments the patched source class was constructed with.
        ctor_kwargs = mock_source_class.call_args.kwargs
        assert ctor_kwargs["normalize_manifest"] is False
        assert ctor_kwargs["migrate_manifest"] is True