@@ -33,7 +33,7 @@ For creating a reducer UDF we can use two different approaches:
3333 # We are passing the init_args for the class instance.
3434 grpc_server = ReduceAsyncServer(Example, init_args = (0 ,))
3535 grpc_server.start()
36- ```
36+ ```
3737
3838- Function based reducer
3939 For the function based reducer we need to create a function of the signature
@@ -45,23 +45,23 @@ For creating a reducer UDF we can use two different approaches:
4545 - Finally we need to call the `start` method on the `ReduceAsyncServer` instance to start the reducer server.
4646 - We must ensure that no init_args or init_kwargs are passed to the `ReduceAsyncServer` instance as they are not used for function based reducers.
4747 ```python
48- from numaflow import ReduceAsyncServer
49- async def handler(keys: list[str ], datums: AsyncIterable[Datum], md: Metadata) -> Messages:
50- counter = 0
51- interval_window = md.interval_window
52- async for _ in datums:
53- counter + = 1
54- msg = (
55- f " counter: { counter} interval_window_start: { interval_window.start} "
56- f " interval_window_end: { interval_window.end} "
57- )
58- return Messages(Message(str .encode(msg), keys = keys))
59-
60- if __name__ == " __main__" :
48+ from collections.abc import AsyncIterable
49+ from pynumaflow.reducer import ReduceAsyncServer, Datum, Message, Messages, Metadata
50+
51+ async def handler(keys: list[str ], datums: AsyncIterable[Datum], md: Metadata) -> Messages:
52+ counter = 0
53+ interval_window = md.interval_window
54+ async for _ in datums:
55+ counter + = 1
56+ msg = (
57+ f " counter: { counter} interval_window_start: { interval_window.start} "
58+ f " interval_window_end: { interval_window.end} "
59+ )
60+ return Messages(Message(str .encode(msg), keys = keys))
61+
62+ if __name__ == " __main__" :
6163 # Here we are using the function as the reducer_instance
6264 # which will be used to invoke the handler function.
6365 grpc_server = ReduceAsyncServer(handler)
6466 grpc_server.start()
6567 ```
66-
67-
0 commit comments