-
-
Notifications
You must be signed in to change notification settings - Fork 116
Expand file tree
/
Copy pathevents_example_annot.py
More file actions
69 lines (50 loc) · 1.83 KB
/
events_example_annot.py
File metadata and controls
69 lines (50 loc) · 1.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import asyncio
from typing import Annotated
from redis.asyncio import ConnectionPool, Redis # type: ignore
from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend
from taskiq import Context, TaskiqDepends, TaskiqEvents, TaskiqState
# To run this example, please install:
# * taskiq
# * taskiq-redis
# * taskiq-aio-pika
# Broker used by every task below: messages go through AMQP on localhost,
# task results are kept by the Redis result backend on localhost.
broker = AioPikaBroker("amqp://localhost").with_result_backend(
    RedisAsyncResultBackend("redis://localhost"),
)
@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def startup(state: TaskiqState) -> None:
    """Create the shared Redis connection pool when a worker starts.

    The pool is stored on ``state.redis`` so that tasks can reach it through
    ``context.state.redis`` and the shutdown handler can disconnect it.
    """
    # decode_responses has to be configured on the pool itself: redis-py
    # ignores connection options given to ``Redis(...)`` when an existing
    # ``connection_pool`` is supplied, so without it ``get_val`` would hand
    # back ``bytes`` instead of the ``str`` its signature promises.
    state.redis = ConnectionPool.from_url(
        "redis://localhost/1",
        decode_responses=True,
    )
@broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)
async def shutdown(state: TaskiqState) -> None:
    """Disconnect the Redis pool kept on ``state.redis`` at worker shutdown."""
    pool = state.redis
    await pool.disconnect()
@broker.task
async def get_val(
    key: str,
    context: Annotated[Context, TaskiqDepends()],
) -> str | None:
    """Read the value stored under ``key`` through the worker's Redis pool.

    Args:
        key: Redis key to look up.
        context: taskiq context; its ``state.redis`` holds the pool.

    Returns:
        Whatever ``GET key`` yields.
    """
    # NOTE(review): redis-py appears to ignore ``decode_responses`` when a
    # ready-made ``connection_pool`` is passed in -- confirm that the pool
    # itself is configured to decode, otherwise this returns bytes.
    pool = context.state.redis
    client = Redis(connection_pool=pool, decode_responses=True)
    value = await client.get(key)
    return value
@broker.task
async def set_val(
    key: str,
    value: str,
    context: Annotated[Context, TaskiqDepends()],
) -> None:
    """Write ``value`` under ``key`` through the worker's Redis pool.

    Args:
        key: Redis key to write.
        value: Value to store.
        context: taskiq context; its ``state.redis`` holds the pool.
    """
    client = Redis(connection_pool=context.state.redis)
    await client.set(key, value)
async def main() -> None:
    """Send a ``set_val`` task, then a ``get_val`` task, and print the result.

    Raises:
        ValueError: If the ``set_val`` task finished with an error.
    """
    await broker.startup()
    try:
        set_task = await set_val.kiq("key", "value")
        set_result = await set_task.wait_result(with_logs=True)
        if set_result.is_err:
            print(set_result.log)
            raise ValueError("Cannot set value in redis. See logs.")

        get_task = await get_val.kiq("key")
        get_res = await get_task.wait_result()
        print(f"Got redis value: {get_res.return_value}")
    finally:
        # Always shut the broker down, even when a task failed or waiting
        # for a result raised, so its connections are not left open.
        await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())