Skip to content

Commit 2590aae

Browse files
committed
use match statement
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
1 parent eb992c2 commit 2590aae

4 files changed

Lines changed: 44 additions & 40 deletions

File tree

packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -224,20 +224,21 @@ async def process_input_stream(
224224
request_count = 0
225225
async for request in request_iterator:
226226
request_count += 1
227-
# check whether the request is an open or append operation
228-
if request.operation is int(WindowOperation.OPEN):
229-
# create a new task for the open operation and
230-
# put the request in the task iterator
231-
await self.create_task(request)
232-
elif request.operation is int(WindowOperation.APPEND):
233-
# append the task data to the existing task
234-
# if the task does not exist, create a new task
235-
await self.send_datum_to_task(request)
236-
elif request.operation is int(WindowOperation.CLOSE):
237-
# close the current task for req
238-
await self.close_task(request)
239-
else:
240-
_LOGGER.debug(f"No operation matched for request: {request}", exc_info=True)
227+
# check whether the request is an open, append, or close operation
228+
match request.operation:
229+
case int(WindowOperation.OPEN):
230+
# create a new task for the open operation and
231+
# put the request in the task iterator
232+
await self.create_task(request)
233+
case int(WindowOperation.APPEND):
234+
# append the task data to the existing task
235+
# if the task does not exist, create a new task
236+
await self.send_datum_to_task(request)
237+
case int(WindowOperation.CLOSE):
238+
# close the current task for req
239+
await self.close_task(request)
240+
case _:
241+
_LOGGER.debug(f"No operation matched for request: {request}", exc_info=True)
241242

242243
# If there is an error in the accumulator operation, log and
243244
# then send the error to the result queue

packages/pynumaflow/pynumaflow/reducer/servicer/async_servicer.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,15 @@ async def ReduceFn(
9292
try:
9393
async for request in datum_iterator:
9494
# check whether the request is an open or append operation
95-
if request.operation is int(WindowOperation.OPEN):
96-
# create a new task for the open operation
97-
# and inserts the request data into the task
98-
await task_manager.create_task(request)
99-
elif request.operation is int(WindowOperation.APPEND):
100-
# append the task data to the existing task
101-
# if the task does not exist, it will create a new task
102-
await task_manager.append_task(request)
95+
match request.operation:
96+
case int(WindowOperation.OPEN):
97+
# create a new task for the open operation
98+
# and inserts the request data into the task
99+
await task_manager.create_task(request)
100+
case int(WindowOperation.APPEND):
101+
# append the task data to the existing task
102+
# if the task does not exist, it will create a new task
103+
await task_manager.append_task(request)
103104
except BaseException as e:
104105
_LOGGER.critical("Reduce Error", exc_info=True)
105106
# Send a context abort signal for the rpc, this is required for numa container to get

packages/pynumaflow/pynumaflow/reducestreamer/servicer/task_manager.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -208,14 +208,15 @@ async def process_input_stream(self, request_iterator: AsyncIterable[reduce_pb2.
208208
try:
209209
async for request in request_iterator:
210210
# check whether the request is an open or append operation
211-
if request.operation is int(WindowOperation.OPEN):
212-
# create a new task for the open operation and
213-
# put the request in the task iterator
214-
await self.create_task(request)
215-
elif request.operation is int(WindowOperation.APPEND):
216-
# append the task data to the existing task
217-
# if the task does not exist, create a new task
218-
await self.send_datum_to_task(request)
211+
match request.operation:
212+
case int(WindowOperation.OPEN):
213+
# create a new task for the open operation and
214+
# put the request in the task iterator
215+
await self.create_task(request)
216+
case int(WindowOperation.APPEND):
217+
# append the task data to the existing task
218+
# if the task does not exist, create a new task
219+
await self.send_datum_to_task(request)
219220
# If there is an error in the reduce operation, log and
220221
# then send the error to the result queue
221222
except BaseException as e:

packages/pynumaflow/pynumaflow/shared/server.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -99,16 +99,17 @@ def _run_server(
9999
)
100100

101101
# add the correct servicer to the server based on the UDF type
102-
if udf_type == UDFType.Map:
103-
map_pb2_grpc.add_MapServicer_to_server(servicer, server)
104-
elif udf_type == UDFType.Sink:
105-
sink_pb2_grpc.add_SinkServicer_to_server(servicer, server)
106-
elif udf_type == UDFType.SourceTransformer:
107-
transform_pb2_grpc.add_SourceTransformServicer_to_server(servicer, server)
108-
elif udf_type == UDFType.Source:
109-
source_pb2_grpc.add_SourceServicer_to_server(servicer, server)
110-
elif udf_type == UDFType.SideInput:
111-
sideinput_pb2_grpc.add_SideInputServicer_to_server(servicer, server)
102+
match udf_type:
103+
case UDFType.Map:
104+
map_pb2_grpc.add_MapServicer_to_server(servicer, server)
105+
case UDFType.Sink:
106+
sink_pb2_grpc.add_SinkServicer_to_server(servicer, server)
107+
case UDFType.SourceTransformer:
108+
transform_pb2_grpc.add_SourceTransformServicer_to_server(servicer, server)
109+
case UDFType.Source:
110+
source_pb2_grpc.add_SourceServicer_to_server(servicer, server)
111+
case UDFType.SideInput:
112+
sideinput_pb2_grpc.add_SideInputServicer_to_server(servicer, server)
112113

113114
# bind the server to the UDS/TCP socket
114115
server.add_insecure_port(bind_address)

0 commit comments

Comments
 (0)