-
-
Notifications
You must be signed in to change notification settings - Fork 117
Expand file tree
/
Copy pathtest_updater.py
More file actions
87 lines (76 loc) · 2.23 KB
/
test_updater.py
File metadata and controls
87 lines (76 loc) · 2.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from datetime import datetime
from typing import List, Union
import pytest
from taskiq import InMemoryBroker, ScheduleSource
from taskiq.cli.scheduler.run import get_all_schedules
from taskiq.scheduler.scheduled_task import ScheduledTask
from taskiq.scheduler.scheduler import TaskiqScheduler
class DummySource(ScheduleSource):
def __init__(self, schedules: Union[Exception, List[ScheduledTask]]) -> None:
self.schedules = schedules
async def get_schedules(self) -> List[ScheduledTask]:
"""Return test schedules, or raise an exception."""
if isinstance(self.schedules, Exception):
raise self.schedules
return self.schedules
@pytest.mark.anyio
async def test_get_schedules_success() -> None:
"""Tests that schedules are returned correctly."""
schedules1 = [
ScheduledTask(
task_name="a",
labels={},
args=[],
kwargs={},
time=datetime.now(),
),
ScheduledTask(
task_name="b",
labels={},
args=[],
kwargs={},
time=datetime.now(),
),
]
schedules2 = [
ScheduledTask(
task_name="c",
labels={},
args=[],
kwargs={},
time=datetime.now(),
),
]
sources: List[ScheduleSource] = [
DummySource(schedules1),
DummySource(schedules2),
]
schedules = await get_all_schedules(
TaskiqScheduler(InMemoryBroker(), sources),
)
assert schedules == [
(sources[0], schedules1),
(sources[1], schedules2),
]
@pytest.mark.anyio
async def test_get_schedules_error() -> None:
"""Tests that if source returned an error, empty list will be returned."""
source1 = DummySource(
[
ScheduledTask(
task_name="a",
labels={},
args=[],
kwargs={},
time=datetime.now(),
),
],
)
source2 = DummySource(Exception("test"))
schedules = await get_all_schedules(
TaskiqScheduler(InMemoryBroker(), [source1, source2]),
)
assert schedules == [
(source1, source1.schedules),
(source2, []),
]