Skip to content

Commit 4308a9a

Browse files
committed
feat: support shutdown_callback argument in XXXAsyncServer
Signed-off-by: Kazuki Yamamoto <yamamoto.kazuki.24@gmail.com>
1 parent 27ad810 commit 4308a9a

6 files changed

Lines changed: 18 additions & 6 deletions

File tree

packages/pynumaflow/pynumaflow/mapper/async_server.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def __init__(
6060
max_message_size=MAX_MESSAGE_SIZE,
6161
max_threads=NUM_THREADS_DEFAULT,
6262
server_info_file=MAP_SERVER_INFO_FILE_PATH,
63+
shutdown_callback=None,
6364
):
6465
"""
6566
Create a new grpc Asynchronous Map Server instance.
@@ -77,6 +78,7 @@ def __init__(
7778
self.max_threads = min(max_threads, MAX_NUM_THREADS)
7879
self.max_message_size = max_message_size
7980
self.server_info_file = server_info_file
81+
self.shutdwon_callback = shutdown_callback
8082

8183
self.mapper_instance = mapper_instance
8284

@@ -92,7 +94,7 @@ def start(self) -> None:
9294
Starter function for the Async server class, need a separate caller
9395
so that all the async coroutines can be started from a single context
9496
"""
95-
aiorun.run(self.aexec(), use_uvloop=True)
97+
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback)
9698

9799
async def aexec(self) -> None:
98100
"""

packages/pynumaflow/pynumaflow/mapstreamer/async_server.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ def __init__(
3737
max_message_size=MAX_MESSAGE_SIZE,
3838
max_threads=NUM_THREADS_DEFAULT,
3939
server_info_file=MAP_SERVER_INFO_FILE_PATH,
40+
shutdown_callback=None,
4041
):
4142
"""
4243
Create a new grpc Async Map Stream Server instance.
@@ -98,6 +99,7 @@ async def map_stream_handler(_: list[str], datum: Datum) -> AsyncIterable[Messag
9899
self.max_threads = min(max_threads, MAX_NUM_THREADS)
99100
self.max_message_size = max_message_size
100101
self.server_info_file = server_info_file
102+
self.shutdwon_callback = shutdown_callback
101103

102104
self._server_options = [
103105
("grpc.max_send_message_length", self.max_message_size),
@@ -111,7 +113,7 @@ def start(self):
111113
Starter function for the Async Map Stream server, we need a separate caller
112114
to the aexec so that all the async coroutines can be started from a single context
113115
"""
114-
aiorun.run(self.aexec(), use_uvloop=True)
116+
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback)
115117

116118
async def aexec(self):
117119
"""

packages/pynumaflow/pynumaflow/reducer/async_server.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,13 +124,15 @@ def __init__(
124124
max_message_size=MAX_MESSAGE_SIZE,
125125
max_threads=NUM_THREADS_DEFAULT,
126126
server_info_file=REDUCE_SERVER_INFO_FILE_PATH,
127+
shutdown_callback=None,
127128
):
128129
init_kwargs = init_kwargs or {}
129130
self.reducer_handler = get_handler(reducer_instance, init_args, init_kwargs)
130131
self.sock_path = f"unix://{sock_path}"
131132
self.max_message_size = max_message_size
132133
self.max_threads = min(max_threads, MAX_NUM_THREADS)
133134
self.server_info_file = server_info_file
135+
self.shutdwon_callback = shutdown_callback
134136

135137
self._server_options = [
136138
("grpc.max_send_message_length", self.max_message_size),
@@ -147,7 +149,7 @@ def start(self):
147149
_LOGGER.info(
148150
"Starting Async Reduce Server",
149151
)
150-
aiorun.run(self.aexec(), use_uvloop=True)
152+
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback)
151153

152154
async def aexec(self):
153155
"""

packages/pynumaflow/pynumaflow/reducestreamer/async_server.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,13 +138,15 @@ def __init__(
138138
max_message_size=MAX_MESSAGE_SIZE,
139139
max_threads=NUM_THREADS_DEFAULT,
140140
server_info_file=REDUCE_STREAM_SERVER_INFO_FILE_PATH,
141+
shutdown_callback=None,
141142
):
142143
init_kwargs = init_kwargs or {}
143144
self.reduce_stream_handler = get_handler(reduce_stream_instance, init_args, init_kwargs)
144145
self.sock_path = f"unix://{sock_path}"
145146
self.max_message_size = max_message_size
146147
self.max_threads = min(max_threads, MAX_NUM_THREADS)
147148
self.server_info_file = server_info_file
149+
self.shutdwon_callback = shutdown_callback
148150

149151
self._server_options = [
150152
("grpc.max_send_message_length", self.max_message_size),
@@ -161,7 +163,7 @@ def start(self):
161163
_LOGGER.info(
162164
"Starting Async Reduce Stream Server",
163165
)
164-
aiorun.run(self.aexec(), use_uvloop=True)
166+
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback)
165167

166168
async def aexec(self):
167169
"""

packages/pynumaflow/pynumaflow/sinker/async_server.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ def __init__(
8888
max_message_size=MAX_MESSAGE_SIZE,
8989
max_threads=NUM_THREADS_DEFAULT,
9090
server_info_file=SINK_SERVER_INFO_FILE_PATH,
91+
shutdown_callback=None,
9192
):
9293
# If the container type is fallback sink, then use the fallback sink address and path.
9394
if os.getenv(ENV_UD_CONTAINER_TYPE, "") == UD_CONTAINER_FALLBACK_SINK:
@@ -103,6 +104,7 @@ def __init__(
103104
self.max_threads = min(max_threads, MAX_NUM_THREADS)
104105
self.max_message_size = max_message_size
105106
self.server_info_file = server_info_file
107+
self.shutdwon_callback = shutdown_callback
106108

107109
self.sinker_instance = sinker_instance
108110

@@ -118,7 +120,7 @@ def start(self):
118120
Starter function for the Async server class, need a separate caller
119121
so that all the async coroutines can be started from a single context
120122
"""
121-
aiorun.run(self.aexec(), use_uvloop=True)
123+
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback)
122124

123125
async def aexec(self):
124126
"""

packages/pynumaflow/pynumaflow/sourcer/async_server.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ def __init__(
2929
max_message_size=MAX_MESSAGE_SIZE,
3030
max_threads=NUM_THREADS_DEFAULT,
3131
server_info_file=SOURCE_SERVER_INFO_FILE_PATH,
32+
shutdown_callback=None,
3233
):
3334
"""
3435
Create a new grpc Async Source Server instance.
@@ -138,6 +139,7 @@ async def partitions_handler(self) -> PartitionsResponse:
138139
self.max_threads = min(max_threads, MAX_NUM_THREADS)
139140
self.max_message_size = max_message_size
140141
self.server_info_file = server_info_file
142+
self.shutdown_callback = shutdown_callback
141143

142144
self.sourcer_instance = sourcer_instance
143145

@@ -153,7 +155,7 @@ def start(self):
153155
Starter function for the Async server class, need a separate caller
154156
so that all the async coroutines can be started from a single context
155157
"""
156-
aiorun.run(self.aexec(), use_uvloop=True)
158+
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback)
157159

158160
async def aexec(self):
159161
"""

0 commit comments

Comments
 (0)