-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathtasks.py
More file actions
113 lines (91 loc) · 3.31 KB
/
tasks.py
File metadata and controls
113 lines (91 loc) · 3.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# Standard Library Imports
import logging
# Third Party Imports
from fastapi import Depends, FastAPI
from fastapi.routing import APIRouter
from google.protobuf import duration_pb2
# Imports from this repository
from examples.full.serializer import Payload
from examples.full.settings import (
CLOUD_TASKS_EMULATOR_URL,
IS_LOCAL,
SCHEDULED_LOCATION_PATH,
SCHEDULED_OIDC_TOKEN,
TASK_LISTENER_BASE_URL,
TASK_OIDC_TOKEN,
TASK_QUEUE_PATH,
)
from fastapi_gcp_tasks import DelayedRouteBuilder
from fastapi_gcp_tasks.dependencies import max_retries
from fastapi_gcp_tasks.hooks import (
chained_hook,
deadline_delayed_hook,
deadline_scheduled_hook,
oidc_delayed_hook,
oidc_scheduled_hook,
)
from fastapi_gcp_tasks.scheduled_route import ScheduledRouteBuilder
from fastapi_gcp_tasks.utils import emulator_client
# Application object that the task routers are attached to at the end of this module
app = FastAPI()
# Log through the logger registered under the name "uvicorn"
logger = logging.getLogger("uvicorn")
# Talk to the Cloud Tasks emulator when running locally; otherwise leave the client unset (None)
delayed_client = emulator_client(host=CLOUD_TASKS_EMULATOR_URL) if IS_LOCAL else None
# Chain multiple hooks together; the result is handed to the builder as pre_create_hook
_delayed_pre_create_hook = chained_hook(
    # Add service account for cloud run
    oidc_delayed_hook(token=TASK_OIDC_TOKEN),
    # Deadline of half an hour (1800 seconds)
    deadline_delayed_hook(duration=duration_pb2.Duration(seconds=1800)),
)
# Route class used by the delayed-task router
DelayedRoute = DelayedRouteBuilder(
    client=delayed_client,
    base_url=TASK_LISTENER_BASE_URL,
    queue_path=TASK_QUEUE_PATH,
    pre_create_hook=_delayed_pre_create_hook,
)
# There is no Cloud Scheduler emulator, so local runs get a dummy client built
# with anonymous credentials; that way no GCP credentials are required.
# The dummy client is never exercised locally because the .schedule() call
# further down only runs under `if not IS_LOCAL`.
if IS_LOCAL:
    from google.auth.credentials import AnonymousCredentials
    from google.cloud import scheduler_v1

    scheduled_client = scheduler_v1.CloudSchedulerClient(credentials=AnonymousCredentials())
else:
    scheduled_client = None
# Chain multiple hooks together; the result is handed to the builder as pre_create_hook
_scheduled_pre_create_hook = chained_hook(
    # Add service account for cloud run
    oidc_scheduled_hook(token=SCHEDULED_OIDC_TOKEN),
    # Deadline of half an hour (1800 seconds)
    deadline_scheduled_hook(duration=duration_pb2.Duration(seconds=1800)),
)
# Route class used by the scheduled-task router
ScheduledRoute = ScheduledRouteBuilder(
    client=scheduled_client,
    base_url=TASK_LISTENER_BASE_URL,
    location_path=SCHEDULED_LOCATION_PATH,
    pre_create_hook=_scheduled_pre_create_hook,
)
# Router whose routes are built with DelayedRoute and served under /delayed
delayed_router = APIRouter(
    prefix="/delayed",
    route_class=DelayedRoute,
)
@delayed_router.post("/hello")
async def hello(p: Payload = Payload(message="Default")):
    """Log a greeting containing the payload's message at WARNING level.

    Args:
        p: Payload whose ``message`` is echoed into the log line. When omitted,
            a payload with the message "Default" is used.
    """
    logger.warning(f"Hello task ran with payload: {p.message}")
@delayed_router.post("/fail_twice", dependencies=[Depends(max_retries(2))])
async def fail_twice():
    """Fail unconditionally; the route is registered with a ``max_retries(2)`` dependency.

    NOTE(review): presumably the dependency limits how often the task is retried —
    confirm against fastapi_gcp_tasks.dependencies.

    Raises:
        Exception: always, with the message "nooo".
    """
    failure = Exception("nooo")
    raise failure
# Router whose routes are built with ScheduledRoute and served under /scheduled
scheduled_router = APIRouter(
    prefix="/scheduled",
    route_class=ScheduledRoute,
)
@scheduled_router.post("/timed_hello")
async def scheduled_hello(p: Payload = Payload(message="Default")):
    """Log a greeting containing the payload's message and return it.

    Args:
        p: Payload whose ``message`` is echoed into the greeting. When omitted,
            a payload with the message "Default" is used.

    Returns:
        A dict with a single ``"message"`` key holding the logged text.
    """
    response = {"message": f"Scheduled hello task ran with payload: {p.message}"}
    logger.warning(response["message"])
    return response
# Only register the recurring job in a deployed environment; local runs skip it
if not IS_LOCAL:
    # Cron expression "*/5 * * * *" fires every five minutes, in the Asia/Kolkata time zone
    _timed_hello_job = scheduled_hello.scheduler(
        name="testing-examples-scheduled-hello",
        schedule="*/5 * * * *",
        time_zone="Asia/Kolkata",
    )
    _timed_hello_job.schedule(p=Payload(message="Scheduled"))
# Mount the delayed router first, then the scheduled router, on the application
for _task_router in (delayed_router, scheduled_router):
    app.include_router(_task_router)