Skip to content

Commit e669fe4

Browse files
committed
feat: add async source transformer
Signed-off-by: Sidhant Kohli <sidhant.kohli@gmail.com>
1 parent 157a90d commit e669fe4

3 files changed

Lines changed: 437 additions & 0 deletions

File tree

pynumaflow/sourcetransformer/_dtypes.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from dataclasses import dataclass
44
from datetime import datetime
55
from typing import TypeVar, Callable, Union, Optional
6+
from collections.abc import Awaitable
67
from warnings import warn
78

89
from pynumaflow._constants import DROP
@@ -210,3 +211,9 @@ def handler(self, keys: list[str], datum: Datum) -> Messages:
210211
# SourceTransformCallable is the type of the handler function for the
211212
# Source Transformer UDFunction.
212213
SourceTransformCallable = Union[SourceTransformHandler, SourceTransformer]
214+
215+
216+
# SourceTransformAsyncCallable is a callable which can be used as a handler
217+
# for the Asynchronous Transformer UDF
218+
SourceTransformHandlerAsyncHandlerCallable = Callable[[list[str], Datum], Awaitable[Messages]]
219+
SourceTransformAsyncCallable = Union[SourceTransformer, SourceTransformHandlerAsyncHandlerCallable]
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
import aiorun
2+
import grpc
3+
4+
from pynumaflow._constants import (
5+
NUM_THREADS_DEFAULT,
6+
MAX_MESSAGE_SIZE,
7+
MAX_NUM_THREADS,
8+
SOURCE_TRANSFORMER_SOCK_PATH,
9+
SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH,
10+
)
11+
from pynumaflow.info.types import (
12+
ServerInfo,
13+
MINIMUM_NUMAFLOW_VERSION,
14+
ContainerType,
15+
)
16+
from pynumaflow.proto.sourcetransformer import transform_pb2_grpc
17+
from pynumaflow.shared.server import (
18+
NumaflowServer,
19+
start_async_server,
20+
)
21+
from pynumaflow.sourcetransformer._dtypes import SourceTransformAsyncCallable
22+
from pynumaflow.sourcetransformer.servicer._async_servicer import SourceTransformAsyncServicer
23+
24+
25+
class SourceTransformAsyncServer(NumaflowServer):
26+
"""
27+
Create a new grpc Asynchronous Source Transformer Server instance.
28+
A new servicer instance is created and attached to the server.
29+
The server instance is returned.
30+
Args:
31+
source_transform_instance: The source transformer instance to be used for
32+
Source Transformer UDF
33+
sock_path: The UNIX socket path to be used for the server
34+
max_message_size: The max message size in bytes the server can receive and send
35+
max_threads: The max number of threads to be spawned;
36+
defaults to 4 and max capped at 16
37+
38+
Example Invocation:
39+
40+
import datetime
41+
import logging
42+
43+
from pynumaflow.sourcetransformer import Messages, Message, Datum, SourceTransformAsyncServer
44+
# This is a simple User Defined Function example which receives a message,
45+
# applies the following
46+
# data transformation, and returns the message.
47+
# If the message event time is before year 2022, drop the message with event time unchanged.
48+
# If it's within year 2022, update the tag to "within_year_2022" and
49+
# update the message event time to Jan 1st 2022.
50+
# Otherwise, (exclusively after year 2022), update the tag to
51+
# "after_year_2022" and update the
52+
# message event time to Jan 1st 2023.
53+
54+
january_first_2022 = datetime.datetime.fromtimestamp(1640995200)
55+
january_first_2023 = datetime.datetime.fromtimestamp(1672531200)
56+
57+
58+
async def my_handler(keys: list[str], datum: Datum) -> Messages:
59+
val = datum.value
60+
event_time = datum.event_time
61+
messages = Messages()
62+
63+
if event_time < january_first_2022:
64+
logging.info("Got event time:%s, it is before 2022, so dropping", event_time)
65+
messages.append(Message.to_drop(event_time))
66+
elif event_time < january_first_2023:
67+
logging.info(
68+
"Got event time:%s, it is within year 2022, so forwarding to within_year_2022",
69+
event_time,
70+
)
71+
messages.append(
72+
Message(value=val, event_time=january_first_2022,
73+
tags=["within_year_2022"])
74+
)
75+
else:
76+
logging.info(
77+
"Got event time:%s, it is after year 2022, so forwarding to "
78+
"after_year_2022", event_time
79+
)
80+
messages.append(Message(value=val, event_time=january_first_2023,
81+
tags=["after_year_2022"]))
82+
83+
return messages
84+
85+
86+
if __name__ == "__main__":
87+
grpc_server = SourceTransformAsyncServer(my_handler)
88+
grpc_server.start()
89+
"""
90+
91+
def __init__(
92+
self,
93+
source_transform_instance: SourceTransformAsyncCallable,
94+
sock_path=SOURCE_TRANSFORMER_SOCK_PATH,
95+
max_message_size=MAX_MESSAGE_SIZE,
96+
max_threads=NUM_THREADS_DEFAULT,
97+
server_info_file=SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH,
98+
):
99+
"""
100+
Create a new grpc Asynchronous Source Transformer Server instance.
101+
A new servicer instance is created and attached to the server.
102+
The server instance is returned.
103+
Args:
104+
source_transform_instance: The source transformer instance to be used for the Source Transformer UDF
105+
sock_path: The UNIX socket path to be used for the server
106+
max_message_size: The max message size in bytes the server can receive and send
107+
max_threads: The max number of threads to be spawned;
108+
defaults to 4 and max capped at 16
server_info_file: The server info file path that is passed to the async server on start
109+
"""
110+
self.sock_path = f"unix://{sock_path}"
111+
self.max_threads = min(max_threads, MAX_NUM_THREADS)
112+
self.max_message_size = max_message_size
113+
self.server_info_file = server_info_file
114+
115+
self.source_transform_instance = source_transform_instance
116+
117+
self._server_options = [
118+
("grpc.max_send_message_length", self.max_message_size),
119+
("grpc.max_receive_message_length", self.max_message_size),
120+
]
121+
self.servicer = SourceTransformAsyncServicer(handler=source_transform_instance)
122+
123+
def start(self) -> None:
124+
"""
125+
Starter function for the Async server class; it needs a separate caller
126+
so that all the async coroutines can be started from a single context
127+
"""
128+
aiorun.run(self.aexec(), use_uvloop=True)
129+
130+
async def aexec(self) -> None:
131+
"""
132+
Starts the Async gRPC server on the given UNIX socket with
133+
given max threads.
134+
"""
135+
136+
# As the server is async, we need to create a new server instance in the
137+
# same thread as the event loop so that all the async calls are made in the
138+
# same context
139+
140+
server_new = grpc.aio.server(options=self._server_options)
141+
server_new.add_insecure_port(self.sock_path)
142+
transform_pb2_grpc.add_SourceTransformServicer_to_server(self.servicer, server_new)
143+
144+
serv_info = ServerInfo.get_default_server_info()
145+
serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[
146+
ContainerType.Sourcetransformer
147+
]
148+
149+
# Start the async server
150+
await start_async_server(
151+
server_async=server_new,
152+
sock_path=self.sock_path,
153+
max_threads=self.max_threads,
154+
cleanup_coroutines=list(),
155+
server_info_file=self.server_info_file,
156+
server_info=serv_info,
157+
)

0 commit comments

Comments
 (0)