-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathinitializator.py
More file actions
121 lines (94 loc) · 3.28 KB
/
initializator.py
File metadata and controls
121 lines (94 loc) · 3.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import copy
from collections.abc import Awaitable, Callable, Mapping
from typing import Any
from fastapi import FastAPI, Request
from starlette.requests import HTTPConnection
from taskiq import AsyncBroker, TaskiqEvents, TaskiqState
from taskiq.cli.utils import import_object
def startup_event_generator(
    broker: AsyncBroker,
    app_or_path: str | FastAPI,
) -> Callable[[TaskiqState], Awaitable[None]]:
    """
    Generate startup event.

    This function takes a FastAPI application (or a path to one)
    and returns a handler that enters the application's lifespan
    context on broker's startup.

    :param broker: current broker.
    :param app_or_path: fastapi application or a path to it.
        The resolved object may also be a factory: if it is not
        a FastAPI instance but is callable, it is called once
        with no arguments to obtain the application.
    :raises ValueError: (from the returned handler) if the resolved
        object is not a FastAPI application.
    :returns: startup handler.
    """

    async def startup(state: TaskiqState) -> None:
        # Only worker processes run the application's lifespan.
        if not broker.is_worker_process:
            return

        if isinstance(app_or_path, str):
            app = import_object(app_or_path)
        else:
            app = app_or_path

        # Treat non-application objects as factories, but only when they
        # can actually be called; otherwise fall through to the explicit
        # ValueError below instead of failing with an opaque TypeError.
        if not isinstance(app, FastAPI) and callable(app):
            app = app()

        if not isinstance(app, FastAPI):
            raise ValueError(f"'{app_or_path}' is not a FastAPI application.")

        state.fastapi_app = app
        # Keep the context manager on the state so that the shutdown
        # handler can exit it later.
        state.lf_ctx = app.router.lifespan_context(app)
        # Whatever the lifespan context yields is forwarded as the
        # state injected into Request/HTTPConnection dependencies.
        asgi_state = await state.lf_ctx.__aenter__()
        populate_dependency_context(broker, app, asgi_state)

    return startup
def shutdown_event_generator(
    broker: AsyncBroker,
) -> Callable[[TaskiqState], Awaitable[None]]:
    """
    Build the handler that runs on broker's shutdown.

    The returned handler exits the lifespan context stored
    in the state under ``lf_ctx``. Outside of worker
    processes the handler does nothing.

    :param broker: current broker.
    :return: shutdown event handler.
    """

    async def shutdown(state: TaskiqState) -> None:
        if broker.is_worker_process:
            lifespan_ctx = state.lf_ctx
            # No exception is propagated into the context: plain exit.
            await lifespan_ctx.__aexit__(None, None, None)

    return shutdown
def init(broker: AsyncBroker, app_or_path: str | FastAPI) -> None:
    """
    Register taskiq worker events for a FastAPI application.

    This is the main entry point for integrating FastAPI
    with taskiq.

    It attaches a worker startup handler and a worker
    shutdown handler to the broker.

    :param broker: current broker to use.
    :param app_or_path: fastapi application or a path to it.
    """
    on_startup = startup_event_generator(broker, app_or_path)
    broker.add_event_handler(TaskiqEvents.WORKER_STARTUP, on_startup)

    on_shutdown = shutdown_event_generator(broker)
    broker.add_event_handler(TaskiqEvents.WORKER_SHUTDOWN, on_shutdown)
def populate_dependency_context(
    broker: AsyncBroker,
    app: FastAPI,
    asgi_state: Mapping[str, Any] | None = None,
) -> None:
    """
    Register Request and HTTPConnection dependency overrides.

    Both overrides are factories: each call builds a new object
    whose scope carries the application, the ``"http"`` type and
    a shallow copy of the given state.

    You may need to call this manually if you are using InMemoryBroker.

    :param broker: current broker to use.
    :param app: current application.
    :param asgi_state: state that will be injected in request.
    """
    state_source = asgi_state if asgi_state else {}

    def _fresh_scope() -> dict[str, Any]:
        # A shallow copy per call, so objects built by the overrides
        # do not share one state container.
        return {"app": app, "type": "http", "state": copy.copy(state_source)}

    overrides = {
        Request: lambda: Request(scope=_fresh_scope()),
        HTTPConnection: lambda: HTTPConnection(scope=_fresh_scope()),
    }
    broker.dependency_overrides.update(overrides)