Skip to content

Commit b46408a

Browse files
authored
Merge branch 'main' into pr
2 parents 7580909 + 3ed02e6 commit b46408a

17 files changed

Lines changed: 1035 additions & 2 deletions

File tree

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
####################################################################################################
2+
# builder: install needed dependencies
3+
####################################################################################################
4+
5+
FROM python:3.10-slim-bullseye AS builder
6+
7+
ENV PYTHONFAULTHANDLER=1 \
8+
PYTHONUNBUFFERED=1 \
9+
PYTHONHASHSEED=random \
10+
PIP_NO_CACHE_DIR=on \
11+
PIP_DISABLE_PIP_VERSION_CHECK=on \
12+
PIP_DEFAULT_TIMEOUT=100 \
13+
POETRY_VERSION=1.2.2 \
14+
POETRY_HOME="/opt/poetry" \
15+
POETRY_VIRTUALENVS_IN_PROJECT=true \
16+
POETRY_NO_INTERACTION=1 \
17+
PYSETUP_PATH="/opt/pysetup"
18+
19+
ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/sourcetransform/async_event_time_filter"
20+
ENV VENV_PATH="$EXAMPLE_PATH/.venv"
21+
ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"
22+
23+
RUN apt-get update \
    && apt-get install --no-install-recommends -y \
        curl \
        wget \
        # deps for building python deps
        build-essential \
    && apt-get install -y git \
    && apt-get clean && rm -rf /var/lib/apt/lists/* \
    \
    # install dumb-init: choose the release binary that matches the architecture
    # being built (uname -m yields x86_64 or aarch64), so that multi-platform
    # builds do not end up with an x86_64-only entrypoint on arm64 images
    && wget -O /dumb-init "https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_$(uname -m)" \
    && chmod +x /dumb-init \
    # install poetry (version and location come from POETRY_VERSION / POETRY_HOME)
    && curl -sSL https://install.python-poetry.org | python3 -
36+
37+
####################################################################################################
38+
# udf: used for running the udf vertices
39+
####################################################################################################
40+
FROM builder AS udf
41+
42+
WORKDIR $PYSETUP_PATH
43+
COPY ./ ./
44+
45+
WORKDIR $EXAMPLE_PATH
46+
RUN poetry lock
47+
RUN poetry install --no-cache --no-root && \
48+
rm -rf ~/.cache/pypoetry/
49+
50+
RUN chmod +x entry.sh
51+
52+
ENTRYPOINT ["/dumb-init", "--"]
53+
CMD ["sh", "-c", "$EXAMPLE_PATH/entry.sh"]
54+
55+
EXPOSE 5000
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
TAG ?= stable
2+
PUSH ?= false
3+
IMAGE_REGISTRY = quay.io/numaio/numaflow-python/async-mapt-event-time-filter:${TAG}
4+
DOCKER_FILE_PATH = examples/sourcetransform/async_event_time_filter/Dockerfile
5+
6+
.PHONY: update
7+
update:
8+
poetry update -vv
9+
10+
.PHONY: image-push
11+
image-push: update
12+
cd ../../../ && docker buildx build \
13+
-f ${DOCKER_FILE_PATH} \
14+
-t ${IMAGE_REGISTRY} \
15+
--platform linux/amd64,linux/arm64 . --push
16+
17+
# Build the example image locally and, when PUSH=true, push it.
# IMAGE_REGISTRY already ends with ":${TAG}", so it is pushed as-is; appending
# the tag a second time would produce an invalid "name:tag:tag" reference.
.PHONY: image
image: update
	cd ../../../ && docker build \
	-f ${DOCKER_FILE_PATH} \
	-t ${IMAGE_REGISTRY} .
	@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/sh
2+
set -eux
3+
4+
python example.py
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import datetime
2+
import logging
3+
4+
from pynumaflow.sourcetransformer import Messages, Message, Datum
5+
from pynumaflow.sourcetransformer import SourceTransformAsyncServer
6+
7+
"""
8+
This is a simple User Defined Function example which receives a message, applies the following
9+
data transformation, and returns the message.
10+
If the message event time is before year 2022, drop the message with event time unchanged.
11+
If it's within year 2022, update the tag to "within_year_2022" and
12+
update the message event time to Jan 1st 2022.
13+
Otherwise, (exclusively after year 2022), update the tag to "after_year_2022" and update the
14+
message event time to Jan 1st 2023.
15+
"""
16+
17+
# Year boundaries used by the handler, as naive datetimes expressing midnight UTC
# (epoch seconds 1640995200 and 1672531200 respectively).
# They are built from calendar fields instead of datetime.fromtimestamp(), which
# converts to the host's local timezone and would shift these boundaries on any
# machine whose timezone is not UTC.
# NOTE(review): assumes datum.event_time is a naive UTC datetime — confirm.
january_first_2022 = datetime.datetime(2022, 1, 1)
january_first_2023 = datetime.datetime(2023, 1, 1)
19+
20+
21+
async def my_handler(keys: list[str], datum: Datum) -> Messages:
    """
    Route a single datum according to the year of its event time.

    - Event time before ``january_first_2022``: the message is dropped and its
      event time is passed to ``Message.to_drop`` unchanged.
    - Event time before ``january_first_2023``: the value is forwarded with the
      event time set to ``january_first_2022`` and the tag "within_year_2022".
    - Anything later: the value is forwarded with the event time set to
      ``january_first_2023`` and the tag "after_year_2022".

    Args:
        keys: keys of the incoming message (not used by this example).
        datum: the incoming datum; only ``value`` and ``event_time`` are read.

    Returns:
        A ``Messages`` collection holding exactly one ``Message``.
    """
    payload = datum.value
    observed_time = datum.event_time
    result = Messages()

    # Guard clause: anything older than 2022 is dropped straight away.
    if observed_time < january_first_2022:
        logging.info("Got event time:%s, it is before 2022, so dropping", observed_time)
        result.append(Message.to_drop(observed_time))
        return result

    # Remaining cases differ only in the log line, the new event time and the tag.
    if observed_time < january_first_2023:
        logging.info(
            "Got event time:%s, it is within year 2022, so forwarding to within_year_2022",
            observed_time,
        )
        new_event_time, tag = january_first_2022, "within_year_2022"
    else:
        logging.info(
            "Got event time:%s, it is after year 2022, so forwarding to after_year_2022",
            observed_time,
        )
        new_event_time, tag = january_first_2023, "after_year_2022"

    result.append(Message(value=payload, event_time=new_event_time, tags=[tag]))
    return result
44+
45+
46+
if __name__ == "__main__":
    # Script entry point: wrap the handler in the async source-transform server
    # and start serving.
    SourceTransformAsyncServer(my_handler).start()
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[tool.poetry]
2+
name = "async-mapt-event-time-filter"
3+
version = "0.2.4"
4+
description = ""
5+
authors = ["Numaflow developers"]
6+
readme = "README.md"
7+
packages = [{include = "mapt_event_time_filter"}]
8+
9+
[tool.poetry.dependencies]
10+
python = ">=3.9, <3.12"
11+
pynumaflow = { path = "../../../"}
12+
13+
[build-system]
14+
requires = ["poetry-core"]
15+
build-backend = "poetry.core.masonry.api"

pynumaflow/_constants.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77
SIDE_INPUT_DIR_PATH = "/var/numaflow/side-inputs"
88
ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE"
99

10-
# Get container type from env var, default to unknown-container
10+
# Error Constants
11+
RUNTIME_APPLICATION_ERRORS_PATH = "/var/numaflow/runtime/application-errors"
12+
CURRENT_CRITICAL_ERROR_FILE = "current-udf.json"
13+
INTERNAL_ERROR_CODE = "Internal error"
1114
CONTAINER_TYPE = os.getenv(ENV_UD_CONTAINER_TYPE, "unknown-container")
12-
# UDF exception error string with container type
1315
ERR_UDF_EXCEPTION_STRING = f"UDF_EXECUTION_ERROR({CONTAINER_TYPE})"
1416

1517
# Socket configs

pynumaflow/errors/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from pynumaflow.errors.errors import persist_critical_error
2+
3+
__all__ = ["persist_critical_error"]

pynumaflow/errors/_dtypes.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from dataclasses import dataclass, asdict
2+
3+
4+
@dataclass
class _RuntimeErrorEntry:
    """Record describing one runtime error, in the shape that gets persisted."""

    # Name/type of the container the error is attributed to.
    container: str
    # Integer timestamp of when the error was recorded.
    timestamp: int
    # Short error code.
    code: str
    # Error message.
    message: str
    # Additional detail text for the error.
    details: str

    def to_dict(self) -> dict:
        """Return the entry's fields as a plain dictionary keyed by field name."""
        as_mapping = asdict(self)
        return as_mapping

pynumaflow/errors/errors.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import os
2+
import json
3+
import threading
4+
import time
5+
from pynumaflow._constants import (
6+
CONTAINER_TYPE,
7+
RUNTIME_APPLICATION_ERRORS_PATH,
8+
CURRENT_CRITICAL_ERROR_FILE,
9+
INTERNAL_ERROR_CODE,
10+
)
11+
from pynumaflow.errors._dtypes import _RuntimeErrorEntry
12+
from typing import Union
13+
14+
15+
class _PersistErrorOnce:
    """Run-once guard: lets a callable through on the first request only."""

    def __init__(self):
        self.done = False
        self.lock = threading.Lock()

    def execute(self, func, *args, **kwargs):
        """
        Call ``func(*args, **kwargs)`` if this guard has not been used yet.

        The guard is marked as used before ``func`` runs, so a failing ``func``
        is not retried by a later call.

        Returns:
            Whatever ``func`` returns.

        Raises:
            RuntimeError: if ``execute`` has already been called on this guard.
        """
        # Read and set the flag in one critical section; func itself runs
        # outside the lock.
        with self.lock:
            already_used, self.done = self.done, True
        if already_used:
            raise RuntimeError("Persist critical error function has already been executed.")
        return func(*args, **kwargs)


# Module-level guard shared by every call to persist_critical_error.
_persist_error_once = _PersistErrorOnce()
31+
32+
33+
def persist_critical_error(
    error_code: str, error_message: str, error_details: str
) -> Union[RuntimeError, None]:
    """
    Persist a critical error to a file; only the first call does any work.

    The write is delegated to ``_persist_critical_error_to_file`` through the
    module-level ``_persist_error_once`` guard and targets
    ``RUNTIME_APPLICATION_ERRORS_PATH``.

    Args:
        error_code: short code identifying the error.
        error_message: error message.
        error_details: additional detail text for the error.

    Returns:
        ``None`` when the persist attempt ran, including the case where the
        write failed and the failure was printed. On any later call the
        ``RuntimeError`` produced by the once-guard is *returned*, not raised,
        so callers must inspect the return value rather than use ``try/except``.
        Note that a ``RuntimeError`` (or subclass) raised by the write itself is
        returned in the same way.
    """
    try:
        _persist_error_once.execute(
            _persist_critical_error_to_file,
            error_code,
            error_message,
            error_details,
            RUNTIME_APPLICATION_ERRORS_PATH,
        )
    except RuntimeError as e:
        # The guard signals a repeated call by raising; hand that back as a value.
        return e
    except Exception as e:
        # Best effort: failing to persist must not raise into the caller.
        print(f"Error in persisting critical error: {e}")
    return None
54+
55+
56+
def _persist_critical_error_to_file(
    error_code: str, error_message: str, error_details: str, dir_path: str
):
    """
    Write one error entry as JSON under ``dir_path/CONTAINER_TYPE``.

    The entry is first written to ``CURRENT_CRITICAL_ERROR_FILE`` and then
    renamed to ``<timestamp>-udf.json`` in the same directory. A falsy
    ``error_code`` is replaced by ``INTERNAL_ERROR_CODE``.

    Args:
        error_code: short code identifying the error.
        error_message: error message.
        error_details: additional detail text for the error.
        dir_path: base directory under which the container directory is created.
    """
    container_dir = os.path.join(dir_path, CONTAINER_TYPE)
    # Create the base directory first, then the per-container directory.
    for directory in (dir_path, container_dir):
        os.makedirs(directory, mode=0o777, exist_ok=True)

    staging_path = os.path.join(container_dir, CURRENT_CRITICAL_ERROR_FILE)
    recorded_at = int(time.time())

    entry = _RuntimeErrorEntry(
        container=CONTAINER_TYPE,
        timestamp=recorded_at,
        code=error_code or INTERNAL_ERROR_CODE,
        message=error_message,
        details=error_details,
    )

    with open(staging_path, "w") as f:
        json.dump(entry.to_dict(), f)

    # Move the finished file to its timestamped final name.
    os.rename(staging_path, os.path.join(container_dir, f"{recorded_at}-udf.json"))

pynumaflow/sourcetransformer/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
)
88
from pynumaflow.sourcetransformer.multiproc_server import SourceTransformMultiProcServer
99
from pynumaflow.sourcetransformer.server import SourceTransformServer
10+
from pynumaflow.sourcetransformer.async_server import SourceTransformAsyncServer
1011

1112
__all__ = [
1213
"Message",
@@ -16,4 +17,5 @@
1617
"SourceTransformServer",
1718
"SourceTransformer",
1819
"SourceTransformMultiProcServer",
20+
"SourceTransformAsyncServer",
1921
]

0 commit comments

Comments
 (0)