@@ -35,6 +35,10 @@ class SourceTransformAsyncServer(NumaflowServer):
3535 max_message_size: The max message size in bytes the server can receive and send
3636 max_threads: The max number of threads to be spawned;
3737 defaults to 4 and max capped at 16
38+ server_info_file: The path to the server info file
39+ shutdown_callback: Callable, executed after loop is stopped, before
40+ cancelling any tasks.
41+ Useful for graceful shutdown.
3842
3943
4044 Below is a simple User Defined Function example which receives a message, applies the
@@ -96,11 +100,13 @@ def __init__(
96100 max_message_size = MAX_MESSAGE_SIZE ,
97101 max_threads = NUM_THREADS_DEFAULT ,
98102 server_info_file = SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH ,
103+ shutdown_callback = None ,
99104 ):
100105 self .sock_path = f"unix://{ sock_path } "
101106 self .max_threads = min (max_threads , MAX_NUM_THREADS )
102107 self .max_message_size = max_message_size
103108 self .server_info_file = server_info_file
109+ self .shutdown_callback = shutdown_callback
104110
105111 self .source_transform_instance = source_transform_instance
106112
@@ -115,7 +121,7 @@ def start(self) -> None:
115121 Starter function for the Async server class, need a separate caller
116122 so that all the async coroutines can be started from a single context
117123 """
118- aiorun .run (self .aexec (), use_uvloop = True )
124+ aiorun .run (self .aexec (), use_uvloop = True , shutdown_callback = self . shutdown_callback )
119125
120126 async def aexec (self ) -> None :
121127 """
0 commit comments