-
Notifications
You must be signed in to change notification settings - Fork 46
Expand file tree
/
Copy pathtest_file_based_source_concurrency.py
More file actions
66 lines (53 loc) · 2.02 KB
/
Copy pathtest_file_based_source_concurrency.py
File metadata and controls
66 lines (53 loc) · 2.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#
# Copyright (c) 2026 Airbyte, Inc., all rights reserved.
#
from unittest.mock import MagicMock, patch
import pytest
from airbyte_cdk.sources.file_based.file_based_source import (
MAX_CONCURRENCY,
FileBasedSource,
)
class _ConcreteFBSource(FileBasedSource):
"""Minimal concrete subclass so we can instantiate FileBasedSource."""
_concurrency_level = None
@property
def name(self) -> str:
return "test-source"
def check_connection(self, logger, config):
return True, None
def streams(self, config):
return []
@pytest.mark.parametrize(
"concurrency_level, expected_num_workers, expected_initial_partitions",
[
pytest.param(None, MAX_CONCURRENCY, MAX_CONCURRENCY // 2, id="none_uses_max"),
pytest.param(100, 100, 50, id="default_concurrency"),
pytest.param(20, 20, 10, id="reduced_concurrency"),
pytest.param(2, 2, 1, id="minimal_concurrency"),
pytest.param(200, MAX_CONCURRENCY, MAX_CONCURRENCY // 2, id="capped_at_max"),
],
)
def test_concurrency_level_controls_thread_pool_size(
concurrency_level, expected_num_workers, expected_initial_partitions
):
_ConcreteFBSource._concurrency_level = concurrency_level
with patch(
"airbyte_cdk.sources.file_based.file_based_source.ConcurrentSource.create"
) as mock_create:
mock_create.return_value = MagicMock()
try:
_ConcreteFBSource(
stream_reader=MagicMock(),
spec_class=MagicMock(),
catalog=None,
config=None,
state=None,
)
except Exception:
pass # Other init errors are fine; we only care about the ConcurrentSource.create call
mock_create.assert_called_once()
call_args = mock_create.call_args
actual_num_workers = call_args[0][0]
actual_initial_partitions = call_args[0][1]
assert actual_num_workers == expected_num_workers
assert actual_initial_partitions == expected_initial_partitions