Skip to content

Commit fa8fa6d

Browse files
committed
chore: add example
Signed-off-by: Sidhant Kohli <sidhant.kohli@gmail.com>
1 parent 8b58b63 commit fa8fa6d

12 files changed

Lines changed: 414 additions & 0 deletions

File tree

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
####################################################################################################
2+
# builder: install needed dependencies
3+
####################################################################################################
4+
5+
FROM python:3.10-slim-bullseye AS builder
6+
7+
ENV PYTHONFAULTHANDLER=1 \
8+
PYTHONUNBUFFERED=1 \
9+
PYTHONHASHSEED=random \
10+
PIP_NO_CACHE_DIR=on \
11+
PIP_DISABLE_PIP_VERSION_CHECK=on \
12+
PIP_DEFAULT_TIMEOUT=100 \
13+
POETRY_VERSION=1.2.2 \
14+
POETRY_HOME="/opt/poetry" \
15+
POETRY_VIRTUALENVS_IN_PROJECT=true \
16+
POETRY_NO_INTERACTION=1 \
17+
PYSETUP_PATH="/opt/pysetup"
18+
19+
ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/reducestream/counter"
20+
ENV VENV_PATH="$EXAMPLE_PATH/.venv"
21+
ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"
22+
23+
RUN apt-get update \
24+
&& apt-get install --no-install-recommends -y \
25+
curl \
26+
wget \
27+
# deps for building python deps
28+
build-essential \
29+
&& apt-get install -y git \
30+
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
31+
\
32+
# install dumb-init
33+
&& wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
34+
&& chmod +x /dumb-init \
35+
&& curl -sSL https://install.python-poetry.org | python3 -
36+
37+
####################################################################################################
38+
# udf: used for running the udf vertices
39+
####################################################################################################
40+
FROM builder AS udf
41+
42+
WORKDIR $PYSETUP_PATH
43+
COPY ./ ./
44+
45+
WORKDIR $EXAMPLE_PATH
46+
RUN poetry lock
47+
RUN poetry install --no-cache --no-root && \
48+
rm -rf ~/.cache/pypoetry/
49+
50+
RUN chmod +x entry.sh
51+
52+
ENTRYPOINT ["/dumb-init", "--"]
53+
CMD ["sh", "-c", "$EXAMPLE_PATH/entry.sh"]
54+
55+
EXPOSE 5000
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
TAG ?= stable
2+
PUSH ?= false
3+
IMAGE_REGISTRY = quay.io/numaio/numaflow-python/reduce-stream-counter:${TAG}
4+
DOCKER_FILE_PATH = examples/reducestream/counter/Dockerfile
5+
6+
.PHONY: update
7+
update:
8+
poetry update -vv
9+
10+
.PHONY: image-push
11+
image-push: update
12+
cd ../../../ && docker buildx build \
13+
-f ${DOCKER_FILE_PATH} \
14+
-t ${IMAGE_REGISTRY} \
15+
--platform linux/amd64,linux/arm64 . --push
16+
17+
.PHONY: image
18+
image: update
19+
cd ../../../ && docker build \
20+
-f ${DOCKER_FILE_PATH} \
21+
-t ${IMAGE_REGISTRY} .
22+
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/sh
2+
set -eux
3+
4+
python example.py
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import os
2+
from collections.abc import AsyncIterable
3+
4+
from pynumaflow.reducestreamer import (
5+
Message,
6+
Datum,
7+
Metadata,
8+
ReduceStreamAsyncServer,
9+
ReduceStreamer,
10+
)
11+
from pynumaflow.shared.asynciter import NonBlockingIterator
12+
13+
14+
class ReduceCounter(ReduceStreamer):
15+
def __init__(self, counter):
16+
self.counter = counter
17+
18+
async def handler(
19+
self,
20+
keys: list[str],
21+
datums: AsyncIterable[Datum],
22+
output: NonBlockingIterator,
23+
md: Metadata,
24+
):
25+
async for _ in datums:
26+
self.counter += 1
27+
if self.counter > 10:
28+
msg = f"counter:{self.counter}"
29+
# NOTE: this is returning results because we have seen all the data
30+
# use this only if you really need this feature because your next vertex
31+
# will get both early result and final results and it should be able to
32+
# handle both the scenarios.
33+
await output.put(Message(str.encode(msg), keys=keys))
34+
self.counter = 0
35+
msg = f"counter:{self.counter}"
36+
await output.put(Message(str.encode(msg), keys=keys))
37+
38+
39+
if __name__ == "__main__":
40+
invoke = os.getenv("INVOKE", "class")
41+
if invoke == "class":
42+
# Here we are using the class instance as the reducer_instance
43+
# which will be used to invoke the handler function.
44+
# We are passing the init_args for the class instance.
45+
grpc_server = ReduceStreamAsyncServer(ReduceCounter, init_args=(0,))
46+
grpc_server.start()
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
apiVersion: numaflow.numaproj.io/v1alpha1
2+
kind: Pipeline
3+
metadata:
4+
name: even-odd-sum
5+
spec:
6+
vertices:
7+
- name: in
8+
source:
9+
http: {}
10+
- name: atoi
11+
scale:
12+
min: 3
13+
udf:
14+
container:
15+
# Tell the input number is even or odd, see https://github.com/numaproj/numaflow-go/tree/main/pkg/mapper/examples/even_odd
16+
image: quay.io/numaio/numaflow-go/map-even-odd:stable
17+
imagePullPolicy: Always
18+
- name: compute-sum
19+
udf:
20+
container:
21+
# compute the sum
22+
image: quay.io/numaio/numaflow-python/reduce-stream-counter:stable
23+
imagePullPolicy: Always
24+
env:
25+
- name: PYTHONDEBUG
26+
value: "true"
27+
- name: INVOKE
28+
value: "class"
29+
groupBy:
30+
window:
31+
fixed:
32+
length: 60s
33+
keyed: true
34+
storage:
35+
persistentVolumeClaim:
36+
volumeSize: 10Gi
37+
accessMode: ReadWriteOnce
38+
partitions: 1
39+
- name: sink
40+
scale:
41+
min: 1
42+
sink:
43+
log: {}
44+
edges:
45+
- from: in
46+
to: atoi
47+
- from: atoi
48+
to: compute-sum
49+
- from: compute-sum
50+
to: sink
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[tool.poetry]
2+
name = "reduce-stream-counter"
3+
version = "0.2.4"
4+
description = ""
5+
authors = ["Numaflow developers"]
6+
7+
[tool.poetry.dependencies]
8+
python = "~3.10"
9+
pynumaflow = { path = "../../../"}
10+
11+
[tool.poetry.dev-dependencies]
12+
13+
[build-system]
14+
requires = ["poetry-core>=1.0.0"]
15+
build-backend = "poetry.core.masonry.api"
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
####################################################################################################
2+
# builder: install needed dependencies
3+
####################################################################################################
4+
5+
FROM python:3.11-slim-bullseye AS builder
6+
7+
ENV PYTHONFAULTHANDLER=1 \
8+
PYTHONUNBUFFERED=1 \
9+
PYTHONHASHSEED=random \
10+
PIP_NO_CACHE_DIR=on \
11+
PIP_DISABLE_PIP_VERSION_CHECK=on \
12+
PIP_DEFAULT_TIMEOUT=100 \
13+
POETRY_VERSION=1.2.2 \
14+
POETRY_HOME="/opt/poetry" \
15+
POETRY_VIRTUALENVS_IN_PROJECT=true \
16+
POETRY_NO_INTERACTION=1 \
17+
PYSETUP_PATH="/opt/pysetup"
18+
19+
ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/accumulator/streamsorter"
20+
ENV VENV_PATH="$EXAMPLE_PATH/.venv"
21+
ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"
22+
23+
RUN apt-get update \
24+
&& apt-get install --no-install-recommends -y \
25+
curl \
26+
wget \
27+
# deps for building python deps
28+
build-essential \
29+
&& apt-get install -y git \
30+
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
31+
\
32+
# install dumb-init
33+
&& wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
34+
&& chmod +x /dumb-init \
35+
&& curl -sSL https://install.python-poetry.org | python3 -
36+
37+
####################################################################################################
38+
# udf: used for running the udf vertices
39+
####################################################################################################
40+
FROM builder AS udf
41+
42+
WORKDIR $PYSETUP_PATH
43+
COPY ./ ./
44+
45+
WORKDIR $EXAMPLE_PATH
46+
RUN poetry lock
47+
RUN poetry install --no-cache --no-root && \
48+
rm -rf ~/.cache/pypoetry/
49+
50+
RUN chmod +x entry.sh
51+
52+
ENTRYPOINT ["/dumb-init", "--"]
53+
CMD ["sh", "-c", "$EXAMPLE_PATH/entry.sh"]
54+
55+
EXPOSE 5000
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
TAG ?= test1
2+
PUSH ?= false
3+
IMAGE_REGISTRY = quay.io/numaio/numaflow-python/accumulator-sorter:${TAG}
4+
DOCKER_FILE_PATH = examples/accumulator/streamsorter/Dockerfile
5+
6+
.PHONY: update
7+
update:
8+
poetry update -vv
9+
10+
.PHONY: image-push
11+
image-push: update
12+
cd ../../../ && docker buildx build \
13+
-f ${DOCKER_FILE_PATH} \
14+
-t ${IMAGE_REGISTRY} \
15+
--platform linux/amd64,linux/arm64 . --push
16+
17+
.PHONY: image
18+
image: update
19+
cd ../../../ && docker build \
20+
-f ${DOCKER_FILE_PATH} \
21+
-t ${IMAGE_REGISTRY} .
22+
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/sh
2+
set -eux
3+
4+
python example.py
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import logging
2+
import os
3+
from collections.abc import AsyncIterable
4+
from datetime import datetime
5+
from typing import List
6+
7+
from pynumaflow import setup_logging
8+
from pynumaflow.accumulator import Accumulator, AccumulatorAsyncServer
9+
from pynumaflow.reducestreamer import (
10+
Message,
11+
Datum,
12+
)
13+
from pynumaflow.shared.asynciter import NonBlockingIterator
14+
15+
_LOGGER = setup_logging(__name__)
16+
if os.getenv("PYTHONDEBUG"):
17+
_LOGGER.setLevel(logging.DEBUG)
18+
19+
20+
class StreamSorter(Accumulator):
21+
def __init__(self):
22+
_LOGGER.error("MEEEEE")
23+
self.latest_wm = datetime.fromtimestamp(-1)
24+
self.sorted_buffer: List[Datum] = []
25+
26+
async def handler(
27+
self,
28+
datums: AsyncIterable[Datum],
29+
output: NonBlockingIterator,
30+
):
31+
_LOGGER.info("HEREEEEE")
32+
async for datum in datums:
33+
_LOGGER.info(f"Received datum with event time: {datum.watermark}")
34+
_LOGGER.info(f"Received datum with event time-2:{self.latest_wm}")
35+
36+
# If watermark has moved forward
37+
if datum.watermark.ToDatetime() and datum.watermark.ToDatetime() > self.latest_wm:
38+
self.latest_wm = datum.watermark.ToDatetime()
39+
await self.flush_buffer(output)
40+
41+
self.insert_sorted(datum)
42+
43+
def insert_sorted(self, datum: Datum):
44+
# Binary insert to keep sorted buffer in order
45+
left, right = 0, len(self.sorted_buffer)
46+
while left < right:
47+
mid = (left + right) // 2
48+
if self.sorted_buffer[mid].event_time.ToDatetime() > datum.event_time.ToDatetime():
49+
right = mid
50+
else:
51+
left = mid + 1
52+
self.sorted_buffer.insert(left, datum)
53+
54+
async def flush_buffer(self, output: NonBlockingIterator):
55+
_LOGGER.info(f"Watermark updated, flushing sortedBuffer: {self.latest_wm}")
56+
i = 0
57+
for datum in self.sorted_buffer:
58+
if datum.event_time > self.latest_wm:
59+
break
60+
await output.put(Message.from_datum(datum))
61+
logging.info(f"Sent datum with event time: {datum.watermark.ToDatetime()}")
62+
i += 1
63+
# Remove flushed items
64+
self.sorted_buffer = self.sorted_buffer[i:]
65+
66+
67+
if __name__ == "__main__":
68+
invoke = os.getenv("INVOKE", "class")
69+
grpc_server = None
70+
if invoke == "class":
71+
# Here we are using the class instance as the reducer_instance
72+
# which will be used to invoke the handler function.
73+
# We are passing the init_args for the class instance.
74+
grpc_server = AccumulatorAsyncServer(StreamSorter)
75+
grpc_server.start()

0 commit comments

Comments
 (0)