Skip to content

Commit 18b3d87

Browse files
committed
tested all for graceful shutdown on kubectl delete
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
1 parent d8c1eac commit 18b3d87

3 files changed

Lines changed: 8 additions & 4 deletions

File tree

packages/pynumaflow/pynumaflow/mapper/multiproc_server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ def start(self) -> None:
130130
server_info.metadata[MAP_MODE_KEY] = MapMode.UnaryMap
131131

132132
# Start the multiproc server
133-
start_multiproc_server(
133+
has_error = start_multiproc_server(
134134
max_threads=self.max_threads,
135135
servicer=self.servicer,
136136
process_count=self._process_count,
@@ -141,6 +141,6 @@ def start(self) -> None:
141141
shutdown_event=self._shutdown_event,
142142
)
143143

144-
if self._shutdown_event.is_set():
144+
if has_error:
145145
_LOGGER.critical("Server exiting due to worker error")
146146
sys.exit(1)

packages/pynumaflow/pynumaflow/shared/server.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,10 @@ def _sigterm_handler(signum, frame):
206206
for worker in workers:
207207
worker.join()
208208

209+
# Return True if any worker exited with a non-zero code (i.e. a real error,
210+
# not a clean SIGTERM shutdown).
211+
return any(w.exitcode != 0 for w in workers)
212+
209213

210214
async def start_async_server(
211215
server_async: grpc.aio.Server,

packages/pynumaflow/pynumaflow/sourcetransformer/multiproc_server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ def start(self):
149149
serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[
150150
ContainerType.Sourcetransformer
151151
]
152-
start_multiproc_server(
152+
has_error = start_multiproc_server(
153153
max_threads=self.max_threads,
154154
servicer=self.servicer,
155155
process_count=self._process_count,
@@ -160,6 +160,6 @@ def start(self):
160160
shutdown_event=self._shutdown_event,
161161
)
162162

163-
if self._shutdown_event.is_set():
163+
if has_error:
164164
_LOGGER.critical("Server exiting due to worker error")
165165
sys.exit(1)

0 commit comments

Comments
 (0)