Skip to content

Commit 60bdf75

Browse files
committed
format all files
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
1 parent 349a26f commit 60bdf75

36 files changed

Lines changed: 69 additions & 152 deletions

packages/pynumaflow-lite/manifests/accumulator/accumulator_stream_sorter.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,5 +126,6 @@ async def main():
126126
server.stop()
127127
return
128128

129+
129130
if __name__ == "__main__":
130131
asyncio.run(main())

packages/pynumaflow-lite/manifests/batchmap/batchmap_cat.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@
77

88

99
class SimpleBatchCat(batchmapper.BatchMapper):
10-
async def handler(
11-
self, batch: AsyncIterable[batchmapper.Datum]
12-
) -> batchmapper.BatchResponses:
10+
async def handler(self, batch: AsyncIterable[batchmapper.Datum]) -> batchmapper.BatchResponses:
1311
responses = batchmapper.BatchResponses()
1412
async for d in batch:
1513
resp = batchmapper.BatchResponse(d.id)
@@ -21,10 +19,9 @@ async def handler(
2119
responses.append(resp)
2220
return responses
2321

22+
2423
async def start(
25-
f: Callable[
26-
[AsyncIterable[batchmapper.Datum]], Awaitable[batchmapper.BatchResponses]
27-
],
24+
f: Callable[[AsyncIterable[batchmapper.Datum]], Awaitable[batchmapper.BatchResponses]],
2825
):
2926
server = batchmapper.BatchMapAsyncServer()
3027

packages/pynumaflow-lite/manifests/map/map_cat.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ async def handler(self, keys: list[str], payload: mapper.Datum) -> mapper.Messag
1717

1818
return messages
1919

20+
2021
async def start(f: Callable[[list[str], mapper.Datum], Awaitable[mapper.Messages]]):
2122
server = mapper.MapAsyncServer()
2223

packages/pynumaflow-lite/manifests/mapstream/mapstream_cat.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,15 @@
77

88

99
class SimpleStreamCat(mapstreamer.MapStreamer):
10-
async def handler(
11-
self, keys: list[str], datum: mapstreamer.Datum
12-
) -> AsyncIterator[Message]:
10+
async def handler(self, keys: list[str], datum: mapstreamer.Datum) -> AsyncIterator[Message]:
1311
parts = datum.value.decode("utf-8").split(",")
1412
if not parts:
1513
yield Message.to_drop()
1614
return
1715
for s in parts:
1816
yield Message(s.encode(), keys)
1917

18+
2019
async def start(f: Callable[[list[str], mapstreamer.Datum], AsyncIterator[Message]]):
2120
# Use default socket/info file locations; no explicit sock file passed
2221
server = mapstreamer.MapStreamAsyncServer()

packages/pynumaflow-lite/manifests/reduce/reduce_counter_class.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,12 @@ async def handler(
1919
self.counter = 0
2020
async for _ in datums:
2121
self.counter += 1
22-
msg = (
23-
f"counter:{self.counter} interval_window_start:{iw.start} interval_window_end:{iw.end}"
24-
).encode()
22+
msg = (f"counter:{self.counter} interval_window_start:{iw.start} interval_window_end:{iw.end}").encode()
2523
out = reducer.Messages()
2624
out.append(reducer.Message(msg, keys))
2725
return out
2826

27+
2928
async def start(creator: type[reducer.Reducer], init_args: tuple):
3029
sock_file = "/var/run/numaflow/reduce.sock"
3130
server_info_file = "/var/run/numaflow/reducer-server-info"

packages/pynumaflow-lite/manifests/reducestream/reducestream_counter.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,23 +53,20 @@ async def handler(
5353
# Emit intermediate result every 10 items
5454
if self.counter % 10 == 0:
5555
msg = (
56-
f"counter:{self.counter} "
57-
f"interval_window_start:{iw.start} "
58-
f"interval_window_end:{iw.end}"
56+
f"counter:{self.counter} interval_window_start:{iw.start} interval_window_end:{iw.end}"
5957
).encode()
6058
print(f"Yielding intermediate result: counter={self.counter}")
6159
# Early release of data - this is the key feature of reduce streaming!
6260
yield reducestreamer.Message(msg, keys=keys)
6361

6462
# Emit final result
6563
msg = (
66-
f"counter:{self.counter} (FINAL) "
67-
f"interval_window_start:{iw.start} "
68-
f"interval_window_end:{iw.end}"
64+
f"counter:{self.counter} (FINAL) interval_window_start:{iw.start} interval_window_end:{iw.end}"
6965
).encode()
7066
print(f"Yielding final result: counter={self.counter}")
7167
yield reducestreamer.Message(msg, keys=keys)
7268

69+
7370
async def start(creator: type, init_args: tuple):
7471
"""Start the reduce stream server."""
7572
sock_file = "/var/run/numaflow/reducestream.sock"

packages/pynumaflow-lite/manifests/session_reduce/session_reduce_counter_class.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,5 +75,6 @@ async def main():
7575
server.stop()
7676
return
7777

78+
7879
if __name__ == "__main__":
7980
asyncio.run(main())

packages/pynumaflow-lite/manifests/sideinput/sideinput_example.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ def update_data_from_file(self, path):
8686
except Exception as e:
8787
print(f"Error reading file: {e}")
8888

89+
8990
async def start_sideinput():
9091
"""Start the SideInput retriever server."""
9192
server = sideinputer.SideInputAsyncServer()

packages/pynumaflow-lite/manifests/sink/sink_log.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ async def handler(self, datums: AsyncIterable[sinker.Datum]) -> sinker.Responses
2525
# we can use Response.as_fallback(msg.id) to write the message to fallback sink
2626
return responses
2727

28+
2829
async def start(
2930
f: Callable[[AsyncIterator[sinker.Datum]], Awaitable[sinker.Responses]],
3031
):

packages/pynumaflow-lite/manifests/source/simple_source.py

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,11 @@ def __init__(self):
2222
self.counter = 0
2323
self.partition_idx = 0
2424

25-
async def read_handler(
26-
self, datum: sourcer.ReadRequest
27-
) -> AsyncIterator[sourcer.Message]:
25+
async def read_handler(self, datum: sourcer.ReadRequest) -> AsyncIterator[sourcer.Message]:
2826
"""
2927
The simple source generates messages with incrementing numbers.
3028
"""
31-
_LOGGER.info(
32-
f"Read request: num_records={datum.num_records}, timeout_ms={datum.timeout_ms}"
33-
)
29+
_LOGGER.info(f"Read request: num_records={datum.num_records}, timeout_ms={datum.timeout_ms}")
3430

3531
# Generate the requested number of messages
3632
for _ in range(datum.num_records):
@@ -66,19 +62,15 @@ async def ack_handler(self, request: sourcer.AckRequest) -> None:
6662
"""
6763
_LOGGER.info(f"Acknowledging {len(request.offsets)} offsets")
6864
for offset in request.offsets:
69-
_LOGGER.debug(
70-
f"Acked offset: {offset.offset.decode('utf-8')}, partition: {offset.partition_id}"
71-
)
65+
_LOGGER.debug(f"Acked offset: {offset.offset.decode('utf-8')}, partition: {offset.partition_id}")
7266

7367
async def nack_handler(self, request: sourcer.NackRequest) -> None:
7468
"""
7569
The simple source negatively acknowledges the offsets.
7670
"""
7771
_LOGGER.info(f"Negatively acknowledging {len(request.offsets)} offsets")
7872
for offset in request.offsets:
79-
_LOGGER.warning(
80-
f"Nacked offset: {offset.offset.decode('utf-8')}, partition: {offset.partition_id}"
81-
)
73+
_LOGGER.warning(f"Nacked offset: {offset.offset.decode('utf-8')}, partition: {offset.partition_id}")
8274

8375
async def pending_handler(self) -> sourcer.PendingResponse:
8476
"""
@@ -92,6 +84,7 @@ async def partitions_handler(self) -> sourcer.PartitionsResponse:
9284
"""
9385
return sourcer.PartitionsResponse(partitions=[self.partition_idx])
9486

87+
9588
async def start():
9689
server = sourcer.SourceAsyncServer()
9790

0 commit comments

Comments
 (0)