Skip to content

Commit 6c2cd64

Browse files
authored
fix: stop funneling all jobs to one worker (JobsProgress shared state) (#517)
* fix: revert JobsProgress to in-memory set, add PingJobMirror JobsProgress persisted in-progress jobs to .runpod_jobs.pkl under os.getcwd(); on endpoints with a network volume every worker shared one file, so occupancy accounting cross-contaminated and jobs funneled onto a single worker (#432). Restore the 1.7.10 in-memory set and feed the separate ping process via a per-worker shared-memory mirror instead. Refs SLS-314, fixes #432 * fix: ping reads in-progress job ids from injected mirror The ping process no longer touches JobsProgress; it reads the job-id snapshot from the per-worker PingJobMirror passed in at process start. Refs SLS-314 * fix: JobScaler pushes job-id snapshot to ping mirror JobScaler updates the per-worker PingJobMirror after each job is acquired or finished, so the separate ping process always sees the current in-progress job ids without shared-file state. Refs SLS-314 * fix: create and share one PingJobMirror per worker run_worker constructs a single mirror in the main process and passes it to both the ping process and the JobScaler, completing the #432 fix. Refs SLS-314 * chore: drop removed job-state-file reference from local_sim The .runpod_jobs.pkl state file no longer exists; remove its cleanup from the local_sim Makefile. Refs SLS-314 * fix: sync ping mirror inside JobsProgress, cover API mode PR #517 review (capy-ai): the API/realtime path (rp_fastapi WorkerAPI) started the ping without a mirror while tracking jobs in JobsProgress, so heartbeats sent job_id=None there. Move mirror propagation into JobsProgress.add/remove/clear via an attached mirror, so every writer path (JobScaler and rp_fastapi) stays in sync from a single place. Attach the mirror in run_worker and WorkerAPI; drop JobScaler's now-redundant job_mirror plumbing. Refs SLS-314
1 parent 79f6c12 commit 6c2cd64

11 files changed

Lines changed: 397 additions & 404 deletions

File tree

requirements.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ click >= 8.1.7
77
colorama >= 0.4.6, < 0.4.7
88
cryptography >= 48.0.1
99
fastapi[all] >= 0.94.0
10-
filelock >= 3.19.1
1110
paramiko >= 3.3.1
1211
prettytable >= 3.16.0
1312
psutil >= 5.9.0

runpod/serverless/modules/rp_fastapi.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from .rp_handler import is_generator
1818
from .rp_job import run_job, run_job_generator
1919
from .rp_ping import Heartbeat
20-
from .worker_state import JobsProgress
20+
from .worker_state import JobsProgress, PingJobMirror
2121

2222
RUNPOD_ENDPOINT_ID = os.environ.get("RUNPOD_ENDPOINT_ID", None)
2323

@@ -184,8 +184,14 @@ def __init__(self, config: Dict[str, Any]):
184184
2. Initializes the FastAPI web server.
185185
3. Sets the handler for processing jobs.
186186
"""
187-
# Start the heartbeat thread.
188-
heartbeat.start_ping()
187+
# One per-worker mirror so the separate ping process reports the
188+
# in-progress job ids tracked here. Attaching to job_list means every
189+
# add/remove syncs it automatically.
190+
mirror = PingJobMirror()
191+
job_list.set_mirror(mirror)
192+
193+
# Start the heartbeat process.
194+
heartbeat.start_ping(mirror)
189195

190196
self.config = config
191197

runpod/serverless/modules/rp_ping.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
from runpod.http_client import SyncClientSession
1313
from runpod.serverless.modules.rp_logger import RunPodLogger
14-
from runpod.serverless.modules.worker_state import WORKER_ID, JobsProgress
14+
from runpod.serverless.modules.worker_state import WORKER_ID
1515
from runpod.version import __version__ as runpod_version
1616

1717
log = RunPodLogger()
@@ -30,6 +30,9 @@ def __init__(self, pool_connections=10, retries=3) -> None:
3030
self.PING_URL = self.PING_URL.replace("$RUNPOD_POD_ID", WORKER_ID)
3131
self.PING_INTERVAL = int(os.environ.get("RUNPOD_PING_INTERVAL", 10000)) // 1000
3232

33+
# In-progress job-id snapshot, injected by the main process at start.
34+
self._mirror = None
35+
3336
# Create a new HTTP session
3437
self._session = SyncClientSession()
3538
self._session.headers.update(
@@ -52,15 +55,16 @@ def __init__(self, pool_connections=10, retries=3) -> None:
5255
self._session.mount("https://", adapter)
5356

5457
@staticmethod
55-
def process_loop(test=False):
58+
def process_loop(mirror=None, test=False):
5659
"""
5760
Static helper to run the ping loop in a separate process.
5861
Creates a new Heartbeat instance to avoid pickling issues.
5962
"""
6063
hb = Heartbeat()
64+
hb._mirror = mirror
6165
hb.ping_loop(test)
6266

63-
def start_ping(self, test=False):
67+
def start_ping(self, mirror=None, test=False):
6468
"""
6569
Sends heartbeat pings to the Runpod server in a separate process.
6670
"""
@@ -77,7 +81,7 @@ def start_ping(self, test=False):
7781
return
7882

7983
if not Heartbeat._process_started:
80-
process = Process(target=Heartbeat.process_loop, args=(test,))
84+
process = Process(target=Heartbeat.process_loop, args=(mirror, test))
8185
process.daemon = True
8286
process.start()
8387
Heartbeat._process_started = True
@@ -96,8 +100,7 @@ def _send_ping(self):
96100
"""
97101
Sends a heartbeat to the Runpod server.
98102
"""
99-
jobs = JobsProgress() # Get the singleton instance
100-
job_ids = jobs.get_job_list()
103+
job_ids = self._mirror.get() if self._mirror is not None else None
101104
ping_params = {"job_id": job_ids, "runpod_version": runpod_version}
102105

103106
try:

runpod/serverless/modules/worker_state.py

Lines changed: 78 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,12 @@
22
Handles getting stuff from environment variables and updating the global state like job id.
33
"""
44

5+
import multiprocessing
56
import os
67
import time
78
import uuid
8-
import pickle
9-
import tempfile
109
from typing import Any, Dict, Optional, Set
1110

12-
from filelock import FileLock
13-
1411
from .rp_logger import RunPodLogger
1512

1613

@@ -20,6 +17,8 @@
2017

2118
WORKER_ID = os.environ.get("RUNPOD_POD_ID", str(uuid.uuid4()))
2219

20+
PING_MIRROR_CAPACITY = 65536 # bytes; ample headroom for a job-id snapshot
21+
2322

2423
# ----------------------------------- Flags ---------------------------------- #
2524
IS_LOCAL_TEST = os.environ.get("RUNPOD_WEBHOOK_GET_JOB", None) is None
@@ -63,87 +62,50 @@ def __str__(self) -> str:
6362

6463

6564
# ---------------------------------------------------------------------------- #
66-
# Tracker #
65+
# Tracker #
6766
# ---------------------------------------------------------------------------- #
6867
class JobsProgress(Set[Job]):
69-
"""Track the state of current jobs in progress with persistent state."""
68+
"""Track the state of current jobs in progress (in-memory, per process)."""
7069

7170
_instance = None
72-
_STATE_DIR = os.getcwd()
73-
_STATE_FILE = os.path.join(_STATE_DIR, ".runpod_jobs.pkl")
7471

7572
def __new__(cls):
7673
if JobsProgress._instance is None:
77-
os.makedirs(cls._STATE_DIR, exist_ok=True)
7874
JobsProgress._instance = set.__new__(cls)
79-
# Initialize as empty set before loading state
8075
set.__init__(JobsProgress._instance)
81-
JobsProgress._instance._load_state()
76+
# One-way snapshot to the ping process; attached in the main
77+
# process via set_mirror(). Stays None off-Runpod and in tests.
78+
JobsProgress._instance._mirror = None
8279
return JobsProgress._instance
8380

8481
def __init__(self):
85-
# This should never clear data in a singleton
86-
# Don't call parent __init__ as it would clear the set
82+
# Singleton: never re-initialize, it would clear the set.
8783
pass
88-
84+
8985
def __repr__(self) -> str:
9086
return f"<{self.__class__.__name__}>: {self.get_job_list()}"
9187

92-
def _load_state(self):
93-
"""Load jobs state from pickle file with file locking."""
94-
try:
95-
if (
96-
os.path.exists(self._STATE_FILE)
97-
and os.path.getsize(self._STATE_FILE) > 0
98-
):
99-
with FileLock(self._STATE_FILE + '.lock'):
100-
with open(self._STATE_FILE, "rb") as f:
101-
try:
102-
loaded_jobs = pickle.load(f)
103-
# Clear current state and add loaded jobs
104-
super().clear()
105-
for job in loaded_jobs:
106-
set.add(
107-
self, job
108-
) # Use set.add to avoid triggering _save_state
109-
110-
except (EOFError, pickle.UnpicklingError):
111-
# Handle empty or corrupted file
112-
log.debug(
113-
"JobsProgress: Failed to load state file, starting with empty state"
114-
)
115-
pass
116-
117-
except FileNotFoundError:
118-
log.debug("JobsProgress: No state file found, starting with empty state")
119-
pass
120-
121-
def _save_state(self):
122-
"""Save jobs state to pickle file with atomic write and file locking."""
123-
try:
124-
# Use temporary file for atomic write
125-
with FileLock(self._STATE_FILE + '.lock'):
126-
with tempfile.NamedTemporaryFile(
127-
dir=self._STATE_DIR, delete=False, mode="wb"
128-
) as temp_f:
129-
pickle.dump(set(self), temp_f)
130-
131-
# Atomically replace the state file
132-
os.replace(temp_f.name, self._STATE_FILE)
133-
except Exception as e:
134-
log.error(f"Failed to save job state: {e}")
88+
def set_mirror(self, mirror) -> None:
89+
"""Attach a PingJobMirror that mirrors the in-progress job ids to the
90+
ping process. Every add/remove/clear then pushes the snapshot to it."""
91+
self._mirror = mirror
92+
self._notify_mirror()
93+
94+
def _notify_mirror(self) -> None:
95+
"""Push the current job-id snapshot to the attached mirror, if any."""
96+
if self._mirror is not None:
97+
self._mirror.set(self.get_job_list())
13598

13699
def clear(self) -> None:
137100
super().clear()
138-
self._save_state()
101+
self._notify_mirror()
139102

140103
def add(self, element: Any):
141104
"""
142105
Adds a Job object to the set.
143106
144-
If the added element is a string, then `Job(id=element)` is added
145-
146-
If the added element is a dict, that `Job(**element)` is added
107+
If the added element is a string, then `Job(id=element)` is added.
108+
If the added element is a dict, then `Job(**element)` is added.
147109
"""
148110
if isinstance(element, str):
149111
element = Job(id=element)
@@ -155,16 +117,15 @@ def add(self, element: Any):
155117
raise TypeError("Only Job objects can be added to JobsProgress.")
156118

157119
result = super().add(element)
158-
self._save_state()
120+
self._notify_mirror()
159121
return result
160122

161123
def remove(self, element: Any):
162124
"""
163125
Removes a Job object from the set.
164126
165-
If the element is a string, then `Job(id=element)` is removed
166-
167-
If the element is a dict, then `Job(**element)` is removed
127+
If the element is a string, then `Job(id=element)` is removed.
128+
If the element is a dict, then `Job(**element)` is removed.
168129
"""
169130
if isinstance(element, str):
170131
element = Job(id=element)
@@ -176,7 +137,7 @@ def remove(self, element: Any):
176137
raise TypeError("Only Job objects can be removed from JobsProgress.")
177138

178139
result = super().discard(element)
179-
self._save_state()
140+
self._notify_mirror()
180141
return result
181142

182143
def get(self, element: Any) -> Optional[Job]:
@@ -193,10 +154,8 @@ def get(self, element: Any) -> Optional[Job]:
193154

194155
def get_job_list(self) -> Optional[str]:
195156
"""
196-
Returns the list of job IDs as comma-separated string.
157+
Returns the list of job IDs as a comma-separated string, or None if empty.
197158
"""
198-
self._load_state()
199-
200159
if not len(self):
201160
return None
202161

@@ -207,3 +166,53 @@ def get_job_count(self) -> int:
207166
Returns the number of jobs.
208167
"""
209168
return len(self)
169+
170+
171+
# ---------------------------------------------------------------------------- #
172+
# Ping Job Mirror #
173+
# ---------------------------------------------------------------------------- #
174+
class PingJobMirror:
175+
"""
176+
One-way snapshot of in-progress job ids from the worker (main) process to
177+
the separate ping process.
178+
179+
Backed by a fixed-size shared-memory buffer created in the main process and
180+
passed to the ping process via ``Process(args=...)``. It lives only in this
181+
worker's own process tree, so it cannot be shared across workers and never
182+
touches the filesystem. All operations are best-effort and never raise into
183+
the caller (a failure here must not break job processing or kill the ping).
184+
"""
185+
186+
def __init__(self, capacity: int = PING_MIRROR_CAPACITY, ctx=None):
187+
ctx = ctx or multiprocessing
188+
self._capacity = capacity
189+
self._buffer = ctx.Array("c", capacity) # SynchronizedString with .get_lock()
190+
191+
def set(self, job_ids: Optional[str]) -> None:
192+
"""Write the current job-id snapshot. Best-effort; never raises."""
193+
try:
194+
data = (job_ids or "").encode("utf-8")
195+
limit = self._capacity - 1 # reserve a byte for the NUL terminator
196+
if len(data) > limit:
197+
data = data[:limit]
198+
cut = data.rfind(b",")
199+
if cut != -1:
200+
data = data[:cut]
201+
log.warn(
202+
f"PingJobMirror: job-id snapshot exceeded {limit} bytes; truncated"
203+
)
204+
with self._buffer.get_lock():
205+
self._buffer.value = data
206+
except Exception as err: # never break job processing
207+
log.error(f"PingJobMirror.set failed: {err}")
208+
209+
def get(self) -> Optional[str]:
210+
"""Read the current job-id snapshot. Best-effort; never raises."""
211+
try:
212+
with self._buffer.get_lock():
213+
data = self._buffer.value
214+
text = data.decode("utf-8")
215+
return text or None
216+
except Exception as err: # never kill the ping loop
217+
log.debug(f"PingJobMirror.get failed: {err}")
218+
return None

runpod/serverless/worker.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,14 @@ def run_worker(config: Dict[str, Any]) -> None:
3939
# Run fitness checks before starting worker (production only)
4040
asyncio.run(run_fitness_checks())
4141

42+
# One per-worker mirror: the job tracker writes it, the ping process reads
43+
# it. Attaching to JobsProgress means every add/remove syncs automatically.
44+
from runpod.serverless.modules.worker_state import JobsProgress, PingJobMirror
45+
mirror = PingJobMirror()
46+
JobsProgress().set_mirror(mirror)
47+
4248
# Start pinging Runpod to show that the worker is alive.
43-
heartbeat.start_ping()
49+
heartbeat.start_ping(mirror)
4450

4551
# Create a JobScaler responsible for adjusting the concurrency
4652
job_scaler = rp_scale.JobScaler(config)

tests/test_serverless/local_sim/Makefile

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,5 @@ worker:
99
python worker.py
1010

1111
clean:
12-
find . -type f -name ".runpod_jobs.pkl" -delete
1312
find . -type f -name "*.pyc" -delete
1413
find . -type d -name "__pycache__" -delete

0 commit comments

Comments
 (0)