-
Notifications
You must be signed in to change notification settings - Fork 26
fix: Graceful shutdown for all UDFs #337
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 12 commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
2c2886b
graceful shutdown for all UDFs
BulkBeing aa9496f
file formatting
BulkBeing 6c37cb5
fix CI
BulkBeing 72849ec
tested graceful shutdown for source
BulkBeing 3fd5398
tested graceful shutdown from sync map
BulkBeing 3de2701
fixes
BulkBeing d8c1eac
multiproc map clean shutdown
BulkBeing 18b3d87
tested all for graceful shutdown on kubectl delete
BulkBeing 0003243
minor changes
BulkBeing 4f412c2
unit tests
BulkBeing 1b4db21
fix lints
BulkBeing abecb26
more tests
BulkBeing f5f73ce
more tests, remove unused variables
BulkBeing 6b360ac
Use get_running_loop instead of get_event_loop
BulkBeing e0e2624
More documentation comments
BulkBeing File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,3 +1,7 @@ | ||||||
| import asyncio | ||||||
| import contextlib | ||||||
| import sys | ||||||
|
|
||||||
| import aiorun | ||||||
| import grpc | ||||||
|
|
||||||
|
|
@@ -8,9 +12,11 @@ | |||||
| BATCH_MAP_SOCK_PATH, | ||||||
| MAP_SERVER_INFO_FILE_PATH, | ||||||
| MAX_NUM_THREADS, | ||||||
| NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS, | ||||||
| ) | ||||||
| from pynumaflow.batchmapper._dtypes import BatchMapCallable | ||||||
| from pynumaflow.batchmapper.servicer.async_servicer import AsyncBatchMapServicer | ||||||
| from pynumaflow.info.server import write as info_server_write | ||||||
| from pynumaflow.info.types import ( | ||||||
| ServerInfo, | ||||||
| MAP_MODE_KEY, | ||||||
|
|
@@ -19,7 +25,7 @@ | |||||
| ContainerType, | ||||||
| ) | ||||||
| from pynumaflow.proto.mapper import map_pb2_grpc | ||||||
| from pynumaflow.shared.server import NumaflowServer, start_async_server | ||||||
| from pynumaflow.shared.server import NumaflowServer | ||||||
|
|
||||||
|
|
||||||
| class BatchMapAsyncServer(NumaflowServer): | ||||||
|
|
@@ -92,13 +98,17 @@ async def handler( | |||||
| ] | ||||||
|
|
||||||
| self.servicer = AsyncBatchMapServicer(handler=self.batch_mapper_instance) | ||||||
| self._error: BaseException | None = None | ||||||
|
|
||||||
| def start(self): | ||||||
| """ | ||||||
| Starter function for the Async Batch Map server, we need a separate caller | ||||||
| to the aexec so that all the async coroutines can be started from a single context | ||||||
| """ | ||||||
| aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback) | ||||||
| if self._error: | ||||||
| _LOGGER.critical("Server exiting due to UDF error: %s", self._error) | ||||||
| sys.exit(1) | ||||||
|
|
||||||
| async def aexec(self): | ||||||
| """ | ||||||
|
|
@@ -108,25 +118,61 @@ async def aexec(self): | |||||
| # As the server is async, we need to create a new server instance in the | ||||||
| # same thread as the event loop so that all the async calls are made in the | ||||||
| # same context | ||||||
| # Create a new async server instance and add the servicer to it | ||||||
| server = grpc.aio.server(options=self._server_options) | ||||||
| server.add_insecure_port(self.sock_path) | ||||||
| map_pb2_grpc.add_MapServicer_to_server( | ||||||
| self.servicer, | ||||||
| server, | ||||||
| ) | ||||||
| _LOGGER.info("Starting Batch Map Server") | ||||||
|
|
||||||
| # The asyncio.Event must be created here (inside aexec) rather than in __init__, | ||||||
| # because it must be bound to the running event loop that aiorun creates. | ||||||
| # At __init__ time no event loop exists yet. | ||||||
| shutdown_event = asyncio.Event() | ||||||
| self.servicer.set_shutdown_event(shutdown_event) | ||||||
|
|
||||||
| map_pb2_grpc.add_MapServicer_to_server(self.servicer, server) | ||||||
|
|
||||||
| serv_info = ServerInfo.get_default_server_info() | ||||||
| serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[ContainerType.Mapper] | ||||||
| # Add the MAP_MODE metadata to the server info for the correct map mode | ||||||
| serv_info.metadata[MAP_MODE_KEY] = MapMode.BatchMap | ||||||
|
|
||||||
| # Start the async server | ||||||
| await start_async_server( | ||||||
| server_async=server, | ||||||
| sock_path=self.sock_path, | ||||||
| max_threads=self.max_threads, | ||||||
| cleanup_coroutines=list(), | ||||||
| server_info_file=self.server_info_file, | ||||||
| server_info=serv_info, | ||||||
| await server.start() | ||||||
| info_server_write(server_info=serv_info, info_file=self.server_info_file) | ||||||
|
|
||||||
| _LOGGER.info( | ||||||
| "Async GRPC Server listening on: %s with max threads: %s", | ||||||
| self.sock_path, | ||||||
| self.max_threads, | ||||||
| ) | ||||||
|
|
||||||
| async def _watch_for_shutdown(): | ||||||
| """Wait for the shutdown event and stop the server with a grace period.""" | ||||||
| await shutdown_event.wait() | ||||||
| _LOGGER.info("Shutdown signal received, stopping server gracefully...") | ||||||
| # Stop accepting new requests and wait for a maximum of | ||||||
| # NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS seconds for in-flight requests to complete | ||||||
| await server.stop(NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS) | ||||||
|
|
||||||
| shutdown_task = asyncio.create_task(_watch_for_shutdown()) | ||||||
| try: | ||||||
| await server.wait_for_termination() | ||||||
| except asyncio.CancelledError: | ||||||
| # SIGTERM received — aiorun cancels all tasks. We must stop | ||||||
| # the gRPC server explicitly so its __del__ doesn't try to | ||||||
| # schedule a coroutine on the already-closed event loop. | ||||||
| _LOGGER.info("Received cancellation, stopping server gracefully...") | ||||||
| await server.stop(NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS) | ||||||
|
|
||||||
| # Propagate error so start() can exit with a non-zero code | ||||||
| self._error = self.servicer._error | ||||||
|
|
||||||
| shutdown_task.cancel() | ||||||
| with contextlib.suppress(asyncio.CancelledError): | ||||||
| await shutdown_task | ||||||
|
|
||||||
| _LOGGER.info("Stopping event loop...") | ||||||
| # We use aiorun to manage the event loop. The aiorun.run() runs | ||||||
| # forever until loop.stop() is called. If we don't stop the | ||||||
| # event loop explicitly here, the python process will not exit. | ||||||
| # It reamins stuck for 5 minutes until liveness and readiness probe | ||||||
| # fails enough times and k8s sends a SIGTERM | ||||||
| asyncio.get_event_loop().stop() | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| _LOGGER.info("Event loop stopped") | ||||||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this happens, then rest of the code after
exceptwon't be invoked, correct?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
asyncio.CancelledErrorwill be raised when event loop shutdown or the task is cancelled explicitly. This will cause the block of code underexcept asyncio.CancelledErrorto execute. We want to ignore this exception.All other exceptions will be caught in the
BaseExceptioncatching blocks, which are categorized as critical and mostly indicate a UDF error, which we should propagate to numa.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add a comment why we are doing so for posterity?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I checked this part of the code again. There are no
BaseExceptioncatching here. The reason is added underCancelledErrorexception block.I was seeing something like below due to Python's GC during shutdown of the server:
I will update the comment with more details.