Skip to content

Commit e1dffc0

Browse files
committed
used 30 minute bounce window
1 parent 1619824 commit e1dffc0

1 file changed

Lines changed: 43 additions & 29 deletions

File tree

  • functions-python/refresh_materialized_view/src

functions-python/refresh_materialized_view/src/main.py

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,47 +23,61 @@ def refresh_materialized_view_function(request):
2323
"""
2424
try:
2525
logging.info("Starting materialized view refresh function.")
26-
now = datetime.now(timezone.utc)
27-
delay_minutes = 5
28-
# Round up to the next 5-minute mark (bounce window)
29-
delay_target = now + timedelta(minutes=delay_minutes)
30-
bucket_time = delay_target.replace(
31-
minute=(delay_target.minute // 5) * 5, second=0, microsecond=0
32-
)
33-
34-
# Generate deduplication key based on current timestamp
35-
timestamp = bucket_time.strftime("%Y-%m-%d-%H-%M")
36-
task_name = f"refresh-materialized-view-{timestamp}"
37-
38-
# Cloud Tasks client setup
26+
now = datetime.now()
27+
28+
# BOUNCE WINDOW: next :00 or :30
29+
minute = now.minute
30+
if minute < 30:
31+
bucket_time = now.replace(minute=30, second=0, microsecond=0)
32+
else:
33+
bucket_time = now.replace(minute=0, second=0, microsecond=0) + timedelta(
34+
hours=1
35+
)
36+
37+
timestamp_str = bucket_time.strftime("%Y-%m-%d-%H-%M")
38+
task_name = f"refresh-materialized-view-{timestamp_str}"
39+
40+
# Cloud Tasks setup
3941
client = tasks_v2.CloudTasksClient()
40-
project_id = os.getenv("PROJECT_ID")
41-
queue = os.getenv("QUEUE_NAME")
42+
project = os.getenv("PROJECT_ID")
4243
location = os.getenv("LOCATION")
43-
url = os.getenv("FUNCTION_URL")
44+
queue = os.getenv("QUEUE_NAME")
45+
url = os.getenv("FUNCTION_URL_REFRESH_MV")
4446

45-
parent = client.queue_path(project_id, location, queue)
47+
parent = client.queue_path(project, location, queue)
48+
task_name = client.task_path(project, location, queue, task_name)
4649

47-
# Schedule time fix
48-
schedule_time_pb = timestamp_pb2.Timestamp()
49-
schedule_time_pb.FromDatetime(bucket_time)
50+
# Convert to protobuf timestamp
51+
proto_time = timestamp_pb2.Timestamp()
52+
proto_time.FromDatetime(bucket_time)
5053

5154
task = {
52-
"name": client.task_path(project_id, location, queue, task_name),
55+
"name": task_name,
5356
"http_request": {
54-
"http_method": tasks_v2.HttpMethod.POST,
57+
"http_method": tasks_v2.HttpMethod.GET,
5558
"url": url,
5659
"headers": {"Content-Type": "application/json"},
57-
"body": b"", # Empty body for this task
5860
},
59-
"schedule_time": schedule_time_pb,
61+
"schedule_time": proto_time,
6062
}
6163

6264
# Enqueue the task
63-
response = client.create_task(request={"parent": parent, "task": task})
64-
logging.info(f"Task {response.name} enqueued successfully.")
65-
66-
return {"message": f"Task {response.name} enqueued successfully."}, 200
65+
try:
66+
client.create_task(request={"parent": parent, "task": task})
67+
logging.info(
68+
f"Scheduled refresh materialized view task for {timestamp_str}"
69+
)
70+
return {
71+
"message": f"Scheduled materialized view task for {timestamp_str}"
72+
}, 200
73+
except Exception as e:
74+
if "ALREADY_EXISTS" in str(e):
75+
logging.info(f"Task already exists for {timestamp_str}, skipping.")
76+
return {
77+
"message": f"Task already exists for {timestamp_str}, skipping."
78+
}, 200
79+
else:
80+
raise
6781

6882
except Exception as error:
6983
error_msg = f"Error enqueuing task: {error}"
@@ -74,7 +88,7 @@ def refresh_materialized_view_function(request):
7488
@with_db_session
7589
def refresh_materialized_view_task(request, db_session):
7690
"""
77-
Refreshes a materialized view using the CONCURRENTLY command to avoid
91+
Refreshes the materialized view using the CONCURRENTLY command to avoid
7892
table locks. This function is triggered by a Cloud Task.
7993
8094
Returns:

0 commit comments

Comments
 (0)