Skip to content
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ proto:
poetry run python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/sourcetransformer -I=pynumaflow/proto/sourcetransformer --python_out=pynumaflow/proto/sourcetransformer --grpc_python_out=pynumaflow/proto/sourcetransformer pynumaflow/proto/sourcetransformer/*.proto
poetry run python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/sideinput -I=pynumaflow/proto/sideinput --python_out=pynumaflow/proto/sideinput --grpc_python_out=pynumaflow/proto/sideinput pynumaflow/proto/sideinput/*.proto
poetry run python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/sourcer -I=pynumaflow/proto/sourcer --python_out=pynumaflow/proto/sourcer --grpc_python_out=pynumaflow/proto/sourcer pynumaflow/proto/sourcer/*.proto
poetry run python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/accumulator -I=pynumaflow/proto/accumulator --python_out=pynumaflow/proto/accumulator --grpc_python_out=pynumaflow/proto/accumulator pynumaflow/proto/accumulator/*.proto


sed -i.bak -e 's/^\(import.*_pb2\)/from . \1/' pynumaflow/proto/*/*.py
Expand Down
55 changes: 55 additions & 0 deletions examples/accumulator/streamsorter/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
####################################################################################################
# Stage 1: Base Builder - installs core dependencies using poetry
####################################################################################################
FROM python:3.10-slim-bullseye AS base-builder

ENV PYSETUP_PATH="/opt/pysetup"
WORKDIR $PYSETUP_PATH

# Copy only core dependency files first for better caching
COPY pyproject.toml poetry.lock README.md ./
COPY pynumaflow/ ./pynumaflow/
RUN apt-get update && apt-get install --no-install-recommends -y \
curl wget build-essential git \
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
&& pip install poetry \
&& poetry install --no-root --no-interaction

####################################################################################################
# Stage 2: UDF Builder - adds UDF code and installs UDF-specific deps
####################################################################################################
FROM base-builder AS udf-builder

ENV EXAMPLE_PATH="/opt/pysetup/examples/accumulator/streamsorter"
ENV POETRY_VIRTUALENVS_IN_PROJECT=true

WORKDIR $EXAMPLE_PATH
COPY examples/accumulator/streamsorter/ ./
RUN poetry install --no-root --no-interaction

####################################################################################################
# Stage 3: UDF Runtime - clean container with only needed stuff
####################################################################################################
FROM python:3.10-slim-bullseye AS udf

ENV PYSETUP_PATH="/opt/pysetup"
ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/accumulator/streamsorter"
ENV VENV_PATH="$EXAMPLE_PATH/.venv"
ENV PATH="$VENV_PATH/bin:$PATH"

RUN apt-get update && apt-get install --no-install-recommends -y wget \
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
&& wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
&& chmod +x /dumb-init

WORKDIR $PYSETUP_PATH
COPY --from=udf-builder $VENV_PATH $VENV_PATH
COPY --from=udf-builder $EXAMPLE_PATH $EXAMPLE_PATH

WORKDIR $EXAMPLE_PATH
RUN chmod +x entry.sh

ENTRYPOINT ["/dumb-init", "--"]
CMD ["sh", "-c", "$EXAMPLE_PATH/entry.sh"]

EXPOSE 5000
22 changes: 22 additions & 0 deletions examples/accumulator/streamsorter/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
TAG ?= stable
PUSH ?= false
IMAGE_REGISTRY = quay.io/numaio/numaflow-python/streamsorter:${TAG}
DOCKER_FILE_PATH = examples/accumulator/streamsorter/Dockerfile

.PHONY: update
update:
poetry update -vv

.PHONY: image-push
image-push: update
cd ../../../ && docker buildx build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} \
--platform linux/amd64,linux/arm64 . --push

.PHONY: image
image: update
cd ../../../ && docker build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
52 changes: 52 additions & 0 deletions examples/accumulator/streamsorter/Makefile.optimized
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
TAG ?= stable
PUSH ?= false
IMAGE_REGISTRY = quay.io/numaio/numaflow-python/streamsorter:${TAG}
DOCKER_FILE_PATH = examples/accumulator/streamsorter/Dockerfile
BASE_IMAGE_NAME = numaflow-python-base

.PHONY: base-image
base-image:
@echo "Building shared base image..."
docker build -f Dockerfile.base -t ${BASE_IMAGE_NAME} .

.PHONY: update
update:
poetry update -vv

.PHONY: image-push
image-push: base-image update
cd ../../../ && docker buildx build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} \
--platform linux/amd64,linux/arm64 . --push

.PHONY: image
image: base-image update
cd ../../../ && docker build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi

.PHONY: image-fast
image-fast: update
@echo "Building with shared base image (fastest option)..."
cd ../../../ && docker build \
-f examples/map/even_odd/Dockerfile.shared-base \
-t ${IMAGE_REGISTRY} .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi

.PHONY: clean
clean:
docker rmi ${BASE_IMAGE_NAME} 2>/dev/null || true
docker rmi ${IMAGE_REGISTRY} 2>/dev/null || true

.PHONY: help
help:
@echo "Available targets:"
@echo " base-image - Build the shared base image with pynumaflow"
@echo " image - Build UDF image with optimized multi-stage build"
@echo " image-fast - Build UDF image using shared base (fastest)"
@echo " image-push - Build and push multi-platform image"
@echo " update - Update poetry dependencies"
@echo " clean - Remove built images"
@echo " help - Show this help message"
43 changes: 43 additions & 0 deletions examples/accumulator/streamsorter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Stream Sorter

An example User Defined Function that sorts the incoming stream by event time.

### Applying the Pipeline

To apply the pipeline, use the following command:

```shell
kubectl apply -f pipeline.yaml
```

### Publish messages

Port-forward the HTTP endpoint, and make POST requests using curl. Remember to replace xxxx with the appropriate pod names.

```shell
kubectl port-forward stream-sorter-http-one-0-xxxx 8444:8443

# Post data to the HTTP endpoint
curl -kq -X POST -d "101" https://localhost:8444/vertices/http-one -H "X-Numaflow-Event-Time: 60000"
curl -kq -X POST -d "102" https://localhost:8444/vertices/http-one -H "X-Numaflow-Event-Time: 61000"
curl -kq -X POST -d "103" https://localhost:8444/vertices/http-one -H "X-Numaflow-Event-Time: 62000"
curl -kq -X POST -d "104" https://localhost:8444/vertices/http-one -H "X-Numaflow-Event-Time: 63000"
```

```shell
kubectl port-forward stream-sorter-http-two-0-xxxx 8445:8443

# Post data to the HTTP endpoint
curl -kq -X POST -d "105" https://localhost:8445/vertices/http-two -H "X-Numaflow-Event-Time: 70000"
curl -kq -X POST -d "106" https://localhost:8445/vertices/http-two -H "X-Numaflow-Event-Time: 71000"
curl -kq -X POST -d "107" https://localhost:8445/vertices/http-two -H "X-Numaflow-Event-Time: 72000"
curl -kq -X POST -d "108" https://localhost:8445/vertices/http-two -H "X-Numaflow-Event-Time: 73000"
```

### Verify the output

```shell
kubectl logs -f stream-sorter-log-sink-0-xxxx
```

The output should be sorted by event time.
4 changes: 4 additions & 0 deletions examples/accumulator/streamsorter/entry.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/sh
set -eux

python example.py
72 changes: 72 additions & 0 deletions examples/accumulator/streamsorter/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import logging
import os
from collections.abc import AsyncIterable
from datetime import datetime

from pynumaflow import setup_logging
from pynumaflow.accumulator import Accumulator, AccumulatorAsyncServer
from pynumaflow.accumulator import (
Message,
Datum,
)
from pynumaflow.shared.asynciter import NonBlockingIterator

_LOGGER = setup_logging(__name__)
if os.getenv("PYTHONDEBUG"):
_LOGGER.setLevel(logging.DEBUG)


class StreamSorter(Accumulator):
def __init__(self):
_LOGGER.info("StreamSorter initialized")
self.latest_wm = datetime.fromtimestamp(-1)
self.sorted_buffer: list[Datum] = []

async def handler(
self,
datums: AsyncIterable[Datum],
output: NonBlockingIterator,
):
_LOGGER.info("StreamSorter handler started")
async for datum in datums:
_LOGGER.info(
f"Received datum with event time: {datum.event_time}, "
f"Current latest watermark: {self.latest_wm}, "
f"Datum watermark: {datum.watermark}"
)

# If watermark has moved forward
if datum.watermark and datum.watermark > self.latest_wm:
self.latest_wm = datum.watermark
await self.flush_buffer(output)

self.insert_sorted(datum)

def insert_sorted(self, datum: Datum):
# Binary insert to keep sorted buffer in order
left, right = 0, len(self.sorted_buffer)
while left < right:
mid = (left + right) // 2
if self.sorted_buffer[mid].event_time > datum.event_time:
right = mid
else:
left = mid + 1
self.sorted_buffer.insert(left, datum)

async def flush_buffer(self, output: NonBlockingIterator):
_LOGGER.info(f"Watermark updated, flushing sortedBuffer: {self.latest_wm}")
i = 0
for datum in self.sorted_buffer:
if datum.event_time > self.latest_wm:
break
await output.put(Message.from_datum(datum))
_LOGGER.info(f"Sent datum with event time: {datum.event_time}")
i += 1
# Remove flushed items
self.sorted_buffer = self.sorted_buffer[i:]


if __name__ == "__main__":
grpc_server = None
grpc_server = AccumulatorAsyncServer(StreamSorter)
grpc_server.start()
49 changes: 49 additions & 0 deletions examples/accumulator/streamsorter/pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: stream-sorter
spec:
limits:
readBatchSize: 1
vertices:
- name: http-one
scale:
min: 1
max: 1
source:
http: {}
- name: http-two
scale:
min: 1
max: 1
source:
http: {}
- name: py-accum
udf:
container:
image: quay.io/numaio/numaflow-python/streamsorter:stable
imagePullPolicy: Always
env:
- name: PYTHONDEBUG
value: "true"
groupBy:
window:
accumulator:
timeout: 10s
keyed: true
storage:
persistentVolumeClaim:
volumeSize: 1Gi
- name: py-sink
scale:
min: 1
max: 1
sink:
log: {}
edges:
- from: http-one
to: py-accum
- from: http-two
to: py-accum
- from: py-accum
to: py-sink
13 changes: 13 additions & 0 deletions examples/accumulator/streamsorter/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[tool.poetry]
name = "stream-sorter"
version = "0.2.4"
description = ""
authors = ["Numaflow developers"]

[tool.poetry.dependencies]
python = "~3.10"
pynumaflow = { path = "../../../"}

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
1 change: 0 additions & 1 deletion examples/map/even_odd/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ WORKDIR $PYSETUP_PATH
# Copy only core dependency files first for better caching
COPY pyproject.toml poetry.lock README.md ./
COPY pynumaflow/ ./pynumaflow/
RUN echo "Simulating long build step..." && sleep 20
RUN apt-get update && apt-get install --no-install-recommends -y \
curl wget build-essential git \
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
Expand Down
2 changes: 2 additions & 0 deletions pynumaflow/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
MULTIPROC_MAP_SOCK_ADDR = "/var/run/numaflow/multiproc"
FALLBACK_SINK_SOCK_PATH = "/var/run/numaflow/fb-sink.sock"
BATCH_MAP_SOCK_PATH = "/var/run/numaflow/batchmap.sock"
ACCUMULATOR_SOCK_PATH = "/var/run/numaflow/accumulator.sock"

# Server information file configs
MAP_SERVER_INFO_FILE_PATH = "/var/run/numaflow/mapper-server-info"
Expand All @@ -36,6 +37,7 @@
SIDE_INPUT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sideinput-server-info"
SOURCE_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sourcer-server-info"
FALLBACK_SINK_SERVER_INFO_FILE_PATH = "/var/run/numaflow/fb-sinker-server-info"
ACCUMULATOR_SERVER_INFO_FILE_PATH = "/var/run/numaflow/accumulator-server-info"

ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE"
UD_CONTAINER_FALLBACK_SINK = "fb-udsink"
Expand Down
21 changes: 21 additions & 0 deletions pynumaflow/accumulator/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from pynumaflow.accumulator._dtypes import (
Message,
Datum,
IntervalWindow,
Metadata,
DROP,
KeyedWindow,
Accumulator,
)
from pynumaflow.accumulator.async_server import AccumulatorAsyncServer

__all__ = [
"Message",
"Datum",
"IntervalWindow",
"Metadata",
"DROP",
"AccumulatorAsyncServer",
"KeyedWindow",
"Accumulator",
]
Loading