Skip to content

Commit d16bc29

Browse files
authored
fix: conflicts in data extract success marking implementation
1 parent ce60aec commit d16bc29

File tree

2 files changed

+41
-14
lines changed

2 files changed

+41
-14
lines changed

data_extract/run_extract.py

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
# =============================================================================
44

55
import sys
6-
from datetime import datetime as dt
7-
from zoneinfo import ZoneInfo
86
import json
97
import uuid
108
from data_extract.shared.utils import (
@@ -13,6 +11,7 @@
1311
get_drive_service,
1412
check_gcs_marking,
1513
upload_to_gcs,
14+
get_target_folder_name,
1615
)
1716

1817
ARCHIVAL_BUCKET = "gs://operations-archival-bucket"
@@ -25,7 +24,7 @@ def run_extraction(target_child_folder):
2524

2625
service = get_drive_service()
2726
metadata_path = f"logs/{target_child_folder}_metadata.json"
28-
marking_path = f"status/{target_child_folder}.success"
27+
archival_marking_path = f"status/{target_child_folder}.success"
2928

3029
# Root Folder
3130
parent_query = f"name = '{PARENT_FOLDER}' and mimeType = '{MIME_TYPE}'"
@@ -54,7 +53,7 @@ def run_extraction(target_child_folder):
5453
folder_id = children[0]["id"]
5554

5655
# Deduplication Check
57-
if check_gcs_marking(ARCHIVAL_BUCKET, marking_path):
56+
if check_gcs_marking(ARCHIVAL_BUCKET, archival_marking_path):
5857
print(f"[INFO]: {target_child_folder} already processed.")
5958

6059
return 0
@@ -137,21 +136,13 @@ def run_extraction(target_child_folder):
137136
return 1
138137

139138
upload_to_gcs(ARCHIVAL_BUCKET, metadata_path, json.dumps(metadata))
140-
upload_to_gcs(ARCHIVAL_BUCKET, marking_path, "")
139+
upload_to_gcs(ARCHIVAL_BUCKET, archival_marking_path, "")
141140
return 0
142141

143142

144-
def get_target_folder_name():
145-
146-
# Uploaded folder name pattern: "operations_YYYY_MM_DD"
147-
pht_now = dt.now(ZoneInfo("Asia/Manila"))
148-
today = pht_now.strftime("%Y_%m_%d")
149-
return f"operations_{today}"
150-
151-
152143
def main():
153144

154-
target_folder = get_target_folder_name()
145+
target_folder = get_target_folder_name("operations")
155146
print(f"[INFO]: Starting extraction for {target_folder}")
156147

157148
exit_code = run_extraction(target_folder)

data_extract/shared/utils.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# =============================================================================
22
# EXTRACTOR UTILS
33
# =============================================================================
4+
5+
46
import pyparsing
57

68
if not hasattr(pyparsing, "DelimitedList"):
@@ -9,6 +11,9 @@
911
from googleapiclient.discovery import build
1012
from google.cloud import storage
1113
from typing import Any, TypeAlias
14+
from datetime import datetime as dt
15+
from zoneinfo import ZoneInfo
16+
1217

1318
GoogleDriveService: TypeAlias = Any
1419

@@ -80,6 +85,16 @@ def check_handshake(service: GoogleDriveService, folder_id: str) -> bool:
8085
return "file-upload=safe" in content
8186

8287

88+
def get_target_folder_name(folder_name: str):
    """
    Builds the dated folder name for today's extraction run.

    The Drive upload convention is "<folder_name>_YYYY_MM_DD", with the
    date taken in Philippine time so the name matches the uploader's day.
    """

    manila_today = dt.now(ZoneInfo("Asia/Manila")).strftime("%Y_%m_%d")
    return f"{folder_name}_{manila_today}"
96+
97+
8398
# ------------------------------------------------------------
8499
# API UTILITIES
85100
# ------------------------------------------------------------
@@ -135,3 +150,24 @@ def upload_to_gcs(
135150
bucket = client.bucket(bucket_name)
136151
blob = bucket.blob(destination_blob_name)
137152
blob.upload_from_string(data, content_type=content_type)
153+
154+
155+
def plant_success_flag(bucket_name: str, folder_path: str):
    """
    Uploads an empty marker object to GCS to signal the pipeline that
    a folder was processed successfully.

    Args:
        bucket_name: Target GCS bucket, with or without the "gs://" scheme.
        folder_path: The full blob path for the success marker
            (e.g. "status/<folder>.success").
    """

    # Strip only a leading scheme; str.replace("gs://", "") would also
    # mangle a "gs://" occurring anywhere else in the bucket name.
    bucket_name = bucket_name.removeprefix("gs://")

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(folder_path)

    # Upload an empty string just to create the file
    blob.upload_from_string("")

    print("[INFO] Flag planted successfully")

0 commit comments

Comments
 (0)