Skip to content

Commit ea5576c

Browse files
srao12srao12
authored andcommitted
fix: lint
Signed-off-by: srao12 <Shrivardhan_Rao@intuit.com>
1 parent 35156ba commit ea5576c

9 files changed

Lines changed: 84 additions & 112 deletions

File tree

examples/accumulator/streamsorter/example.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import os
33
from collections.abc import AsyncIterable
44
from datetime import datetime
5-
from typing import List
65

76
from pynumaflow import setup_logging
87
from pynumaflow.accumulator import Accumulator, AccumulatorAsyncServer
@@ -21,7 +20,7 @@ class StreamSorter(Accumulator):
2120
def __init__(self):
2221
_LOGGER.info("StreamSorter initialized")
2322
self.latest_wm = datetime.fromtimestamp(-1)
24-
self.sorted_buffer: List[Datum] = []
23+
self.sorted_buffer: list[Datum] = []
2524

2625
async def handler(
2726
self,

pynumaflow/accumulator/_dtypes.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -362,10 +362,10 @@ def id(self) -> str:
362362
@classmethod
363363
def from_datum(cls, datum: Datum):
364364
"""Create a Message instance from a Datum object.
365-
365+
366366
Args:
367367
datum: The Datum object to convert
368-
368+
369369
Returns:
370370
Message: A new Message instance with data from the datum
371371
"""
@@ -375,7 +375,7 @@ def from_datum(cls, datum: Datum):
375375
watermark=datum.watermark,
376376
event_time=datum.event_time,
377377
headers=datum.headers,
378-
id=datum.id
378+
id=datum.id,
379379
)
380380

381381

pynumaflow/accumulator/servicer/async_servicer.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import asyncio
22
from collections.abc import AsyncIterable
33
from typing import Union
4-
import logging
54

65
from google.protobuf import empty_pb2 as _empty_pb2
76

@@ -31,7 +30,7 @@ async def datum_generator(
3130
slot=d.operation.keyedWindow.slot,
3231
keys=list(d.operation.keyedWindow.keys),
3332
)
34-
33+
3534
accumulator_request = AccumulatorRequest(
3635
operation=d.operation.event,
3736
keyed_window=keyed_window, # Use the new parameter name
@@ -105,23 +104,16 @@ async def AccumulateFn(
105104
# This is a special message that indicates the end of the processing for a window
106105
# When we get this message, we send an EOF message to the client
107106
try:
108-
logging.info("[ACCUMULATOR_DEBUG] Starting to read from consumer")
109107
async for msg in consumer:
110-
logging.info(f"[ACCUMULATOR_DEBUG] Received message type: {type(msg)}")
111108
# If the message is an exception, we raise the exception
112109
if isinstance(msg, BaseException):
113-
logging.info(f"[ACCUMULATOR_DEBUG] Found exception: {msg}")
114-
logging.info(f"[ACCUMULATOR_DEBUG] Calling handle_async_error with exception: {repr(msg)}")
115110
await handle_async_error(context, msg, ERR_UDF_EXCEPTION_STRING)
116-
logging.info(f"[ACCUMULATOR_DEBUG] Returning after handle_async_error")
117111
return
118112
# Send window EOF response or Window result response
119113
# back to the client
120114
else:
121-
logging.info(f"[ACCUMULATOR_DEBUG] Yielding message: {msg}")
122115
yield msg
123116
except BaseException as e:
124-
logging.info(f"[ACCUMULATOR_DEBUG] Caught exception in try block: {e}")
125117
await handle_async_error(context, e, ERR_UDF_EXCEPTION_STRING)
126118
return
127119
# Wait for the process_input_stream task to finish for a clean exit

pynumaflow/accumulator/servicer/task_manager.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ async def create_task(self, req):
167167

168168
# Save the result of the reduce operation to the task list
169169
self.tasks[unified_key] = curr_task
170-
170+
171171
# Increment expected EOF count since we created a new task
172172
async with self._eof_count_lock:
173173
self._expected_eof_count += 1
@@ -277,7 +277,7 @@ async def process_input_stream(
277277
# Wait for all tasks to send their EOF responses before terminating the stream
278278
# This ensures proper ordering: all messages -> all EOF responses -> STREAM_EOF
279279
await self._stream_termination_event.wait()
280-
280+
281281
# Now send STREAM_EOF to terminate the global result queue iterator
282282
await self.global_result_queue.put(STREAM_EOF)
283283
except BaseException as e:
@@ -309,17 +309,17 @@ async def write_to_global_queue(
309309
event_time_pb = timestamp_pb2.Timestamp()
310310
if msg.event_time is not None:
311311
event_time_pb.FromDatetime(msg.event_time)
312-
312+
313313
watermark_pb = timestamp_pb2.Timestamp()
314314
if msg.watermark is not None:
315315
watermark_pb.FromDatetime(msg.watermark)
316-
316+
317317
start_dt_pb = timestamp_pb2.Timestamp()
318318
start_dt_pb.FromDatetime(datetime.fromtimestamp(0))
319-
319+
320320
end_dt_pb = timestamp_pb2.Timestamp()
321321
end_dt_pb.FromDatetime(wm)
322-
322+
323323
res = accumulator_pb2.AccumulatorResponse(
324324
payload=accumulator_pb2.Payload(
325325
keys=msg.keys,
@@ -339,18 +339,18 @@ async def write_to_global_queue(
339339
# send EOF
340340
start_eof_pb = timestamp_pb2.Timestamp()
341341
start_eof_pb.FromDatetime(datetime.fromtimestamp(0))
342-
342+
343343
end_eof_pb = timestamp_pb2.Timestamp()
344344
end_eof_pb.FromDatetime(wm)
345-
345+
346346
res = accumulator_pb2.AccumulatorResponse(
347347
window=accumulator_pb2.KeyedWindow(
348348
start=start_eof_pb, end=end_eof_pb, slot="slot-0", keys=task.keys
349349
),
350350
EOF=True,
351351
)
352352
await output_queue.put(res)
353-
353+
354354
# Increment received EOF count and check if all tasks are done
355355
async with self._eof_count_lock:
356356
self._received_eof_count += 1

tests/accumulator/test_async_accumulator.py

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,14 @@ def request_generator(count, request, resetkey: bool = False):
3333
# Clear previous keys and add new ones
3434
del request.payload.keys[:]
3535
request.payload.keys.extend([f"key-{i}"])
36-
36+
3737
# Set operation based on index - first is OPEN, rest are APPEND
3838
if i == 0:
3939
request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.OPEN
4040
else:
41-
request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND
41+
request.operation.event = (
42+
accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND
43+
)
4244
yield request
4345

4446

@@ -82,27 +84,19 @@ class ExampleClass(Accumulator):
8284
def __init__(self, counter):
8385
self.counter = counter
8486

85-
async def handler(
86-
self, datums: AsyncIterable[Datum], output: NonBlockingIterator
87-
):
87+
async def handler(self, datums: AsyncIterable[Datum], output: NonBlockingIterator):
8888
async for datum in datums:
8989
self.counter += 1
9090
msg = f"counter:{self.counter}"
91-
await output.put(
92-
Message(str.encode(msg), keys=datum.keys(), tags=[])
93-
)
91+
await output.put(Message(str.encode(msg), keys=datum.keys(), tags=[]))
9492

9593

96-
async def accumulator_handler_func(
97-
datums: AsyncIterable[Datum], output: NonBlockingIterator
98-
):
94+
async def accumulator_handler_func(datums: AsyncIterable[Datum], output: NonBlockingIterator):
9995
counter = 0
10096
async for datum in datums:
10197
counter += 1
10298
msg = f"counter:{counter}"
103-
await output.put(
104-
Message(str.encode(msg), keys=datum.keys(), tags=[])
105-
)
99+
await output.put(Message(str.encode(msg), keys=datum.keys(), tags=[]))
106100

107101

108102
def NewAsyncAccumulator():
@@ -167,7 +161,7 @@ def test_accumulate(self) -> None:
167161
count = 0
168162
eof_count = 0
169163
for r in generator_response:
170-
if hasattr(r, 'payload') and r.payload.value:
164+
if hasattr(r, "payload") and r.payload.value:
171165
count += 1
172166
# Each datum should increment the counter
173167
expected_msg = f"counter:{count}"
@@ -181,7 +175,7 @@ def test_accumulate(self) -> None:
181175
else:
182176
self.assertEqual(r.EOF, True)
183177
eof_count += 1
184-
178+
185179
# We should have received 5 messages (one for each datum)
186180
self.assertEqual(5, count)
187181
self.assertEqual(1, eof_count)
@@ -209,7 +203,7 @@ def test_accumulate_with_multiple_keys(self) -> None:
209203
# Track count per key
210204
key = r.payload.keys[0] if r.payload.keys else "no_key"
211205
key_counts[key] = key_counts.get(key, 0) + 1
212-
206+
213207
# Each key should have its own counter starting from 1
214208
expected_msg = f"counter:{key_counts[key]}"
215209
self.assertEqual(
@@ -220,7 +214,7 @@ def test_accumulate_with_multiple_keys(self) -> None:
220214
else:
221215
eof_count += 1
222216
self.assertEqual(r.EOF, True)
223-
217+
224218
# We should have 10 messages (one for each key)
225219
self.assertEqual(10, count)
226220
self.assertEqual(10, eof_count) # Each key/task sends its own EOF

tests/accumulator/test_async_accumulator_err.py

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
from pynumaflow.shared.asynciter import NonBlockingIterator
2020
from tests.testing_utils import (
2121
mock_message,
22-
mock_interval_window_start,
23-
mock_interval_window_end,
2422
get_time_args,
2523
mock_terminate_on_stop,
2624
)
@@ -74,33 +72,25 @@ class ExampleErrorClass(Accumulator):
7472
def __init__(self, counter):
7573
self.counter = counter
7674

77-
async def handler(
78-
self, datums: AsyncIterable[Datum], output: NonBlockingIterator
79-
):
75+
async def handler(self, datums: AsyncIterable[Datum], output: NonBlockingIterator):
8076
async for datum in datums:
8177
self.counter += 1
8278
if self.counter == 2:
8379
# Simulate an error on the second datum
8480
raise RuntimeError("Simulated error in accumulator handler")
8581
msg = f"counter:{self.counter}"
86-
await output.put(
87-
Message(str.encode(msg), keys=datum.keys(), tags=[])
88-
)
82+
await output.put(Message(str.encode(msg), keys=datum.keys(), tags=[]))
8983

9084

91-
async def error_accumulator_handler_func(
92-
datums: AsyncIterable[Datum], output: NonBlockingIterator
93-
):
85+
async def error_accumulator_handler_func(datums: AsyncIterable[Datum], output: NonBlockingIterator):
9486
counter = 0
9587
async for datum in datums:
9688
counter += 1
9789
if counter == 2:
9890
# Simulate an error on the second datum
9991
raise RuntimeError("Simulated error in accumulator function")
10092
msg = f"counter:{counter}"
101-
await output.put(
102-
Message(str.encode(msg), keys=datum.keys(), tags=[])
103-
)
93+
await output.put(Message(str.encode(msg), keys=datum.keys(), tags=[]))
10494

10595

10696
def NewAsyncAccumulatorError():
@@ -121,6 +111,7 @@ async def start_server(udfs):
121111
await server.start()
122112
await server.wait_for_termination()
123113

114+
124115
@patch("psutil.Process.kill", mock_terminate_on_stop)
125116
class TestAsyncAccumulatorError(unittest.TestCase):
126117
@classmethod
@@ -156,15 +147,15 @@ def test_accumulate_error(self) -> None:
156147
stub = self.__stub()
157148
request = start_request()
158149
generator_response = None
159-
150+
160151
try:
161152
generator_response = stub.AccumulateFn(
162153
request_iterator=request_generator(count=5, request=request)
163154
)
164-
155+
165156
# Try to consume the generator
166157
counter = 0
167-
logging.info(f"[TEST_DEBUG] About to iterate through generator_response")
158+
logging.info("[TEST_DEBUG] About to iterate through generator_response")
168159
for response in generator_response:
169160
counter += 1
170161
logging.info(f"[TEST_DEBUG] Received response {counter}: {response}")
@@ -180,12 +171,12 @@ def test_accumulate_partial_success(self) -> None:
180171
"""Test that the first datum is processed before error occurs"""
181172
stub = self.__stub()
182173
request = start_request()
183-
174+
184175
try:
185176
generator_response = stub.AccumulateFn(
186177
request_iterator=request_generator(count=3, request=request)
187178
)
188-
179+
189180
# Try to consume the generator
190181
counter = 0
191182
for _ in generator_response:

0 commit comments

Comments
 (0)