feat: add Accumulator #237
Merged
33 commits
266d9d3
add proto
kohlisid 96b8c73
init accumulator
kohlisid 67f5e87
Merge branch 'main' into accum
kohlisid 8b58b63
minor fixes
kohlisid c96b488
3/5 accum test files pass
0647b3e
Fix accumulator async runtime issues
fa8fa6d
chore: add example
kohlisid 464411c
Fix accumulator tests and improve error handling
b49d53c
merge
19d61a3
Fix merge conflicts
10ebf46
fix: example
32fda32
fix: tests and add e2e test
b4d7ccc
fix: cleanup logs
6466f31
fix: resolve conflicts
35156ba
fix: use optimized Dockerfile
ea5576c
fix: lint
54c0224
fix: tests and lint
1d2863c
fix: update example
82f83d5
fix: tests
ee3acf9
fix: lint
6b76b9d
fix: update proto
5774e80
fix: update docstring
7cb73d7
fix: update docker image name
d6968b6
fix: update close task
8aebf04
fix: tests
2070401
fix: add comprehensive accumulator window operation tests
0156e24
fix: conflicts
6d8399a
fix: comments and tests
53ec8da
fix: remove extra STREAM_EOF
e1a7c9a
fix: lint
42c0264
fix: remove tests
be9c47f
fix: lint
597638f
fix: add accumulator for e2e tests
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| #################################################################################################### | ||
| # Stage 1: Base Builder - installs core dependencies using poetry | ||
| #################################################################################################### | ||
| FROM python:3.10-slim-bullseye AS base-builder | ||
|
|
||
| ENV PYSETUP_PATH="/opt/pysetup" | ||
| WORKDIR $PYSETUP_PATH | ||
|
|
||
| # Copy only core dependency files first for better caching | ||
| COPY pyproject.toml poetry.lock README.md ./ | ||
| COPY pynumaflow/ ./pynumaflow/ | ||
| RUN apt-get update && apt-get install --no-install-recommends -y \ | ||
| curl wget build-essential git \ | ||
| && apt-get clean && rm -rf /var/lib/apt/lists/* \ | ||
| && pip install poetry \ | ||
| && poetry install --no-root --no-interaction | ||
|
|
||
| #################################################################################################### | ||
| # Stage 2: UDF Builder - adds UDF code and installs UDF-specific deps | ||
| #################################################################################################### | ||
| FROM base-builder AS udf-builder | ||
|
|
||
| ENV EXAMPLE_PATH="/opt/pysetup/examples/accumulator/streamsorter" | ||
| ENV POETRY_VIRTUALENVS_IN_PROJECT=true | ||
|
|
||
| WORKDIR $EXAMPLE_PATH | ||
| COPY examples/accumulator/streamsorter/ ./ | ||
| RUN poetry install --no-root --no-interaction | ||
|
|
||
| #################################################################################################### | ||
| # Stage 3: UDF Runtime - clean container with only needed stuff | ||
| #################################################################################################### | ||
| FROM python:3.10-slim-bullseye AS udf | ||
|
|
||
| ENV PYSETUP_PATH="/opt/pysetup" | ||
| ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/accumulator/streamsorter" | ||
| ENV VENV_PATH="$EXAMPLE_PATH/.venv" | ||
| ENV PATH="$VENV_PATH/bin:$PATH" | ||
|
|
||
| RUN apt-get update && apt-get install --no-install-recommends -y wget \ | ||
| && apt-get clean && rm -rf /var/lib/apt/lists/* \ | ||
| && wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \ | ||
| && chmod +x /dumb-init | ||
|
|
||
| WORKDIR $PYSETUP_PATH | ||
| COPY --from=udf-builder $VENV_PATH $VENV_PATH | ||
| COPY --from=udf-builder $EXAMPLE_PATH $EXAMPLE_PATH | ||
|
|
||
| WORKDIR $EXAMPLE_PATH | ||
| RUN chmod +x entry.sh | ||
|
|
||
| ENTRYPOINT ["/dumb-init", "--"] | ||
| CMD ["sh", "-c", "$EXAMPLE_PATH/entry.sh"] | ||
|
|
||
| EXPOSE 5000 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| TAG ?= test4 | ||
| PUSH ?= false | ||
| IMAGE_REGISTRY = docker.intuit.com/personal/srao12/py-accumulator-sorter:${TAG} | ||
| DOCKER_FILE_PATH = examples/accumulator/streamsorter/Dockerfile | ||
|
|
||
| .PHONY: update | ||
| update: | ||
| poetry update -vv | ||
|
|
||
| .PHONY: image-push | ||
| image-push: update | ||
| cd ../../../ && docker buildx build \ | ||
| -f ${DOCKER_FILE_PATH} \ | ||
| -t ${IMAGE_REGISTRY} \ | ||
| --platform linux/amd64,linux/arm64 . --push | ||
|
|
||
| .PHONY: image | ||
| image: update | ||
| cd ../../../ && docker build \ | ||
| -f ${DOCKER_FILE_PATH} \ | ||
| -t ${IMAGE_REGISTRY} . | ||
| @if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| TAG ?= stable | ||
| PUSH ?= false | ||
| IMAGE_REGISTRY = quay.io/numaio/numaflow-python/streamsorter:${TAG} | ||
| DOCKER_FILE_PATH = examples/accumulator/streamsorter/Dockerfile | ||
| BASE_IMAGE_NAME = numaflow-python-base | ||
|
|
||
| .PHONY: base-image | ||
| base-image: | ||
| @echo "Building shared base image..." | ||
| docker build -f Dockerfile.base -t ${BASE_IMAGE_NAME} . | ||
|
|
||
| .PHONY: update | ||
| update: | ||
| poetry update -vv | ||
|
|
||
| .PHONY: image-push | ||
| image-push: base-image update | ||
| cd ../../../ && docker buildx build \ | ||
| -f ${DOCKER_FILE_PATH} \ | ||
| -t ${IMAGE_REGISTRY} \ | ||
| --platform linux/amd64,linux/arm64 . --push | ||
|
|
||
| .PHONY: image | ||
| image: base-image update | ||
| cd ../../../ && docker build \ | ||
| -f ${DOCKER_FILE_PATH} \ | ||
| -t ${IMAGE_REGISTRY} . | ||
| @if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi | ||
|
|
||
| .PHONY: image-fast | ||
| image-fast: update | ||
| @echo "Building with shared base image (fastest option)..." | ||
| cd ../../../ && docker build \ | ||
| -f examples/map/even_odd/Dockerfile.shared-base \ | ||
| -t ${IMAGE_REGISTRY} . | ||
| @if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi | ||
|
|
||
| .PHONY: clean | ||
| clean: | ||
| docker rmi ${BASE_IMAGE_NAME} 2>/dev/null || true | ||
| docker rmi ${IMAGE_REGISTRY} 2>/dev/null || true | ||
|
|
||
| .PHONY: help | ||
| help: | ||
| @echo "Available targets:" | ||
| @echo " base-image - Build the shared base image with pynumaflow" | ||
| @echo " image - Build UDF image with optimized multi-stage build" | ||
| @echo " image-fast - Build UDF image using shared base (fastest)" | ||
| @echo " image-push - Build and push multi-platform image" | ||
| @echo " update - Update poetry dependencies" | ||
| @echo " clean - Remove built images" | ||
| @echo " help - Show this help message" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| # Stream Sorter | ||
|
|
||
| An example User Defined Function that sorts the incoming stream by event time. | ||
|
|
||
| ### Applying the Pipeline | ||
|
|
||
| To apply the pipeline, use the following command: | ||
|
|
||
| ```shell | ||
| kubectl apply -f pipeline.yaml | ||
| ``` | ||
|
|
||
| ### Publish messages | ||
|
|
||
| Port-forward each HTTP source pod and send POST requests with curl. Replace `xxxx` with the suffix of the actual pod name (for example, as listed by `kubectl get pods`). | ||
|
|
||
| ```shell | ||
| kubectl port-forward stream-sorter-http-one-0-xxxx 8444:8443 | ||
|
|
||
| # Post data to the HTTP endpoint | ||
| curl -kq -X POST -d "101" https://localhost:8444/vertices/http-one -H "X-Numaflow-Event-Time: 60000" | ||
| curl -kq -X POST -d "102" https://localhost:8444/vertices/http-one -H "X-Numaflow-Event-Time: 61000" | ||
| curl -kq -X POST -d "103" https://localhost:8444/vertices/http-one -H "X-Numaflow-Event-Time: 62000" | ||
| curl -kq -X POST -d "104" https://localhost:8444/vertices/http-one -H "X-Numaflow-Event-Time: 63000" | ||
| ``` | ||
|
|
||
| ```shell | ||
| kubectl port-forward stream-sorter-http-two-0-xxxx 8445:8443 | ||
|
|
||
| # Post data to the HTTP endpoint | ||
| curl -kq -X POST -d "105" https://localhost:8445/vertices/http-two -H "X-Numaflow-Event-Time: 70000" | ||
| curl -kq -X POST -d "106" https://localhost:8445/vertices/http-two -H "X-Numaflow-Event-Time: 71000" | ||
| curl -kq -X POST -d "107" https://localhost:8445/vertices/http-two -H "X-Numaflow-Event-Time: 72000" | ||
| curl -kq -X POST -d "108" https://localhost:8445/vertices/http-two -H "X-Numaflow-Event-Time: 73000" | ||
| ``` | ||
|
|
||
| ### Verify the output | ||
|
|
||
| ```shell | ||
| kubectl logs -f stream-sorter-py-sink-0-xxxx | ||
| ``` | ||
|
|
||
| The output should be sorted by event time. |
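To make the expected result concrete, here is a minimal sketch that reuses the payloads and `X-Numaflow-Event-Time` values from the curl commands in the README. It assumes the header value is an epoch timestamp in milliseconds, and it assumes the watermark eventually passes every event so that all of them are emitted. The names `POSTS`, `to_event_time`, and `expected_order` are illustrative and not part of the example code.

```python
from datetime import datetime, timezone

# (payload, event time in epoch milliseconds), copied from the curl commands.
POSTS = [
    ("101", 60000), ("102", 61000), ("103", 62000), ("104", 63000),
    ("105", 70000), ("106", 71000), ("107", 72000), ("108", 73000),
]


def to_event_time(epoch_ms):
    """Convert an epoch-milliseconds header value to an aware UTC datetime."""
    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)


def expected_order(posts):
    """Payloads in the order a by-event-time sorter should emit them."""
    return [payload for payload, _ in sorted(posts, key=lambda p: p[1])]


# Simulate the second source being posted to before the first one.
interleaved = POSTS[4:] + POSTS[:4]

print(to_event_time(60000).isoformat())
print(expected_order(interleaved))
```

Whatever order the two sources are posted in, the sink log should show the payloads in ascending event time once the watermark has advanced far enough.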
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| #!/bin/sh | ||
| set -eux | ||
|
|
||
| python example.py |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| import logging | ||
| import os | ||
| from collections.abc import AsyncIterable | ||
| from datetime import datetime | ||
| | ||
| from pynumaflow import setup_logging | ||
| from pynumaflow.accumulator import Accumulator, AccumulatorAsyncServer | ||
| from pynumaflow.accumulator import ( | ||
|     Message, | ||
|     Datum, | ||
| ) | ||
| from pynumaflow.shared.asynciter import NonBlockingIterator | ||
| | ||
| _LOGGER = setup_logging(__name__) | ||
| if os.getenv("PYTHONDEBUG"): | ||
|     _LOGGER.setLevel(logging.DEBUG) | ||
| | ||
| | ||
| class StreamSorter(Accumulator): | ||
|     def __init__(self): | ||
|         _LOGGER.info("StreamSorter initialized") | ||
|         self.latest_wm = datetime.fromtimestamp(-1) | ||
|         self.sorted_buffer: list[Datum] = [] | ||
| | ||
|     async def handler( | ||
|         self, | ||
|         datums: AsyncIterable[Datum], | ||
|         output: NonBlockingIterator, | ||
|     ): | ||
|         _LOGGER.info("StreamSorter handler started") | ||
|         async for datum in datums: | ||
|             _LOGGER.info( | ||
|                 f"Received datum with event time: {datum.event_time}, " | ||
|                 f"Current latest watermark: {self.latest_wm}, " | ||
|                 f"Datum watermark: {datum.watermark}" | ||
|             ) | ||
| | ||
|             # If watermark has moved forward | ||
|             if datum.watermark and datum.watermark > self.latest_wm: | ||
|                 self.latest_wm = datum.watermark | ||
|                 await self.flush_buffer(output) | ||
| | ||
|             self.insert_sorted(datum) | ||
| | ||
|     def insert_sorted(self, datum: Datum): | ||
|         # Binary insert to keep sorted buffer in order | ||
|         left, right = 0, len(self.sorted_buffer) | ||
|         while left < right: | ||
|             mid = (left + right) // 2 | ||
|             if self.sorted_buffer[mid].event_time > datum.event_time: | ||
|                 right = mid | ||
|             else: | ||
|                 left = mid + 1 | ||
|         self.sorted_buffer.insert(left, datum) | ||
| | ||
|     async def flush_buffer(self, output: NonBlockingIterator): | ||
|         _LOGGER.info(f"Watermark updated, flushing sortedBuffer: {self.latest_wm}") | ||
|         i = 0 | ||
|         for datum in self.sorted_buffer: | ||
|             if datum.event_time > self.latest_wm: | ||
|                 break | ||
|             await output.put(Message.from_datum(datum)) | ||
|             _LOGGER.info(f"Sent datum with event time: {datum.event_time}") | ||
|             i += 1 | ||
|         # Remove flushed items | ||
|         self.sorted_buffer = self.sorted_buffer[i:] | ||
| | ||
| | ||
| if __name__ == "__main__": | ||
|     grpc_server = None | ||
|     grpc_server = AccumulatorAsyncServer(StreamSorter) | ||
|     grpc_server.start() |
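The buffering logic in `insert_sorted` and `flush_buffer` can be tried out without a running pipeline. The following is a minimal sketch, not the code from this PR: `Event` is a hypothetical stand-in for `Datum`, `SortedBuffer` is an illustrative name, and the standard-library `bisect` module replaces the hand-written binary search. `bisect.insort_right` places an item after any existing items with the same event time, which matches the comparison used in `insert_sorted`.

```python
import bisect
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass(order=True)
class Event:
    # Hypothetical stand-in for Datum; only event_time takes part in ordering.
    event_time: datetime
    value: str = field(compare=False)


class SortedBuffer:
    """Keeps events ordered by event time and releases them up to a watermark."""

    def __init__(self):
        self.items = []

    def insert(self, event):
        # Equal event times keep their arrival order (insert to the right).
        bisect.insort_right(self.items, event)

    def flush(self, watermark):
        # Everything with event_time <= watermark is ready to be emitted.
        cut = bisect.bisect_right(self.items, Event(watermark, ""))
        ready, self.items = self.items[:cut], self.items[cut:]
        return ready


def ts(epoch_ms):
    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)


buf = SortedBuffer()
# Events arrive out of order.
for ms, value in [(62000, "103"), (60000, "101"), (61000, "102"), (70000, "105")]:
    buf.insert(Event(ts(ms), value))

flushed = [e.value for e in buf.flush(ts(62000))]
remaining = [e.value for e in buf.items]
print(flushed, remaining)
```

Because the buffer is always sorted, the flush only needs to find one cut point; events later than the watermark stay buffered until a later watermark releases them.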
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| apiVersion: numaflow.numaproj.io/v1alpha1 | ||
| kind: Pipeline | ||
| metadata: | ||
|   name: stream-sorter | ||
| spec: | ||
|   limits: | ||
|     readBatchSize: 1 | ||
|   vertices: | ||
|     - name: http-one | ||
|       scale: | ||
|         min: 1 | ||
|         max: 1 | ||
|       source: | ||
|         http: {} | ||
|     - name: http-two | ||
|       scale: | ||
|         min: 1 | ||
|         max: 1 | ||
|       source: | ||
|         http: {} | ||
|     - name: py-accum | ||
|       udf: | ||
|         container: | ||
|           image: quay.io/numaio/numaflow-python/streamsorter:stable | ||
|           imagePullPolicy: Always | ||
|           env: | ||
|             - name: PYTHONDEBUG | ||
|               value: "true" | ||
|         groupBy: | ||
|           window: | ||
|             accumulator: | ||
|               timeout: 10s | ||
|           keyed: true | ||
|           storage: | ||
|             persistentVolumeClaim: | ||
|               volumeSize: 1Gi | ||
|     - name: py-sink | ||
|       scale: | ||
|         min: 1 | ||
|         max: 1 | ||
|       sink: | ||
|         log: {} | ||
|   edges: | ||
|     - from: http-one | ||
|       to: py-accum | ||
|     - from: http-two | ||
|       to: py-accum | ||
|     - from: py-accum | ||
|       to: py-sink |
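The topology declared above can be checked mechanically. This is a rough sketch that copies the vertex and edge names from the pipeline spec into plain Python data; `dangling_edges` and `upstream_of` are hypothetical helpers written for illustration, not part of Numaflow or pynumaflow.

```python
# Vertex and edge names copied from the pipeline spec.
VERTICES = ["http-one", "http-two", "py-accum", "py-sink"]
EDGES = [
    ("http-one", "py-accum"),
    ("http-two", "py-accum"),
    ("py-accum", "py-sink"),
]


def dangling_edges(vertices, edges):
    """Return edges whose source or destination is not a declared vertex."""
    known = set(vertices)
    return [(src, dst) for src, dst in edges if src not in known or dst not in known]


def upstream_of(vertex, edges):
    """Return the sorted names of vertices that feed directly into `vertex`."""
    return sorted(src for src, dst in edges if dst == vertex)


print(dangling_edges(VERTICES, EDGES))
print(upstream_of("py-accum", EDGES))
```

The second call shows that `py-accum` receives data from both HTTP sources, which is the situation the stream sorter is meant to handle: two independently timed streams merged into one output ordered by event time.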
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| [tool.poetry] | ||
| name = "stream-sorter" | ||
| version = "0.2.4" | ||
| description = "" | ||
| authors = ["Numaflow developers"] | ||
|
|
||
| [tool.poetry.dependencies] | ||
| python = "~3.10" | ||
| pynumaflow = { path = "../../../"} | ||
|
|
||
| [build-system] | ||
| requires = ["poetry-core>=1.0.0"] | ||
| build-backend = "poetry.core.masonry.api" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| from pynumaflow.accumulator._dtypes import ( | ||
| Message, | ||
| Datum, | ||
| IntervalWindow, | ||
| Metadata, | ||
| DROP, | ||
| KeyedWindow, | ||
| Accumulator, | ||
| ) | ||
| from pynumaflow.accumulator.async_server import AccumulatorAsyncServer | ||
|
|
||
| __all__ = [ | ||
| "Message", | ||
| "Datum", | ||
| "IntervalWindow", | ||
| "Metadata", | ||
| "DROP", | ||
| "AccumulatorAsyncServer", | ||
| "KeyedWindow", | ||
| "Accumulator", | ||
| ] |