Skip to content

Commit 54c0224

Browse files
srao12srao12
authored and committed
fix: tests and lint
Signed-off-by: srao12 <Shrivardhan_Rao@intuit.com>
1 parent ea5576c commit 54c0224

10 files changed

Lines changed: 644 additions & 49 deletions

File tree

examples/accumulator/streamsorter/Dockerfile

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ WORKDIR $PYSETUP_PATH
99
# Copy only core dependency files first for better caching
1010
COPY pyproject.toml poetry.lock README.md ./
1111
COPY pynumaflow/ ./pynumaflow/
12-
RUN echo "Simulating long build step..." && sleep 20
1312
RUN apt-get update && apt-get install --no-install-recommends -y \
1413
curl wget build-essential git \
1514
&& apt-get clean && rm -rf /var/lib/apt/lists/* \

examples/accumulator/streamsorter/example.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@ async def handler(
2929
):
3030
_LOGGER.info("StreamSorter handler started")
3131
async for datum in datums:
32-
_LOGGER.info(f"Received datum with event time: {datum.event_time}")
33-
_LOGGER.info(f"Current latest watermark: {self.latest_wm}")
34-
_LOGGER.info(f"Datum watermark: {datum.watermark}")
32+
_LOGGER.info(
33+
f"Received datum with event time: {datum.event_time}, "
34+
f"Current latest watermark: {self.latest_wm}, "
35+
f"Datum watermark: {datum.watermark}"
36+
)
3537

3638
# If watermark has moved forward
3739
if datum.watermark and datum.watermark > self.latest_wm:
@@ -68,7 +70,7 @@ async def flush_buffer(self, output: NonBlockingIterator):
6870
invoke = os.getenv("INVOKE", "class")
6971
grpc_server = None
7072
if invoke == "class":
71-
# Here we are using the class instance as the reducer_instance
73+
# Here we are using the class instance as the accumulator_instance
7274
# which will be used to invoke the handler function.
7375
# We are passing the init_args for the class instance.
7476
grpc_server = AccumulatorAsyncServer(StreamSorter)

examples/map/even_odd/Dockerfile

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ WORKDIR $PYSETUP_PATH
99
# Copy only core dependency files first for better caching
1010
COPY pyproject.toml poetry.lock README.md ./
1111
COPY pynumaflow/ ./pynumaflow/
12-
RUN echo "Simulating long build step..." && sleep 20
1312
RUN apt-get update && apt-get install --no-install-recommends -y \
1413
curl wget build-essential git \
1514
&& apt-get clean && rm -rf /var/lib/apt/lists/* \

pynumaflow/accumulator/_dtypes.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class Datum:
3232
event_time: the event time of the event.
3333
watermark: the watermark of the event.
3434
>>> # Example usage
35-
>>> from pynumaflow.reducer import Datum
35+
>>> from pynumaflow.accumulator import Datum
3636
>>> from datetime import datetime, timezone
3737
>>> payload = bytes("test_mock_message", encoding="utf-8")
3838
>>> t1 = datetime.fromtimestamp(1662998400, timezone.utc)
@@ -419,8 +419,8 @@ class _AccumulatorBuilderClass:
419419
420420
Args:
421421
accumulator_class: the Accumulator class to be used for Accumulator UDF
422-
args: the arguments to be passed to the reducer class
423-
kwargs: the keyword arguments to be passed to the reducer class
422+
args: the arguments to be passed to the accumulator class
423+
kwargs: the keyword arguments to be passed to the accumulator class
424424
"""
425425

426426
def __init__(self, accumulator_class: type[Accumulator], args: tuple, kwargs: dict):
@@ -430,10 +430,10 @@ def __init__(self, accumulator_class: type[Accumulator], args: tuple, kwargs: di
430430

431431
def create(self) -> Accumulator:
432432
"""
433-
Create a new ReduceStreamer instance.
433+
Create a new Accumulator instance.
434434
"""
435435
return self._accumulator_class(*self._args, **self._kwargs)
436436

437437

438-
# AccumulatorStreamCallable is a callable which can be used as a handler for the Reduce UDF.
438+
# AccumulatorStreamCallable is a callable which can be used as a handler for the Accumulator UDF.
439439
AccumulatorStreamCallable = Union[AccumulatorAsyncCallable, type[Accumulator]]

pynumaflow/accumulator/async_server.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ class AccumulatorAsyncServer(NumaflowServer):
6262
A new servicer instance is created and attached to the server.
6363
The server instance is returned.
6464
Args:
65-
accumulator_instance: The reducer instance to be used for
66-
Reduce Streaming UDF
65+
accumulator_instance: The accumulator instance to be used for
66+
Accumulator UDF
6767
init_args: The arguments to be passed to the accumulator_handler
6868
init_kwargs: The keyword arguments to be passed to the
6969
accumulator_handler
@@ -78,7 +78,7 @@ class AccumulatorAsyncServer(NumaflowServer):
7878
from pynumaflow.accumulator import Messages, Message, Datum, Metadata,
7979
AccumulatorAsyncServer, Accumulator
8080
81-
class ReduceCounter(Accumulator):
81+
class StreamSorter(Accumulator):
8282
def __init__(self, counter):
8383
self.counter = counter
8484
@@ -117,13 +117,13 @@ async def reduce_handler(
117117
if __name__ == "__main__":
118118
invoke = os.getenv("INVOKE", "func_handler")
119119
if invoke == "class":
120-
# Here we are using the class instance as the reducer_instance
120+
# Here we are using the class instance as the accumulator_instance
121121
# which will be used to invoke the handler function.
122122
# We are passing the init_args for the class instance.
123-
grpc_server = AccumulatorAsyncServer(ReduceCounter, init_args=(0,))
123+
grpc_server = AccumulatorAsyncServer(StreamSorter, init_args=(0,))
124124
else:
125-
# Here we are using the handler function directly as the reducer_instance.
126-
grpc_server = AccumulatorAsyncServer(reduce_handler)
125+
# Here we are using the handler function directly as the accumulator_instance.
126+
grpc_server = AccumulatorAsyncServer(accumulator_handler)
127127
grpc_server.start()
128128
129129
"""

pynumaflow/accumulator/servicer/async_servicer.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ async def datum_generator(
4848

4949
class AsyncAccumulatorServicer(accumulator_pb2_grpc.AccumulatorServicer):
5050
"""
51-
This class is used to create a new grpc Reduce servicer instance.
51+
This class is used to create a new grpc Accumulator servicer instance.
5252
Provides the functionality for the required rpc methods.
5353
"""
5454

@@ -90,17 +90,17 @@ async def AccumulateFn(
9090
# Start the consumer task where we read from the result queue
9191
# and send the results to the client
9292
# The task manager can write the following to the result queue:
93-
# 1. A accumulator_pb2.ReduceResponse message
94-
# This is the result of the reduce function, it contains the window and the
95-
# result of the reduce function
96-
# The result of the reduce function is a accumulator_pb2.ReduceResponse message and can be
97-
# directly sent to the client
93+
# 1. An accumulator_pb2.AccumulatorResponse message
94+
# This is the result of the accumulator function, it contains the window and the
95+
# result of the accumulator function
96+
# The result of the accumulator function is an accumulator_pb2.AccumulatorResponse message
97+
# and can be directly sent to the client
9898
#
9999
# 2. An Exception
100-
# Any exceptions that occur during the processing reduce function tasks are
100+
# Any exceptions that occur while processing accumulator function tasks are
101101
# sent to the result queue. We then forward these exception to the client
102102
#
103-
# 3. A accumulator_pb2.ReduceResponse message with EOF=True
103+
# 3. An accumulator_pb2.AccumulatorResponse message with EOF=True
104104
# This is a special message that indicates the end of the processing for a window
105105
# When we get this message, we send an EOF message to the client
106106
try:

pynumaflow/accumulator/servicer/task_manager.py

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
def build_unique_key_name(keys):
2424
"""
2525
Builds a unique key name for the given keys.
26-
The key name is used to identify the Reduce task.
26+
The key name is used to identify the Accumulator task.
2727
The format is the keys joined by DELIMITER: key1, key2, ...
2828
"""
2929
return f"{DELIMITER.join(keys)}"
@@ -32,21 +32,21 @@ def build_unique_key_name(keys):
3232
def build_window_hash(window):
3333
"""
3434
Builds a hash for the given window.
35-
The hash is used to identify the Reduce Window
35+
The hash is used to identify the Accumulator Window
3636
The format is: start_time:end_time
3737
"""
3838
return f"{window.start.ToMilliseconds()}:{window.end.ToMilliseconds()}"
3939

4040

4141
def create_window_eof_response(window):
42-
"""Create a Reduce response with EOF=True for a given window"""
43-
return accumulator_pb2.ReduceResponse(window=window, EOF=True)
42+
"""Create a Accumulator response with EOF=True for a given window"""
43+
return accumulator_pb2.AccumulatorResponse(window=window, EOF=True)
4444

4545

4646
class TaskManager:
4747
"""
48-
TaskManager is responsible for managing the Reduce tasks.
49-
It is created whenever a new reduce operation is requested.
48+
TaskManager is responsible for managing the Accumulator tasks.
49+
It is created whenever a new accumulator operation is requested.
5050
"""
5151

5252
def __init__(self, handler: Union[AccumulatorAsyncCallable, _AccumulatorBuilderClass]):
@@ -56,13 +56,13 @@ def __init__(self, handler: Union[AccumulatorAsyncCallable, _AccumulatorBuilderC
5656
# Event loop only keeps a weak reference, which can cause it to
5757
# get lost during execution.
5858
self.background_tasks = set()
59-
# Handler for the reduce operation
59+
# Handler for the accumulator operation
6060
self.__accumulator_handler = handler
61-
# Queue to store the results of the reduce operation
61+
# Queue to store the results of the accumulator operation
6262
# This queue is used to send the results to the client
63-
# once the reduce operation is completed.
63+
# once the accumulator operation is completed.
6464
# This queue is also used to send the error/exceptions to the client
65-
# if the reduce operation fails.
65+
# if the accumulator operation fails.
6666
self.global_result_queue = NonBlockingIterator()
6767
# EOF response counting to ensure proper termination
6868
self._expected_eof_count = 0
@@ -87,14 +87,14 @@ def get_unique_windows(self):
8787

8888
def get_tasks(self):
8989
"""
90-
Returns the list of reduce tasks that are
90+
Returns the list of accumulator tasks that are
9191
currently being processed
9292
"""
9393
return list(self.tasks.values())
9494

9595
async def stream_send_eof(self):
9696
"""
97-
Sends EOF to input streams of all the Reduce
97+
Sends EOF to input streams of all the Accumulator
9898
tasks that are currently being processed.
9999
This is called when the input grpc stream is closed.
100100
"""
@@ -136,7 +136,7 @@ async def create_task(self, req):
136136
riter = niter.read_iterator()
137137
# Create a new result queue for the current task
138138
# We create a new result queue for each task, so that
139-
# the results of the reduce operation can be sent to the
139+
# the results of the accumulator operation can be sent to the
140140
# global result queue, which in turn sends the results
141141
# to the client.
142142
res_queue = NonBlockingIterator()
@@ -153,7 +153,7 @@ async def create_task(self, req):
153153
consumer.add_done_callback(self.clean_background)
154154

155155
# Create a new task for the accumulator operation, this will invoke the
156-
# Reduce handler with the given keys, request iterator, and window.
156+
# Accumulator handler with the given keys, request iterator, and window.
157157
task = asyncio.create_task(self.__invoke_accumulator(riter, res_queue))
158158
# Save a reference to the result of this function, to avoid a
159159
# task disappearing mid-execution.
@@ -165,7 +165,7 @@ async def create_task(self, req):
165165
task, niter, keys, res_queue, consumer, datetime.fromtimestamp(-1)
166166
)
167167

168-
# Save the result of the reduce operation to the task list
168+
# Save the result of the accumulator operation to the task list
169169
self.tasks[unified_key] = curr_task
170170

171171
# Increment expected EOF count since we created a new task
@@ -195,23 +195,23 @@ async def __invoke_accumulator(
195195
output: NonBlockingIterator,
196196
):
197197
"""
198-
Invokes the UDF reduce handler with the given keys,
198+
Invokes the UDF accumulator handler with the given keys,
199199
request iterator, and window. Returns the result of the
200-
reduce operation.
200+
accumulator operation.
201201
"""
202202
new_instance = self.__accumulator_handler
203203

204204
# If the accumulator handler is a class instance, create a new instance of it.
205205
# It is required for a new key to be processed by a
206-
# new instance of the reducer for a given window
206+
# new instance of the accumulator for a given window
207207
# Otherwise the function handler can be called directly
208208
if isinstance(self.__accumulator_handler, _AccumulatorBuilderClass):
209209
new_instance = self.__accumulator_handler.create()
210210
try:
211211
_ = await new_instance(request_iterator, output)
212212
# send EOF to the output stream
213213
await output.put(STREAM_EOF)
214-
# If there is an error in the reduce operation, log and
214+
# If there is an error in the accumulator operation, log and
215215
# then send the error to the result queue
216216
except BaseException as err:
217217
_LOGGER.critical("panic inside accumulator handle", exc_info=True)
@@ -242,7 +242,7 @@ async def process_input_stream(
242242
else:
243243
_LOGGER.debug(f"No operation matched for request: {request}", exc_info=True)
244244

245-
# If there is an error in the reduce operation, log and
245+
# If there is an error in the accumulator operation, log and
246246
# then send the error to the result queue
247247
except BaseException as e:
248248
err_msg = f"Accumulator Error: {repr(e)}"
@@ -257,7 +257,7 @@ async def process_input_stream(
257257
# respective iterators.
258258
await self.stream_send_eof()
259259

260-
# get the list of reduce tasks that are currently being processed
260+
# get the list of accumulator tasks that are currently being processed
261261
# iterate through the tasks and wait for them to complete
262262
for task in self.get_tasks():
263263
# Once this is done, we know that the task has written all the results
@@ -281,15 +281,15 @@ async def process_input_stream(
281281
# Now send STREAM_EOF to terminate the global result queue iterator
282282
await self.global_result_queue.put(STREAM_EOF)
283283
except BaseException as e:
284-
err_msg = f"Reduce Streaming Error: {repr(e)}"
284+
err_msg = f"Accumulator Streaming Error: {repr(e)}"
285285
_LOGGER.critical(err_msg, exc_info=True)
286286
await self.global_result_queue.put(e)
287287

288288
async def write_to_global_queue(
289289
self, input_queue: NonBlockingIterator, output_queue: NonBlockingIterator, unified_key: str
290290
):
291291
"""
292-
This task is for given Reduce task.
292+
This task is for given Accumulator task.
293293
This would read from the local result queue for the task and then write
294294
to the global result queue
295295
"""

tests/accumulator/test_async_accumulator.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,60 @@ def test_max_threads(self):
270270
server = AccumulatorAsyncServer(accumulator_instance=ExampleClass)
271271
self.assertEqual(server.max_threads, 4)
272272

273+
def test_server_info_file_path_handling(self):
274+
"""Test AccumulatorAsyncServer with custom server info file path."""
275+
276+
server = AccumulatorAsyncServer(
277+
ExampleClass, init_args=(0,), server_info_file="/custom/path/server_info.json"
278+
)
279+
280+
self.assertEqual(server.server_info_file, "/custom/path/server_info.json")
281+
282+
def test_init_kwargs_none_handling(self):
283+
"""Test init_kwargs None handling in AccumulatorAsyncServer."""
284+
285+
server = AccumulatorAsyncServer(
286+
ExampleClass, init_args=(0,), init_kwargs=None # This should be converted to {}
287+
)
288+
289+
# Should not raise any errors and should work correctly
290+
self.assertIsNotNone(server.accumulator_handler)
291+
292+
def test_server_with_zero_max_threads(self):
293+
"""Test server creation with max_threads set to 0."""
294+
295+
server = AccumulatorAsyncServer(ExampleClass, max_threads=0)
296+
297+
# Should be set to 0 (minimum)
298+
self.assertEqual(server.max_threads, 0)
299+
300+
def test_server_with_negative_max_threads(self):
301+
"""Test server creation with negative max_threads."""
302+
303+
server = AccumulatorAsyncServer(ExampleClass, max_threads=-5)
304+
305+
# Should be set to -5 (the minimum function will handle this)
306+
self.assertEqual(server.max_threads, -5)
307+
308+
def test_server_start_method_logging(self):
309+
"""Test server start method includes proper logging."""
310+
from unittest.mock import patch
311+
312+
server = AccumulatorAsyncServer(ExampleClass)
313+
314+
# Mock aiorun.run to prevent actual server startup
315+
with patch("pynumaflow.accumulator.async_server.aiorun") as mock_aiorun, patch(
316+
"pynumaflow.accumulator.async_server._LOGGER"
317+
) as mock_logger:
318+
server.start()
319+
320+
# Verify logging was called
321+
mock_logger.info.assert_called_once_with("Starting Async Accumulator Server")
322+
323+
# Verify aiorun.run was called with correct parameters
324+
mock_aiorun.run.assert_called_once()
325+
self.assertTrue(mock_aiorun.run.call_args[1]["use_uvloop"])
326+
273327

274328
if __name__ == "__main__":
275329
logging.basicConfig(level=logging.DEBUG)

0 commit comments

Comments
 (0)