Skip to content

Commit d8c1eac

Browse files
committed
multiproc map clean shutdown
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
1 parent 3de2701 commit d8c1eac

3 files changed

Lines changed: 24 additions & 0 deletions

File tree

packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ async def __invoke_accumulator(
213213
_ = await new_instance(request_iterator, output)
214214
# send EOF to the output stream
215215
await output.put(STREAM_EOF)
216+
except asyncio.CancelledError:
217+
# Task cancelled during shutdown (e.g. SIGTERM) — not a UDF fault, so return quietly instead of reporting an error.
218+
return
216219
# If there is an error in the accumulator operation, log and
217220
# then send the error to the result queue
218221
except BaseException as err:
@@ -243,6 +246,9 @@ async def process_input_stream(self, request_iterator: AsyncIterable[Accumulator
243246
case _:
244247
_LOGGER.debug(f"No operation matched for request: {request}", exc_info=True)
245248

249+
except asyncio.CancelledError:
250+
# Task cancelled during shutdown (e.g. SIGTERM) — not a UDF fault, so return quietly instead of reporting an error.
251+
return
246252
# If there is an error in the accumulator operation, log and
247253
# then send the error to the result queue
248254
except BaseException as e:
@@ -274,6 +280,9 @@ async def process_input_stream(self, request_iterator: AsyncIterable[Accumulator
274280

275281
# Now send STREAM_EOF to terminate the global result queue iterator
276282
await self.global_result_queue.put(STREAM_EOF)
283+
except asyncio.CancelledError:
284+
# Task cancelled during shutdown (e.g. SIGTERM) — not a UDF fault, so return quietly instead of logging a streaming error.
285+
return
277286
except BaseException as e:
278287
err_msg = f"Accumulator Streaming Error: {repr(e)}"
279288
_LOGGER.critical(err_msg, exc_info=True)

packages/pynumaflow/pynumaflow/reducer/servicer/task_manager.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,9 @@ async def __invoke_reduce(
165165
new_instance = self.__reduce_handler.create()
166166
try:
167167
msgs = await new_instance(keys, request_iterator, md)
168+
except asyncio.CancelledError:
169+
# Task cancelled during shutdown (e.g. SIGTERM) — not a UDF fault, so re-raise as-is without logging it as a UDFError.
170+
raise
168171
except BaseException as err:
169172
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
170173
err_msg = f"ReduceError: {repr(err)}"

packages/pynumaflow/pynumaflow/shared/server.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import multiprocessing
44
import multiprocessing.synchronize
55
import os
6+
import signal
67
import socket
78
import threading
89
import traceback
@@ -191,6 +192,17 @@ def start_multiproc_server(
191192
server_info.metadata[MULTIPROC_KEY] = str(process_count)
192193
info_server_write(server_info=server_info, info_file=server_info_file)
193194

195+
# Register a SIGTERM handler so that kubectl delete triggers graceful
196+
# shutdown of all child workers via the shared multiprocessing.Event,
197+
# instead of the default abrupt kill.
198+
if shutdown_event is not None:
199+
200+
def _sigterm_handler(signum, frame):
201+
_LOGGER.info("SIGTERM received, signalling workers to shut down...")
202+
shutdown_event.set()
203+
204+
signal.signal(signal.SIGTERM, _sigterm_handler)
205+
194206
for worker in workers:
195207
worker.join()
196208

0 commit comments

Comments
 (0)