Skip to content

Commit a7836a5

Browse files
authored
feat: Handle memory errors in batch_process_dataset (#1602)
1 parent a153148 commit a7836a5

File tree

5 files changed

+816
-264
lines changed

5 files changed

+816
-264
lines changed
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
import logging
2+
import os
3+
import resource
4+
import shutil
5+
import sys
6+
7+
# Number of bytes in one mebibyte (MiB); used to convert byte counts to MiB for log output.
MB_MULTIPLIER = 1024**2
8+
9+
10+
def is_filesystem_tmpfs(mount_point, mounts_path="/proc/mounts"):
    """
    Check whether the filesystem mounted at mount_point is a tmpfs.

    The mount table is read from mounts_path (the Linux /proc/mounts file by
    default). When several entries share the same mount point, the last one is
    used, because a later mount on the same path shadows the earlier ones.

    Args:
        mount_point: The mount point path to check (e.g., "/tmp/in-memory").
            A trailing slash is ignored.
        mounts_path: Path of the mount table to read. Defaults to "/proc/mounts".

    Returns:
        True if mount_point is found as a tmpfs filesystem,
        False if mount_point is found but is not tmpfs, or is not listed at all,
        None if undetermined (e.g., not on Linux or the mount table is unreadable).
    """
    # Only Linux exposes /proc/mounts; without it there is nothing to inspect.
    if not os.path.exists(mounts_path):
        logging.debug("Not on Linux (platform: %s). tmpfs detection not available.", sys.platform)
        return None

    # The mount table never lists a trailing slash (except for the root itself).
    target = mount_point
    if isinstance(target, str) and len(target) > 1:
        target = target.rstrip("/") or "/"

    fs_type = None
    try:
        with open(mounts_path, "r") as mounts:
            for line in mounts:
                fields = line.split()
                # Fields are: device, mount point, filesystem type, options, ...
                if len(fields) >= 3 and fields[1] == target:
                    # Keep scanning: a later entry for the same path wins.
                    fs_type = fields[2]
    except (OSError, ValueError) as err:
        # ValueError covers decoding errors raised while reading the file.
        logging.error("Error reading %s: %s", mounts_path, err)
        return None

    if fs_type is None:
        logging.warning("%s not found in %s", mount_point, mounts_path)
        return False
    if fs_type != "tmpfs":
        logging.warning("%s is not a tmpfs filesystem (found: %s)", mount_point, fs_type)
        return False
    return True
43+
44+
45+
def get_memory_limit_cgroup_bytes(
    cgroup_v1_path="/sys/fs/cgroup/memory/memory.limit_in_bytes",
    cgroup_v2_path="/sys/fs/cgroup/memory.max",
):
    """
    Returns the memory limit for the process (in bytes) as set by cgroups, or None if not found.
    Note that this value includes any in-memory file systems.
    Tries cgroup v1 first, then falls back to cgroup v2.

    Args:
        cgroup_v1_path: File holding the cgroup v1 limit in bytes.
        cgroup_v2_path: File holding the cgroup v2 limit, either a byte count
            or the string "max" when no limit is set.

    Returns:
        The limit in bytes, or None when no concrete limit could be read.
    """
    # cgroup v1: values at or above 2**60 are treated as "no limit configured".
    try:
        with open(cgroup_v1_path, "r") as f:
            limit_bytes = int(f.read())
        if limit_bytes < (2**60):
            return limit_bytes
    except FileNotFoundError:
        # Expected on cgroup v2 hosts and non-Linux systems; not an error.
        logging.debug("cgroup v1 memory limit not available: %s not found", cgroup_v1_path)
    except (OSError, ValueError) as err:
        logging.warning("cgroup v1 memory limit not available: %s", err)

    # cgroup v2 fallback: memory.max contains either a byte limit or the
    # string "max" when no limit is set. Skip "max" since we need a concrete value.
    try:
        with open(cgroup_v2_path, "r") as f:
            value = f.read().strip()
        if value != "max":
            return int(value)
    except FileNotFoundError:
        logging.debug("cgroup v2 memory limit not available: %s not found", cgroup_v2_path)
    except (OSError, ValueError) as err:
        logging.warning("cgroup v2 memory limit not available: %s", err)

    return None
71+
72+
73+
def get_tmpfs_size_bytes(mount_point):
    """
    Report the total capacity, in bytes, of the tmpfs mounted at mount_point.

    Args:
        mount_point: The mount point path to check (e.g., "/tmp/in-memory")

    Returns:
        The total size in bytes, or None when the mount point is not
        confirmed to be a tmpfs or its disk usage cannot be read.
    """
    size_bytes = None
    # Only an explicit True counts; False and None both mean "not a known tmpfs".
    if is_filesystem_tmpfs(mount_point) is True:
        try:
            capacity, _used, _free = shutil.disk_usage(mount_point)
            size_bytes = capacity
        except Exception as exc:
            logging.error(f"Error getting disk usage for {mount_point}: {exc}")
    return size_bytes
89+
90+
91+
def get_available_process_memory_bytes(mount_point):
    """
    Compute how many bytes of memory the process itself may use.

    The result is the cgroup memory limit minus the total size of the tmpfs
    filesystem at the given mount point. If the tmpfs size cannot be
    determined, the full cgroup limit is returned.

    Args:
        mount_point: The tmpfs mount point path (e.g., "/tmp/in-memory")

    Returns:
        Available process memory in bytes, or None if the cgroup limit
        is not determinable.
    """
    cgroup_limit = get_memory_limit_cgroup_bytes()
    if cgroup_limit is None:
        logging.warning("Could not determine cgroup memory limit.")
        return None

    reserved_for_tmpfs = get_tmpfs_size_bytes(mount_point)
    if reserved_for_tmpfs is None:
        logging.warning("Could not determine tmpfs size for %s. Using full cgroup limit.", mount_point)
        reserved_for_tmpfs = 0

    remaining = cgroup_limit - reserved_for_tmpfs
    # Report limit, tmpfs size and remainder, each converted to MiB.
    as_mib = [value / MB_MULTIPLIER for value in (cgroup_limit, reserved_for_tmpfs, remaining)]
    logging.info(
        "Process memory limit: %.2f MiB, total tmpfs size: %.2f MiB, available: %.2f MiB",
        *as_mib,
    )
    return remaining
119+
120+
121+
def limit_gcp_memory(mount_point):
    """
    Set memory limits for the process to prevent OOM kills in GCP Cloud Run/Functions.

    In GCP containerized environments, the cgroup memory limit includes both process memory
    and tmpfs (in-memory filesystem) usage. To prevent the kernel from OOM-killing the process,
    this function:
    1. Calculates available process memory (cgroup limit - tmpfs size)
    2. Subtracts a safety margin (default 200 MiB)
    3. Sets RLIMIT_AS to this value

    This causes Python to raise MemoryError before hitting the cgroup hard limit.
    The safety margin reserves enough headroom so that after MemoryError is raised,
    Python can still allocate the memory needed to unwind the stack, run exception
    handlers, log the error, send an HTTP response, and shut down gracefully.

    The function is best-effort: when the available memory cannot be determined,
    the computed limit is not positive, or the limit cannot be applied, it logs
    the reason and returns without raising.

    Args:
        mount_point: The tmpfs mount point path to check (e.g., "/tmp/in-memory")

    Environment Variables:
        MEMORY_MARGIN_MB: Safety margin in megabytes (default: 200)
    """
    # Calculate available memory: cgroup limit - tmpfs size
    available_memory_bytes = get_available_process_memory_bytes(mount_point)
    if available_memory_bytes is None or available_memory_bytes <= 0:
        logging.info("Could not find the total memory of the process. Memory limit not set.")
        return

    memory_margin_mb = _read_memory_margin_mb()
    memory_margin_bytes = memory_margin_mb * MB_MULTIPLIER
    logging.info(
        "Available memory: %.2f MiB, memory margin: %d MiB",
        available_memory_bytes / MB_MULTIPLIER,
        memory_margin_mb,
    )

    # Subtract safety margin so Python has breathing room to handle MemoryError
    # (stack unwinding, logging, sending HTTP response, graceful shutdown)
    mem_limit = available_memory_bytes - memory_margin_bytes
    if mem_limit <= 0:
        logging.warning(
            "Computed RLIMIT_AS <= 0 (%.2f MiB). Skipping setrlimit.",
            mem_limit / MB_MULTIPLIER,
        )
        return

    # Set RLIMIT_AS (address space limit) to prevent OOM kills
    # When this limit is exceeded, Python will raise MemoryError instead of being killed
    try:
        _, hard_limit = resource.getrlimit(resource.RLIMIT_AS)
        # A soft limit above the existing hard limit is rejected, so clamp to it.
        if hard_limit != resource.RLIM_INFINITY and mem_limit > hard_limit:
            logging.warning(
                "Requested RLIMIT_AS (%.2f MiB) exceeds the existing hard limit (%.2f MiB). Using the hard limit.",
                mem_limit / MB_MULTIPLIER,
                hard_limit / MB_MULTIPLIER,
            )
            mem_limit = hard_limit
        resource.setrlimit(resource.RLIMIT_AS, (mem_limit, mem_limit))
    except (ValueError, OSError) as err:
        # Failing to apply the limit must not take the process down at startup.
        logging.error(
            "Failed to set RLIMIT_AS to %.2f MiB: %s",
            mem_limit / MB_MULTIPLIER,
            err,
        )
        return
    logging.info("RLIMIT_AS set to %.2f MiB", mem_limit / MB_MULTIPLIER)


def _read_memory_margin_mb(default_mb=200):
    """Return the non-negative safety margin in MiB from MEMORY_MARGIN_MB, or default_mb."""
    raw_value = os.getenv("MEMORY_MARGIN_MB", "")
    if not raw_value:
        return default_mb
    try:
        margin_mb = int(raw_value)
    except ValueError as err:
        logging.error(
            "Invalid MEMORY_MARGIN_MB value: %s. Using default of %dMB. Error: %s",
            raw_value,
            default_mb,
            err,
        )
        return default_mb
    if margin_mb < 0:
        # A negative margin would raise the limit above the available memory.
        logging.warning("MEMORY_MARGIN_MB is negative (%d). Using a margin of 0 MiB.", margin_mb)
        return 0
    return margin_mb

api/src/shared/database/database.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from shared.database_gen.sqlacodegen_models import (
1111
Base,
1212
Feed,
13+
Gtfsdataset,
1314
Gtfsfeed,
1415
Gtfsrealtimefeed,
1516
Gbfsversion,
@@ -94,6 +95,9 @@ def configure_polymorphic_mappers():
9495
Validationreport: [
9596
Validationreport.notices, # notice_validation_report_id_fkey
9697
],
98+
Gtfsdataset: [
99+
Gtfsdataset.gtfsfiles,
100+
],
97101
}
98102

99103

0 commit comments

Comments
 (0)