-
Notifications
You must be signed in to change notification settings - Fork 47
Expand file tree
/
Copy pathtest_catalog_providers.py
More file actions
160 lines (135 loc) · 6.42 KB
/
Copy pathtest_catalog_providers.py
File metadata and controls
160 lines (135 loc) · 6.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from unittest.mock import Mock
import pytest
from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream
from airbyte_cdk.sql.shared.catalog_providers import CatalogProvider
class TestCatalogProvider:
    """Exercise ``CatalogProvider.get_primary_keys()`` on hand-built catalogs.

    Every case builds a configured catalog holding one stream named
    ``test_stream`` and asserts on the list the provider returns for it.
    """

    @staticmethod
    def _resolve_primary_keys(*, columns, source_defined, configured):
        """Build a one-stream catalog and ask the provider for its primary keys.

        Args:
            columns: Property names for the stream's JSON schema, in order;
                each one is declared with type ``string``.
            source_defined: Value passed as ``source_defined_primary_key``
                on the ``AirbyteStream``.
            configured: Value passed as ``primary_key`` on the
                ``ConfiguredAirbyteStream``.

        Returns:
            The result of ``get_primary_keys("test_stream")``.
        """
        schema = {
            "type": "object",
            "properties": {column: {"type": "string"} for column in columns},
        }
        airbyte_stream = AirbyteStream(
            name="test_stream",
            json_schema=schema,
            supported_sync_modes=["full_refresh"],
            source_defined_primary_key=source_defined,
        )
        stream_config = ConfiguredAirbyteStream(
            stream=airbyte_stream,
            sync_mode="full_refresh",
            destination_sync_mode="overwrite",
            primary_key=configured,
        )
        catalog_provider = CatalogProvider(
            ConfiguredAirbyteCatalog(streams=[stream_config])
        )
        return catalog_provider.get_primary_keys("test_stream")

    def test_get_primary_keys_uses_configured_primary_key_when_set(self):
        """A non-empty configured primary_key is reported, not the source one."""
        primary_keys = self._resolve_primary_keys(
            columns=["id"],
            source_defined=[["source_id"]],
            configured=[["configured_id"]],
        )

        assert primary_keys == ["configured_id"]

    def test_get_primary_keys_falls_back_to_source_defined_when_configured_empty(self):
        """An empty configured primary_key yields the source-defined key."""
        primary_keys = self._resolve_primary_keys(
            columns=["id"],
            source_defined=[["source_id"]],
            configured=[],  # configured key present but empty
        )

        assert primary_keys == ["source_id"]

    def test_get_primary_keys_falls_back_to_source_defined_when_configured_none(self):
        """A ``None`` configured primary_key yields the source-defined key."""
        primary_keys = self._resolve_primary_keys(
            columns=["id"],
            source_defined=[["source_id"]],
            configured=None,  # configured key explicitly None
        )

        assert primary_keys == ["source_id"]

    def test_get_primary_keys_returns_empty_when_both_empty(self):
        """Two empty primary-key lists produce an empty result."""
        primary_keys = self._resolve_primary_keys(
            columns=["id"],
            source_defined=[],  # nothing declared by the source
            configured=[],  # nothing configured either
        )

        assert primary_keys == []

    def test_get_primary_keys_returns_empty_when_both_none(self):
        """Two ``None`` primary keys produce an empty list."""
        primary_keys = self._resolve_primary_keys(
            columns=["id"],
            source_defined=None,  # source key explicitly None
            configured=None,  # configured key explicitly None
        )

        assert primary_keys == []

    def test_get_primary_keys_handles_composite_keys_from_source_defined(self):
        """A two-column source-defined key comes back as two flat names."""
        primary_keys = self._resolve_primary_keys(
            columns=["id1", "id2"],
            source_defined=[["id1"], ["id2"]],  # composite key, one path per column
            configured=[],  # forces the source-defined fallback
        )

        assert primary_keys == ["id1", "id2"]

    def test_get_primary_keys_normalizes_case_for_source_defined(self):
        """An uppercase source-defined key is reported in lowercase."""
        primary_keys = self._resolve_primary_keys(
            columns=["ID"],
            source_defined=[["ID"]],  # uppercase on purpose
            configured=[],  # forces the source-defined fallback
        )

        assert primary_keys == ["id"]