Skip to content

Commit e97f870

Browse files
committed
multiproc async
Signed-off-by: Sidhant Kohli <sidhant.kohli@gmail.com>
1 parent 99b592c commit e97f870

2 files changed

Lines changed: 34 additions & 17 deletions

File tree

pynumaflow/mapper/async_multiproc_server.py

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,26 @@
44
import aiorun
55
import grpc
66

7-
from pynumaflow._constants import MAX_NUM_THREADS, MAX_MESSAGE_SIZE, MAP_SOCK_PATH, MAP_SERVER_INFO_FILE_PATH, \
8-
_PROCESS_COUNT, NUM_THREADS_DEFAULT, MULTIPROC_MAP_SOCK_ADDR
7+
from pynumaflow._constants import (
8+
MAX_NUM_THREADS,
9+
MAX_MESSAGE_SIZE,
10+
MAP_SERVER_INFO_FILE_PATH,
11+
_PROCESS_COUNT,
12+
NUM_THREADS_DEFAULT,
13+
MULTIPROC_MAP_SOCK_ADDR,
14+
)
915
from pynumaflow.info.server import get_metadata_env
10-
from pynumaflow.info.types import ServerInfo, MINIMUM_NUMAFLOW_VERSION, ContainerType, MAP_MODE_KEY, MapMode, \
11-
METADATA_ENVS, MULTIPROC_KEY, MULTIPROC_ENDPOINTS, Protocol
16+
from pynumaflow.info.types import (
17+
ServerInfo,
18+
MINIMUM_NUMAFLOW_VERSION,
19+
ContainerType,
20+
MAP_MODE_KEY,
21+
MapMode,
22+
METADATA_ENVS,
23+
MULTIPROC_KEY,
24+
MULTIPROC_ENDPOINTS,
25+
Protocol,
26+
)
1227
from pynumaflow.mapper._dtypes import MapAsyncCallable
1328
from pynumaflow.mapper._servicer._async_servicer import AsyncMapServicer
1429
from pynumaflow.proto.mapper import map_pb2_grpc
@@ -25,14 +40,14 @@ class AsyncMultiprocMapServer(NumaflowServer):
2540
"""
2641

2742
def __init__(
28-
self,
29-
mapper_instance: MapAsyncCallable,
30-
server_count: int = _PROCESS_COUNT,
31-
sock_path: str = MULTIPROC_MAP_SOCK_ADDR,
32-
max_message_size: int = MAX_MESSAGE_SIZE,
33-
max_threads: int = NUM_THREADS_DEFAULT,
34-
server_info_file: str = MAP_SERVER_INFO_FILE_PATH,
35-
use_tcp: bool = False,
43+
self,
44+
mapper_instance: MapAsyncCallable,
45+
server_count: int = _PROCESS_COUNT,
46+
sock_path: str = MULTIPROC_MAP_SOCK_ADDR,
47+
max_message_size: int = MAX_MESSAGE_SIZE,
48+
max_threads: int = NUM_THREADS_DEFAULT,
49+
server_info_file: str = MAP_SERVER_INFO_FILE_PATH,
50+
use_tcp: bool = False,
3651
):
3752
self.sock_path = f"unix://{sock_path}"
3853
self.max_threads = min(max_threads, MAX_NUM_THREADS)
@@ -56,9 +71,7 @@ def start(self):
5671
"""
5772
Starts the multiprocess async gRPC servers.
5873
"""
59-
_LOGGER.info(
60-
"Starting async multiprocess gRPC server with %d workers", self._process_count
61-
)
74+
_LOGGER.info("Starting async multiprocess gRPC server with %d workers", self._process_count)
6275

6376
workers = []
6477
ports = []

pynumaflow/shared/server.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,12 @@ def get_exception_traceback_str(exc) -> str:
311311
return file.getvalue().rstrip()
312312

313313

314-
async def handle_async_error(context: NumaflowServicerContext, exception: BaseException,
315-
exception_type: str, parent: bool = False):
314+
async def handle_async_error(
315+
context: NumaflowServicerContext,
316+
exception: BaseException,
317+
exception_type: str,
318+
parent: bool = False,
319+
):
316320
"""
317321
Handle exceptions for async servers by updating the context and exiting.
318322
"""

0 commit comments

Comments
 (0)