-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathtestcase.py
More file actions
142 lines (122 loc) · 4.24 KB
/
testcase.py
File metadata and controls
142 lines (122 loc) · 4.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import asyncio
import typing
from datetime import datetime, timedelta, timezone
from typing import Any
from unittest.mock import MagicMock
import anyio
import pytest
from faststream.types import SendableMessage
from freezegun import freeze_time
from taskiq import AsyncBroker
from taskiq.cli.scheduler.args import SchedulerArgs
from taskiq.cli.scheduler.run import run_scheduler
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_faststream import BrokerWrapper, StreamScheduler
from tests import messages
@pytest.mark.anyio
class SchedulerTestcase:
    """Reusable scheduler test scenarios shared by broker-specific suites.

    Subclasses are expected to set:

    * ``test_class`` -- called as ``test_class(broker)`` and used as an async
      context manager around each scenario (presumably the FastStream
      test-broker wrapper for the concrete broker -- confirm in subclasses).
    * ``subj_name`` -- name of the keyword argument under which the
      destination ``subject`` is passed to ``taskiq_broker.task``.
    """

    test_class: Any
    subj_name: str

    @staticmethod
    def build_taskiq_broker(broker: Any) -> AsyncBroker:
        """Build Taskiq compatible object."""
        return BrokerWrapper(broker)

    @staticmethod
    def _start_scheduler(taskiq_broker: AsyncBroker) -> "asyncio.Task[Any]":
        """Start ``run_scheduler`` for *taskiq_broker* as a background task.

        The scheduler is built from a ``StreamScheduler`` whose only source
        is a ``LabelScheduleSource`` over the same broker. The caller owns
        the returned task and must cancel it.
        """
        return asyncio.create_task(
            run_scheduler(
                SchedulerArgs(
                    scheduler=StreamScheduler(
                        broker=taskiq_broker,
                        sources=[LabelScheduleSource(taskiq_broker)],
                    ),
                    modules=[],
                ),
            ),
        )

    async def test_task(
        self,
        subject: str,
        broker: Any,
        mock: MagicMock,
        event: asyncio.Event,
    ) -> None:
        """Base testcase."""

        @broker.subscriber(subject)
        async def handler(msg: str) -> None:
            event.set()
            mock(msg)

        taskiq_broker = self.build_taskiq_broker(broker)

        taskiq_broker.task(
            "Hi!",
            **{self.subj_name: subject},
            schedule=[
                {
                    "time": datetime.now(tz=timezone.utc),
                },
            ],
        )

        async with self.test_class(broker):
            task = self._start_scheduler(taskiq_broker)
            try:
                with anyio.fail_after(3.0):
                    await event.wait()
            finally:
                # Cancel even when the wait times out so the scheduler task
                # does not leak into subsequent tests.
                task.cancel()

        mock.assert_called_once_with("Hi!")

    @pytest.mark.parametrize(
        "msg",
        [
            messages.message,  # regular msg
            messages.sync_callable_msg,  # sync callable
            messages.async_callable_msg,  # async callable
            messages.sync_generator_msg,  # sync generator
            messages.async_generator_msg,  # async generator
            messages.sync_callable_class_message,  # sync callable class
            messages.async_callable_class_message,  # async callable class
        ],
    )
    async def test_task_multiple_schedules_by_cron(
        self,
        subject: str,
        broker: Any,
        event: asyncio.Event,
        msg: None
        | SendableMessage
        | typing.Callable[[], SendableMessage]
        | typing.Callable[[], typing.Awaitable[SendableMessage]]
        | typing.Callable[[], typing.Generator[SendableMessage, None, None]]
        | typing.Callable[[], typing.AsyncGenerator[SendableMessage, None]],
    ) -> None:
        """Test cron runs twice via StreamScheduler.

        Every parametrized ``msg`` variant is expected to deliver
        ``messages.message`` to the subscriber on each run.
        """
        received_message = []

        @broker.subscriber(subject)
        async def handler(message: str) -> None:
            received_message.append(message)
            event.set()

        taskiq_broker = self.build_taskiq_broker(broker)

        taskiq_broker.task(
            msg,
            **{self.subj_name: subject},
            schedule=[
                {
                    "cron": "* * * * *",
                },
            ],
        )

        async with self.test_class(broker):
            with freeze_time("00:00:00", tick=True) as frozen_datetime:
                task = self._start_scheduler(taskiq_broker)
                try:
                    await asyncio.wait_for(event.wait(), 2.0)
                    event.clear()
                    # Move the frozen clock forward instead of really waiting
                    # for the next cron slot.
                    frozen_datetime.tick(timedelta(minutes=2))
                    await asyncio.wait_for(event.wait(), 2.0)
                finally:
                    # Cancel even when a wait times out so the scheduler
                    # task does not leak into subsequent tests.
                    task.cancel()

        assert received_message == [messages.message, messages.message], (
            received_message
        )