forked from numaproj/numaflow-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample.py
More file actions
72 lines (61 loc) · 2.32 KB
/
example.py
File metadata and controls
72 lines (61 loc) · 2.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import logging
import os
from collections.abc import AsyncIterable
from datetime import datetime
from pynumaflow import setup_logging
from pynumaflow.accumulator import Accumulator, AccumulatorAsyncServer
from pynumaflow.accumulator import (
Message,
Datum,
)
from pynumaflow.shared.asynciter import NonBlockingIterator
# Module-wide logger, configured by pynumaflow's logging helper.
_LOGGER = setup_logging(__name__)

# Opt in to verbose output when the PYTHONDEBUG environment variable is set
# to any non-empty value.
if os.environ.get("PYTHONDEBUG"):
    _LOGGER.setLevel(logging.DEBUG)
class StreamSorter(Accumulator):
    """Accumulator that re-emits incoming datums ordered by event time.

    Datums are held in ``sorted_buffer`` in ascending ``event_time`` order.
    Whenever an incoming datum carries a watermark later than ``latest_wm``,
    every buffered datum whose event time is at or before the new watermark
    is written to the output and removed from the buffer.
    """

    def __init__(self) -> None:
        _LOGGER.info("StreamSorter initialized")
        # Sentinel meaning "no watermark seen yet": one second before the
        # Unix epoch, as a naive local-time datetime.
        # NOTE(review): assumes datum.watermark values are naive datetimes as
        # well (comparing naive with aware datetimes raises TypeError) --
        # confirm against the pynumaflow Datum type.
        self.latest_wm = datetime.fromtimestamp(-1)
        # Pending datums, kept sorted by event_time (ascending).
        self.sorted_buffer: list[Datum] = []

    async def handler(
        self,
        datums: AsyncIterable[Datum],
        output: NonBlockingIterator,
    ) -> None:
        """Consume ``datums``, buffering them and flushing on watermark advance.

        Args:
            datums: Asynchronous stream of incoming datums.
            output: Sink that flushed messages are ``put`` onto.

        Note:
            Datums still in the buffer when ``datums`` is exhausted are not
            emitted by this method.
        """
        _LOGGER.info("StreamSorter handler started")
        async for datum in datums:
            # Lazy %-style arguments: the message is only rendered when the
            # record is actually emitted, which matters on this per-datum path.
            _LOGGER.info(
                "Received datum with event time: %s, "
                "Current latest watermark: %s, "
                "Datum watermark: %s",
                datum.event_time,
                self.latest_wm,
                datum.watermark,
            )
            # If watermark has moved forward, release everything it now covers.
            if datum.watermark and datum.watermark > self.latest_wm:
                self.latest_wm = datum.watermark
                await self.flush_buffer(output)
            # The current datum is buffered only after the flush, so it is
            # never emitted by the flush that its own watermark triggered.
            self.insert_sorted(datum)

    def insert_sorted(self, datum: Datum) -> None:
        """Insert ``datum`` into ``sorted_buffer``, keeping event-time order.

        Uses a binary search for the insertion point. A datum whose event
        time equals that of existing entries is placed after them, so arrival
        order is preserved among ties.
        """
        lo, hi = 0, len(self.sorted_buffer)
        while lo < hi:
            mid = (lo + hi) // 2
            if self.sorted_buffer[mid].event_time > datum.event_time:
                hi = mid
            else:
                lo = mid + 1
        self.sorted_buffer.insert(lo, datum)

    async def flush_buffer(self, output: NonBlockingIterator) -> None:
        """Emit and drop buffered datums with event time <= ``latest_wm``.

        Args:
            output: Sink that each flushed datum, wrapped via
                ``Message.from_datum``, is ``put`` onto.
        """
        _LOGGER.info(
            "Watermark updated, flushing sortedBuffer: %s", self.latest_wm
        )
        flushed = 0
        for datum in self.sorted_buffer:
            # The buffer is sorted, so the first datum beyond the watermark
            # means every later one is beyond it too.
            if datum.event_time > self.latest_wm:
                break
            await output.put(Message.from_datum(datum))
            _LOGGER.info("Sent datum with event time: %s", datum.event_time)
            flushed += 1
        # Remove flushed items
        self.sorted_buffer = self.sorted_buffer[flushed:]
if __name__ == "__main__":
    # Script entry point: hand the StreamSorter class itself (not an
    # instance) to the accumulator server and start serving.
    # NOTE(review): presumably the server creates StreamSorter instances as
    # needed -- confirm against the pynumaflow accumulator documentation.
    grpc_server = AccumulatorAsyncServer(StreamSorter)
    grpc_server.start()