Skip to content

Commit c2ada2d

Browse files
committed
First draft
1 parent dfded4b commit c2ada2d

2 files changed

Lines changed: 222 additions & 19 deletions

File tree

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import os
2+
import resource
3+
import shutil
4+
import logging
5+
6+
# Number of bytes in one mebibyte (1024 * 1024); used to convert byte counts to MiB for logging
# and to convert the MiB-based memory margin to bytes.
MB_MULTIPLIER = 1024**2
7+
8+
9+
def find_tmpfs_mounts(mounts_path="/proc/mounts"):
    """
    Returns a list of tmpfs mount points whose path contains 'in-memory'.

    :param mounts_path: Path of the mount table to parse. Defaults to /proc/mounts;
        overridable so the parser can be exercised against a fixture file.
    :return: List of mount point paths, in the order they appear in the mount table.
        The list is empty when the table cannot be read or no mount matches.
    """
    import re  # Local import: only this function needs it.

    tmpfs_mounts = []
    try:
        with open(mounts_path, "r") as f:
            for line in f:
                # Mount table columns: device, mount point, filesystem type, options, ...
                parts = line.split()
                if len(parts) < 3 or parts[2] != "tmpfs":
                    continue
                # The mount table writes special characters in paths as 3-digit octal
                # escapes (e.g. a space is "\040"). Decode them so the returned path is
                # usable with os.path / shutil.
                mount_point = re.sub(
                    r"\\([0-7]{3})",
                    lambda match: chr(int(match.group(1), 8)),
                    parts[1],
                )
                if "in-memory" in mount_point:
                    tmpfs_mounts.append(mount_point)
    except (OSError, ValueError) as e:
        # OSError: file missing/unreadable (e.g. non-Linux host).
        # ValueError: covers UnicodeDecodeError on undecodable bytes.
        # Best effort: report and return whatever was collected so far.
        logging.error("Error reading %s: %s", mounts_path, e)
    return tmpfs_mounts
24+
25+
26+
def get_memory_limit_cgroup_bytes(
    limit_files=(
        "/sys/fs/cgroup/memory/memory.limit_in_bytes",  # cgroup v1
        "/sys/fs/cgroup/memory.max",  # cgroup v2
    ),
):
    """
    Returns the memory limit for the process (in bytes) as set by cgroups, or None if not found.

    :param limit_files: Candidate cgroup limit files, tried in order. The first file that
        can be read and parsed decides the result. Defaults to the cgroup v1 location
        followed by the cgroup v2 location.
    :return: The limit in bytes, or None when no file is readable or the limit is unlimited.
    """
    for limit_file in limit_files:
        try:
            with open(limit_file, "r") as f:
                raw_limit = f.read().strip()
        except OSError:
            # File absent for this cgroup version (or not readable): try the next one.
            continue

        # cgroup v2 reports "no limit" with the literal keyword "max".
        if raw_limit == "max":
            return None

        try:
            limit_bytes = int(raw_limit)
        except ValueError:
            logging.warning("Unexpected content in %s: %r", limit_file, raw_limit)
            continue

        # If the limit is a very large number (e.g., 2**63), treat as unlimited
        if limit_bytes < (2**60):
            return limit_bytes
        return None
    return None
39+
40+
41+
def get_total_tmpfs_size_bytes():
    """
    Sums the capacity (in bytes) of every tmpfs mount reported by find_tmpfs_mounts().

    Mounts that no longer exist, that fail the disk-usage query, or whose reported
    capacity is 1 PiB or more (treated as "unlimited") are left out of the sum.

    :return: The summed capacity in bytes, or None when no mount contributed to the sum.
    """
    unlimited_threshold = 1 << 50  # 1 PiB: anything this large is considered unlimited
    sizes = []
    for mount in find_tmpfs_mounts():
        if not os.path.exists(mount):
            continue
        try:
            capacity = shutil.disk_usage(mount)[0]
        except Exception as e:
            logging.error(f"Error getting disk usage for {mount}: {e}")
            continue
        if capacity < unlimited_threshold:
            sizes.append(capacity)

    return sum(sizes) if sizes else None
62+
63+
64+
def get_available_process_memory_bytes():
    """
    Computes how much memory the process can actually use, in bytes.

    The value is the cgroup memory limit minus the combined size of the
    'in-memory' tmpfs filesystems. Both inputs are always queried; when either
    one is unknown or unlimited (None), a warning is logged and None is returned.

    :return: cgroup limit minus tmpfs size in bytes (may be zero or negative), or None.
    """
    cgroup_limit = get_memory_limit_cgroup_bytes()
    tmpfs_total = get_total_tmpfs_size_bytes()

    if cgroup_limit is None or tmpfs_total is None:
        logging.warning("Could not determine available process memory (limit or tmpfs size missing/unlimited).")
        return None

    remaining = cgroup_limit - tmpfs_total
    # Convert the three figures to MiB once, purely for the log line.
    limit_mib, tmpfs_mib, remaining_mib = (
        value / MB_MULTIPLIER for value in (cgroup_limit, tmpfs_total, remaining)
    )
    logging.info(
        "Process memory limit: %.2f MiB, total tmpfs size: %.2f MiB, available: %.2f MiB",
        limit_mib,
        tmpfs_mib,
        remaining_mib,
    )
    return remaining
84+
85+
86+
def limit_gcp_memory():
    """
    Caps the address space of the current process (RLIMIT_AS) to the memory that is
    available to it, minus a safety margin.

    The available memory comes from get_available_process_memory_bytes(). The margin is
    read from the MEMORY_MARGIN_MB environment variable (integer number of MiB, default
    200); an invalid value falls back to the default and a negative value is treated as 0.

    This function is best effort: when the available memory cannot be determined, when
    the computed limit is not positive, or when the OS rejects the new limit, it logs
    the reason and returns without raising.
    """
    default_margin_mb = 200

    available_memory_bytes = get_available_process_memory_bytes()
    if available_memory_bytes is None:
        logging.info("Could not find the total memory of the process.")
        return
    if available_memory_bytes <= 0:
        logging.warning(
            "Available process memory is not positive (%.2f MiB). Skipping setrlimit.",
            available_memory_bytes / MB_MULTIPLIER,
        )
        return

    # Margin comes from env in megabytes (string), default 200 MiB.
    # NOTE: only this variable is read; the full environment is deliberately not logged
    # because it may contain credentials.
    memory_margin_str_mb = os.getenv("MEMORY_MARGIN_MB", str(default_margin_mb))
    memory_margin_mb = default_margin_mb
    if memory_margin_str_mb:
        try:
            memory_margin_mb = int(memory_margin_str_mb)
        except ValueError as err:
            logging.error(
                "Invalid MEMORY_MARGIN_MB value: %s. Using default of 200MB. Error: %s",
                memory_margin_str_mb,
                err,
            )

    # A negative margin would raise the limit above the available memory; clamp to 0.
    memory_margin_bytes = max(memory_margin_mb, 0) * MB_MULTIPLIER
    logging.info(
        "Available memory: %.2f MiB, memory margin: %.2f MiB",
        available_memory_bytes / MB_MULTIPLIER,
        memory_margin_bytes / MB_MULTIPLIER,
    )
    mem_limit = available_memory_bytes - memory_margin_bytes
    if mem_limit <= 0:
        logging.warning(
            "Computed RLIMIT_AS <= 0 (%.2f MiB). Skipping setrlimit.",
            mem_limit / MB_MULTIPLIER,
        )
        return

    # Set RLIMIT_AS in bytes (soft and hard limit), log the limit in MiB.
    # setrlimit raises ValueError when the requested limit exceeds the current hard limit
    # and OSError when the underlying system call fails; neither should crash the caller.
    try:
        resource.setrlimit(resource.RLIMIT_AS, (mem_limit, mem_limit))
    except (ValueError, OSError) as err:
        logging.error(
            "Failed to set RLIMIT_AS to %.2f MiB: %s",
            mem_limit / MB_MULTIPLIER,
            err,
        )
        return
    logging.info(
        "RLIMIT_AS set to %.2f MiB (raw: %d bytes)",
        mem_limit / MB_MULTIPLIER,
        mem_limit,
    )
129+
130+
131+
if __name__ == "__main__":
    # Manual diagnostic: run this module directly to print the computed available memory.
    logging.basicConfig(level=logging.INFO)
    process_memory = get_available_process_memory_bytes()
    if process_memory is None:
        report = "Could not determine available process memory."
    else:
        report = f"Available process memory: {process_memory / MB_MULTIPLIER:.2f} MiB"
    print(report)

functions-python/batch_process_dataset/src/main.py

Lines changed: 85 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import random
2222
import uuid
2323
import zipfile
24+
import shutil
2425
from dataclasses import dataclass
2526
from datetime import datetime
2627
from typing import Optional, List
@@ -31,6 +32,7 @@
3132
from sqlalchemy import func
3233
from sqlalchemy.orm import Session
3334

35+
from shared.common.gcp_memory_utils import limit_gcp_memory
3436
from shared.common.gcp_utils import create_refresh_materialized_view_task
3537
from shared.database.database import with_db_session
3638
from shared.database_gen.sqlacodegen_models import Gtfsdataset, Gtfsfile, Gtfsfeed
@@ -45,6 +47,9 @@
4547

4648
init_logger()
4749

50+
# Limit the available memory of the process so if an OOM exception happens it can be handled properly by our code
51+
limit_gcp_memory()
52+
4853

4954
@dataclass
5055
class DatasetFile:
@@ -268,40 +273,101 @@ def upload_dataset(self, feed_id, public=True) -> DatasetFile or None:
268273

269274
@with_db_session
270275
def process_from_bucket(self, db_session, public=True) -> Optional[DatasetFile]:
276+
"""Process an existing dataset from the GCP bucket and update related DB entities.
277+
278+
To reduce local disk usage, we no longer unzip all files at once. Instead, we:
279+
- Download the dataset ZIP to a temporary local file.
280+
- Iterate over each member of the ZIP.
281+
- Extract a single file to a temporary path under WORKING_DIR.
282+
- Upload that file immediately to GCS and record it as a Gtfsfile.
283+
- Delete the local temporary extracted file before moving to the next one.
271284
"""
272-
Process an existing dataset from the GCP bucket updates the related database entities
273-
:return: The DatasetFile object created
274-
"""
275-
temp_file_path = None
285+
temp_zip_path = None
276286
try:
277-
temp_file_path = self.generate_temp_filename()
287+
temp_zip_path = self.generate_temp_filename()
278288
blob_file_path = f"{self.feed_stable_id}/{self.dataset_stable_id}/{self.dataset_stable_id}.zip"
279-
self.logger.info(f"Processing dataset from bucket: {blob_file_path}")
289+
self.logger.info("Processing dataset from bucket: %s", blob_file_path)
280290
download_from_gcs(
281-
os.getenv("DATASETS_BUCKET_NAME"), blob_file_path, temp_file_path
291+
os.getenv("DATASETS_BUCKET_NAME"), blob_file_path, temp_zip_path
282292
)
283293

284-
extracted_files_path = self.unzip_files(temp_file_path)
294+
# Stream files from ZIP to GCS one by one to minimize disk usage
295+
bucket = storage.Client().get_bucket(self.bucket_name)
296+
extracted_files: List[Gtfsfile] = []
297+
working_dir = os.getenv("WORKING_DIR", "/in-memory")
285298

286-
_, extracted_files = self.upload_files_to_storage(
287-
temp_file_path,
288-
self.dataset_stable_id,
289-
extracted_files_path,
290-
public=public,
291-
skip_dataset_upload=True, # Skip the upload of the dataset file
292-
)
299+
if not zipfile.is_zipfile(temp_zip_path):
300+
self.logger.error(
301+
"The downloaded file %s is not a valid ZIP file.", temp_zip_path
302+
)
303+
raise ValueError("Downloaded dataset is not a valid ZIP file.")
304+
305+
with zipfile.ZipFile(temp_zip_path, "r") as zf:
306+
for member in zf.infolist():
307+
# Skip directories
308+
if member.is_dir():
309+
continue
310+
311+
# Extract a single file to a temporary path
312+
temp_extracted_path = os.path.join(
313+
working_dir,
314+
f"{self.feed_stable_id}-{self.dataset_stable_id}-{member.filename.replace('/', '_')}",
315+
)
316+
317+
self.logger.info(
318+
"Extracting %s to %s", member.filename, temp_extracted_path
319+
)
320+
with zf.open(member, "r") as src, open(
321+
temp_extracted_path, "wb"
322+
) as dst:
323+
shutil.copyfileobj(src, dst)
324+
325+
# Upload this single file to GCS under extracted/
326+
if os.path.isfile(temp_extracted_path):
327+
target_path = f"{self.feed_stable_id}/{self.dataset_stable_id}/extracted/{member.filename}"
328+
file_blob = bucket.blob(target_path)
329+
file_blob.upload_from_filename(temp_extracted_path)
330+
if public:
331+
file_blob.make_public()
332+
self.logger.info(
333+
"Uploaded extracted file %s to %s",
334+
member.filename,
335+
file_blob.public_url,
336+
)
337+
338+
extracted_files.append(
339+
Gtfsfile(
340+
id=str(uuid.uuid4()),
341+
file_name=member.filename,
342+
file_size_bytes=os.path.getsize(temp_extracted_path),
343+
hosted_url=file_blob.public_url if public else None,
344+
hash=get_hash_from_file(temp_extracted_path),
345+
)
346+
)
347+
348+
# Remove the local temporary extracted file to free disk space
349+
try:
350+
if os.path.exists(temp_extracted_path):
351+
os.remove(temp_extracted_path)
352+
except Exception as cleanup_err:
353+
self.logger.warning(
354+
"Failed to remove temporary file %s: %s",
355+
temp_extracted_path,
356+
cleanup_err,
357+
)
293358

294359
dataset_file = DatasetFile(
295360
stable_id=self.dataset_stable_id,
296361
file_sha256_hash=self.latest_hash,
297362
hosted_url=f"{self.public_hosted_datasets_url}/{blob_file_path}",
298363
extracted_files=extracted_files,
299364
zipped_size=(
300-
os.path.getsize(temp_file_path)
301-
if os.path.exists(temp_file_path)
365+
os.path.getsize(temp_zip_path)
366+
if os.path.exists(temp_zip_path)
302367
else None
303368
),
304369
)
370+
305371
dataset, latest = self.create_dataset_entities(
306372
dataset_file, skip_dataset_creation=True, db_session=db_session
307373
)
@@ -319,8 +385,8 @@ def process_from_bucket(self, db_session, public=True) -> Optional[DatasetFile]:
319385
raise ValueError("Dataset update failed, dataset is None.")
320386
return dataset_file
321387
finally:
322-
if temp_file_path and os.path.exists(temp_file_path):
323-
os.remove(temp_file_path)
388+
if temp_zip_path and os.path.exists(temp_zip_path):
389+
os.remove(temp_zip_path)
324390

325391
def unzip_files(self, temp_file_path):
326392
extracted_files_path = os.path.join(temp_file_path.split(".")[0], "extracted")

0 commit comments

Comments
 (0)