-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathtest_job_tracker.py
More file actions
71 lines (53 loc) · 2.2 KB
/
test_job_tracker.py
File metadata and controls
71 lines (53 loc) · 2.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
from typing import List
from unittest import TestCase
import pytest
from airbyte_cdk.sources.declarative.async_job.job_tracker import (
ConcurrentJobLimitReached,
JobTracker,
)
# Concurrent-job limit handed to the JobTracker built in JobTrackerTest.setUp;
# the tests request this many intents to bring the tracker to its limit.
_LIMIT = 3
class JobTrackerTest(TestCase):
    """Checks intent acquisition and job removal on a ``JobTracker`` built with ``_LIMIT``."""

    def setUp(self) -> None:
        """Build a new tracker before each test method runs."""
        self._tracker = JobTracker(_LIMIT)

    def test_given_limit_reached_when_remove_job_then_can_get_intent_again(self) -> None:
        """After the limit is hit, removing one intent allows another intent to be obtained."""
        intents = self._reach_limit()
        with pytest.raises(ConcurrentJobLimitReached):
            self._tracker.try_to_get_intent()

        self._tracker.remove_job(intents[0])

        assert self._tracker.try_to_get_intent()

    def test_given_job_does_not_exist_when_remove_job_then_do_not_raise(self) -> None:
        """Removing an id the tracker was never given must not raise."""
        self._tracker.remove_job("non existing job id")

    def test_given_limit_reached_when_add_job_then_limit_is_still_reached(self) -> None:
        """Calling ``add_job`` with an already-held intent does not free capacity."""
        # Reuse the shared helper instead of repeating its comprehension inline.
        intents = self._reach_limit()
        with pytest.raises(ConcurrentJobLimitReached):
            self._tracker.try_to_get_intent()

        self._tracker.add_job(intents[0], "a created job")

        with pytest.raises(ConcurrentJobLimitReached):
            self._tracker.try_to_get_intent()

    def _reach_limit(self) -> List[str]:
        """Request ``_LIMIT`` intents from the tracker and return them in acquisition order."""
        return [self._tracker.try_to_get_intent() for _ in range(_LIMIT)]
@pytest.mark.parametrize("limit", [-1, 0])
def test_given_limit_is_less_than_1_when_init_then_set_to_1(limit: int):
    """A tracker constructed with a limit of -1 or 0 must expose ``_limit == 1``."""
    effective_limit = JobTracker(limit)._limit
    assert effective_limit == 1
@pytest.mark.parametrize(
    ("limit", "config", "expected_limit"),
    [
        # Plain numeric string, no config lookup involved.
        ("2", {}, 2),
        # Template string that references a value stored in the config.
        (
            "{{ config['max_concurrent_async_job_count'] }}",
            {"max_concurrent_async_job_count": 2},
            2,
        ),
    ],
)
def test_given_limit_as_string_when_init_then_interpolate_correctly(limit, config, expected_limit):
    """A string limit, literal or config template, must yield the expected integer ``_limit``."""
    resolved_limit = JobTracker(limit, config)._limit
    assert resolved_limit == expected_limit
def test_given_interpolated_limit_and_empty_config_when_init_then_set_to_1():
    """A limit template whose config value is the string "hello" must yield ``_limit == 1``.

    NOTE(review): the test name says "empty_config", but the config passed here
    is not empty; it maps the referenced key to a non-numeric string.
    """
    limit_template = "{{ config['max_concurrent_async_job_count'] }}"
    config = {"max_concurrent_async_job_count": "hello"}

    job_tracker = JobTracker(limit_template, config)

    assert job_tracker._limit == 1