1313 Datum ,
1414 _AccumulatorBuilderClass ,
1515 AccumulatorAsyncCallable ,
16- WindowOperation ,
16+ WindowOperation , AccumulatorRequest ,
1717)
1818from pynumaflow .proto .accumulator import accumulator_pb2
1919from pynumaflow .shared .asynciter import NonBlockingIterator
@@ -93,7 +93,7 @@ async def stream_send_eof(self):
9393 for unified_key in task_keys :
9494 await self .tasks [unified_key ].iterator .put (STREAM_EOF )
9595
96- async def close_task (self , req ):
96+ async def close_task (self , req : AccumulatorRequest ):
9797 """
9898 Closes a running accumulator task for a given key.
9999 Based on the request we compute the unique key, and then
@@ -121,7 +121,7 @@ async def close_task(self, req):
121121 # Put the exception in the result queue
122122 await self .global_result_queue .put (err )
123123
124- async def create_task (self , req ):
124+ async def create_task (self , req : AccumulatorRequest ):
125125 """
126126 Creates a new accumulator task for the given request.
127127 Based on the request we compute a unique key, and then
@@ -141,7 +141,7 @@ async def create_task(self, req):
141141 # Create a new result queue for the current task
142142 # We create a new result queue for each task, so that
143143 # the results of the accumulator operation can be sent to the
144- # the global result queue, which in turn sends the results
144+ # global result queue, which in turn sends the results
145145 # to the client.
146146 res_queue = NonBlockingIterator ()
147147
@@ -175,7 +175,7 @@ async def create_task(self, req):
175175 # Put the request in the iterator
176176 await curr_task .iterator .put (d )
177177
178- async def send_datum_to_task (self , req ):
178+ async def send_datum_to_task (self , req : AccumulatorRequest ):
179179 """
180180 Appends the request to the existing window reduce task.
181181 If the task does not exist, create it.
@@ -220,7 +220,7 @@ async def __invoke_accumulator(
220220 await self .global_result_queue .put (err )
221221
222222 async def process_input_stream (
223- self , request_iterator : AsyncIterable [accumulator_pb2 . AccumulatorRequest ]
223+ self , request_iterator : AsyncIterable [AccumulatorRequest ]
224224 ):
225225 # Start iterating through the request iterator and create tasks
226226 # based on the operation type received.
@@ -230,15 +230,15 @@ async def process_input_stream(
230230 request_count += 1
231231 # check whether the request is an open, append, or close operation
232232 match request .operation :
233- case int ( WindowOperation .OPEN ) :
233+ case WindowOperation .OPEN :
234234 # create a new task for the open operation and
235235 # put the request in the task iterator
236236 await self .create_task (request )
237- case int ( WindowOperation .APPEND ) :
237+ case WindowOperation .APPEND :
238238 # append the task data to the existing task
239239 # if the task does not exist, create a new task
240240 await self .send_datum_to_task (request )
241- case int ( WindowOperation .CLOSE ) :
241+ case WindowOperation .CLOSE :
242242 # close the current task for req
243243 await self .close_task (request )
244244 case _:
0 commit comments