-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathgcp_utils.py
More file actions
100 lines (85 loc) · 3.18 KB
/
gcp_utils.py
File metadata and controls
100 lines (85 loc) · 3.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import logging
import os
from google.cloud import tasks_v2
from google.protobuf.timestamp_pb2 import Timestamp
def _is_already_exists_error(error):
    """Return True when *error* reports that the named task already exists."""
    # Matched on class name and message text so that no extra import is needed.
    # NOTE(review): presumably the Cloud Tasks client raises google.api_core's
    # ``AlreadyExists`` for a duplicate task name -- confirm against the client
    # version in use.
    return type(error).__name__ == "AlreadyExists" or "ALREADY_EXISTS" in str(error)


def create_refresh_materialized_view_task():
    """
    Schedule an asynchronous refresh of the materialized view via Cloud Tasks.

    Requests are debounced into half-hour slots: the task is scheduled for the
    next :00 or :30 boundary (UTC) and its name is derived from that slot, so
    every request made within the same slot maps to the same task name. A
    duplicate request is therefore reported as already scheduled rather than
    enqueued a second time.

    Reads the environment variables PROJECT_ID, LOCATION,
    MATERIALIZED_VIEW_QUEUE, GCP_REGION and ENVIRONMENT_NAME.

    Returns:
        tuple: ``(body, status_code)`` where ``body`` is a dict holding either
        a ``"message"`` (status 200: scheduled, or already scheduled) or an
        ``"error"`` (status 500: configuration or enqueue failure).
    """
    from datetime import datetime, timedelta, timezone

    try:
        logging.info("Creating materialized view refresh task.")

        # Fail fast on missing configuration instead of formatting "None"
        # into the queue path or the target URL.
        settings = {
            name: os.getenv(name)
            for name in (
                "PROJECT_ID",
                "LOCATION",
                "MATERIALIZED_VIEW_QUEUE",
                "GCP_REGION",
                "ENVIRONMENT_NAME",
            )
        }
        missing = [name for name, value in settings.items() if not value]
        if missing:
            raise ValueError(
                f"missing environment variable(s): {', '.join(missing)}"
            )
        project = settings["PROJECT_ID"]
        location = settings["LOCATION"]
        queue = settings["MATERIALIZED_VIEW_QUEUE"]

        # Debounce window: round up to the next :00 or :30 boundary. The
        # computation is done in UTC because the protobuf conversion below
        # interprets naive datetimes as UTC.
        now = datetime.now(timezone.utc)
        top_of_hour = now.replace(minute=0, second=0, microsecond=0)
        bucket_time = top_of_hour + timedelta(minutes=30 if now.minute < 30 else 60)
        timestamp_str = bucket_time.strftime("%Y-%m-%d-%H-%M")

        # Convert to a protobuf timestamp. tzinfo is dropped (the value is
        # already UTC) so the call also works with protobuf releases that only
        # accept naive datetimes.
        proto_time = Timestamp()
        proto_time.FromDatetime(bucket_time.replace(tzinfo=None))

        # Cloud Tasks setup
        client = tasks_v2.CloudTasksClient()
        url = (
            f"https://{settings['GCP_REGION']}-"
            f"{project}.cloudfunctions.net/"
            f"tasks-executor-{settings['ENVIRONMENT_NAME']}"
        )
        task_name = client.task_path(
            project, location, queue, f"refresh-materialized-view-{timestamp_str}"
        )

        # Enqueue the task
        try:
            create_http_task_with_name(
                client=client,
                body=b"",
                url=url,
                project_id=project,
                gcp_region=location,
                queue_name=queue,
                task_name=task_name,
                task_time=proto_time,
                http_method=tasks_v2.HttpMethod.GET,
            )
        except Exception as error:
            if not _is_already_exists_error(error):
                # A genuine enqueue failure: let the outer handler report it
                # instead of silently returning nothing.
                raise
            logging.info(f"Task already exists for {timestamp_str}, skipping.")
            return {"message": f"Refresh task for {timestamp_str} already scheduled."}, 200

        logging.info(f"Scheduled refresh materialized view task for {timestamp_str}")
        return {"message": f"Refresh task for {timestamp_str} scheduled."}, 200
    except Exception as error:
        error_msg = f"Error enqueuing task: {error}"
        logging.exception(error_msg)
        return {"error": error_msg}, 500
def create_http_task_with_name(
    client: "tasks_v2.CloudTasksClient",
    body: bytes,
    url: str,
    project_id: str,
    gcp_region: str,
    queue_name: str,
    task_name: str,
    task_time: Timestamp,
    http_method: "tasks_v2.HttpMethod",
):
    """
    Build a named HTTP Cloud Task and submit it to a queue.

    The request carries an OIDC token for the service account named by the
    SERVICE_ACCOUNT_EMAIL environment variable and a JSON Content-Type header.

    Args:
        client: Cloud Tasks client used to resolve the queue path and create
            the task.
        body: Raw payload for the HTTP request.
        url: Target URL the task will call.
        project_id: Project component of the queue path.
        gcp_region: Location component of the queue path.
        queue_name: Queue component of the queue path.
        task_name: Value assigned to the task's ``name`` field.
        task_time: Value assigned to the task's ``schedule_time`` field.
        http_method: HTTP method for the request.

    Any exception raised by the client is propagated to the caller.
    """
    oidc_token = tasks_v2.OidcToken(
        service_account_email=os.getenv("SERVICE_ACCOUNT_EMAIL")
    )
    request = tasks_v2.HttpRequest(
        url=url,
        http_method=http_method,
        oidc_token=oidc_token,
        body=body,
        headers={"Content-Type": "application/json"},
    )
    named_task = tasks_v2.Task(
        name=task_name,
        schedule_time=task_time,
        http_request=request,
    )
    parent_queue = client.queue_path(project_id, gcp_region, queue_name)
    client.create_task(parent=parent_queue, task=named_task)