From 266d9d3e06c367b10285dd77887cef9c4ae21fe0 Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Mon, 24 Mar 2025 14:02:45 -0700 Subject: [PATCH 1/4] add proto Signed-off-by: Sidhant Kohli --- Makefile | 3 +- pynumaflow/proto/accumulator/__init__.py | 0 .../proto/accumulator/accumulator.proto | 81 +++++++++++ .../proto/accumulator/accumulator_pb2.py | 54 +++++++ .../proto/accumulator/accumulator_pb2.pyi | 134 ++++++++++++++++++ .../proto/accumulator/accumulator_pb2_grpc.py | 134 ++++++++++++++++++ 6 files changed, 404 insertions(+), 2 deletions(-) create mode 100644 pynumaflow/proto/accumulator/__init__.py create mode 100644 pynumaflow/proto/accumulator/accumulator.proto create mode 100644 pynumaflow/proto/accumulator/accumulator_pb2.py create mode 100644 pynumaflow/proto/accumulator/accumulator_pb2.pyi create mode 100644 pynumaflow/proto/accumulator/accumulator_pb2_grpc.py diff --git a/Makefile b/Makefile index b403ac66..82696bbb 100644 --- a/Makefile +++ b/Makefile @@ -28,12 +28,11 @@ setup: proto: python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/sinker -I=pynumaflow/proto/sinker --python_out=pynumaflow/proto/sinker --grpc_python_out=pynumaflow/proto/sinker pynumaflow/proto/sinker/*.proto python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/mapper -I=pynumaflow/proto/mapper --python_out=pynumaflow/proto/mapper --grpc_python_out=pynumaflow/proto/mapper pynumaflow/proto/mapper/*.proto - python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/mapstreamer -I=pynumaflow/proto/mapstreamer --python_out=pynumaflow/proto/mapstreamer --grpc_python_out=pynumaflow/proto/mapstreamer pynumaflow/proto/mapstreamer/*.proto python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/reducer -I=pynumaflow/proto/reducer --python_out=pynumaflow/proto/reducer --grpc_python_out=pynumaflow/proto/reducer pynumaflow/proto/reducer/*.proto python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/sourcetransformer -I=pynumaflow/proto/sourcetransformer 
--python_out=pynumaflow/proto/sourcetransformer --grpc_python_out=pynumaflow/proto/sourcetransformer pynumaflow/proto/sourcetransformer/*.proto python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/sideinput -I=pynumaflow/proto/sideinput --python_out=pynumaflow/proto/sideinput --grpc_python_out=pynumaflow/proto/sideinput pynumaflow/proto/sideinput/*.proto python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/sourcer -I=pynumaflow/proto/sourcer --python_out=pynumaflow/proto/sourcer --grpc_python_out=pynumaflow/proto/sourcer pynumaflow/proto/sourcer/*.proto - python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/batchmapper -I=pynumaflow/proto/batchmapper --python_out=pynumaflow/proto/batchmapper --grpc_python_out=pynumaflow/proto/batchmapper pynumaflow/proto/batchmapper/*.proto + python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/accumulator -I=pynumaflow/proto/accumulator --python_out=pynumaflow/proto/accumulator --grpc_python_out=pynumaflow/proto/accumulator pynumaflow/proto/accumulator/*.proto sed -i '' 's/^\(import.*_pb2\)/from . \1/' pynumaflow/proto/*/*.py diff --git a/pynumaflow/proto/accumulator/__init__.py b/pynumaflow/proto/accumulator/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pynumaflow/proto/accumulator/accumulator.proto b/pynumaflow/proto/accumulator/accumulator.proto new file mode 100644 index 00000000..f25691c1 --- /dev/null +++ b/pynumaflow/proto/accumulator/accumulator.proto @@ -0,0 +1,81 @@ +syntax = "proto3"; + +option go_package = "github.com/numaproj/numaflow-go/pkg/apis/proto/accumulator/v1"; + +import "google/protobuf/empty.proto"; +import "google/protobuf/timestamp.proto"; + + +package accumulator.v1; + +// AccumulatorWindow describes a special kind of SessionWindow (similar to Global Window) where output should +// always have monotonically increasing WM but it can be manipulated through event-time by reordering the messages. 
+// NOTE: Quite powerful, should not be abused; it can cause stalling of pipelines and leaks +service Accumulator { + // AccumulateFn applies a accumulate function to a request stream. + rpc AccumulateFn(stream AccumulatorRequest) returns (stream AccumulatorResponse); + + // IsReady is the heartbeat endpoint for gRPC. + rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); +} + +// Payload represents a payload element. +message Payload { + repeated string keys = 1; + bytes value = 2; + google.protobuf.Timestamp event_time = 3; + google.protobuf.Timestamp watermark = 4; + string id = 5; + map<string, string> headers = 6; +} + +// AccumulatorRequest represents a request element. +message AccumulatorRequest { + // WindowOperation represents a window operation. + // For Unaligned windows, OPEN, APPEND and CLOSE events are sent. + message WindowOperation { + enum Event { + OPEN = 0; + CLOSE = 1; + APPEND = 2; + } + Event event = 1; + KeyedWindow keyedWindow = 2; + } + + Payload payload = 1; + WindowOperation operation = 2; + optional Handshake handshake = 3; +} + + +// KeyedWindow represents a window together with its slot and keys. +message KeyedWindow { + google.protobuf.Timestamp start = 1; + google.protobuf.Timestamp end = 2; + string slot = 3; + repeated string keys = 4; +} + +// AccumulatorResponse represents a response element. +message AccumulatorResponse { + Payload payload = 1; + // window represents a window to which the result belongs. + KeyedWindow window = 2; + repeated string tags = 3; + optional Handshake handshake = 4; + // EOF represents the end of the response for a window. + bool EOF = 5; +} + + +// ReadyResponse is the health check result. +message ReadyResponse { + bool ready = 1; +} + +// Handshake message between client and server to indicate the start of transmission. +message Handshake { + // Required field indicating the start of transmission. 
+ bool sot = 1; +} diff --git a/pynumaflow/proto/accumulator/accumulator_pb2.py b/pynumaflow/proto/accumulator/accumulator_pb2.py new file mode 100644 index 00000000..422aacc1 --- /dev/null +++ b/pynumaflow/proto/accumulator/accumulator_pb2.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: accumulator.proto +# Protobuf Python Version: 4.25.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 +from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n\x11\x61\x63\x63umulator.proto\x12\x0e\x61\x63\x63umulator.v1\x1a\x1bgoogle/protobuf/empty.proto\x1a\x1fgoogle/protobuf/timestamp.proto"\xf8\x01\n\x07Payload\x12\x0c\n\x04keys\x18\x01 \x03(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\x12.\n\nevent_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12-\n\twatermark\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\n\n\x02id\x18\x05 \x01(\t\x12\x35\n\x07headers\x18\x06 \x03(\x0b\x32$.accumulator.v1.Payload.HeadersEntry\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01"\xff\x02\n\x12\x41\x63\x63umulatorRequest\x12(\n\x07payload\x18\x01 \x01(\x0b\x32\x17.accumulator.v1.Payload\x12\x45\n\toperation\x18\x02 \x01(\x0b\x32\x32.accumulator.v1.AccumulatorRequest.WindowOperation\x12\x31\n\thandshake\x18\x03 \x01(\x0b\x32\x19.accumulator.v1.HandshakeH\x00\x88\x01\x01\x1a\xb6\x01\n\x0fWindowOperation\x12G\n\x05\x65vent\x18\x01 
\x01(\x0e\x32\x38.accumulator.v1.AccumulatorRequest.WindowOperation.Event\x12\x30\n\x0bkeyedWindow\x18\x02 \x01(\x0b\x32\x1b.accumulator.v1.KeyedWindow"(\n\x05\x45vent\x12\x08\n\x04OPEN\x10\x00\x12\t\n\x05\x43LOSE\x10\x01\x12\n\n\x06\x41PPEND\x10\x02\x42\x0c\n\n_handshake"}\n\x0bKeyedWindow\x12)\n\x05start\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\'\n\x03\x65nd\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0c\n\x04slot\x18\x03 \x01(\t\x12\x0c\n\x04keys\x18\x04 \x03(\t"\xc8\x01\n\x13\x41\x63\x63umulatorResponse\x12(\n\x07payload\x18\x01 \x01(\x0b\x32\x17.accumulator.v1.Payload\x12+\n\x06window\x18\x02 \x01(\x0b\x32\x1b.accumulator.v1.KeyedWindow\x12\x0c\n\x04tags\x18\x03 \x03(\t\x12\x31\n\thandshake\x18\x04 \x01(\x0b\x32\x19.accumulator.v1.HandshakeH\x00\x88\x01\x01\x12\x0b\n\x03\x45OF\x18\x05 \x01(\x08\x42\x0c\n\n_handshake"\x1e\n\rReadyResponse\x12\r\n\x05ready\x18\x01 \x01(\x08"\x18\n\tHandshake\x12\x0b\n\x03sot\x18\x01 \x01(\x08\x32\xac\x01\n\x0b\x41\x63\x63umulator\x12[\n\x0c\x41\x63\x63umulateFn\x12".accumulator.v1.AccumulatorRequest\x1a#.accumulator.v1.AccumulatorResponse(\x01\x30\x01\x12@\n\x07IsReady\x12\x16.google.protobuf.Empty\x1a\x1d.accumulator.v1.ReadyResponseB?Z=github.com/numaproj/numaflow-go/pkg/apis/proto/accumulator/v1b\x06proto3' +) + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "accumulator_pb2", _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + _globals["DESCRIPTOR"]._options = None + _globals[ + "DESCRIPTOR" + ]._serialized_options = b"Z=github.com/numaproj/numaflow-go/pkg/apis/proto/accumulator/v1" + _globals["_PAYLOAD_HEADERSENTRY"]._options = None + _globals["_PAYLOAD_HEADERSENTRY"]._serialized_options = b"8\001" + _globals["_PAYLOAD"]._serialized_start = 100 + _globals["_PAYLOAD"]._serialized_end = 348 + _globals["_PAYLOAD_HEADERSENTRY"]._serialized_start = 302 + _globals["_PAYLOAD_HEADERSENTRY"]._serialized_end = 
348 + _globals["_ACCUMULATORREQUEST"]._serialized_start = 351 + _globals["_ACCUMULATORREQUEST"]._serialized_end = 734 + _globals["_ACCUMULATORREQUEST_WINDOWOPERATION"]._serialized_start = 538 + _globals["_ACCUMULATORREQUEST_WINDOWOPERATION"]._serialized_end = 720 + _globals["_ACCUMULATORREQUEST_WINDOWOPERATION_EVENT"]._serialized_start = 680 + _globals["_ACCUMULATORREQUEST_WINDOWOPERATION_EVENT"]._serialized_end = 720 + _globals["_KEYEDWINDOW"]._serialized_start = 736 + _globals["_KEYEDWINDOW"]._serialized_end = 861 + _globals["_ACCUMULATORRESPONSE"]._serialized_start = 864 + _globals["_ACCUMULATORRESPONSE"]._serialized_end = 1064 + _globals["_READYRESPONSE"]._serialized_start = 1066 + _globals["_READYRESPONSE"]._serialized_end = 1096 + _globals["_HANDSHAKE"]._serialized_start = 1098 + _globals["_HANDSHAKE"]._serialized_end = 1122 + _globals["_ACCUMULATOR"]._serialized_start = 1125 + _globals["_ACCUMULATOR"]._serialized_end = 1297 +# @@protoc_insertion_point(module_scope) diff --git a/pynumaflow/proto/accumulator/accumulator_pb2.pyi b/pynumaflow/proto/accumulator/accumulator_pb2.pyi new file mode 100644 index 00000000..5c893f90 --- /dev/null +++ b/pynumaflow/proto/accumulator/accumulator_pb2.pyi @@ -0,0 +1,134 @@ +from google.protobuf import empty_pb2 as _empty_pb2 +from google.protobuf import timestamp_pb2 as _timestamp_pb2 +from google.protobuf.internal import containers as _containers +from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from typing import ( + ClassVar as _ClassVar, + Iterable as _Iterable, + Mapping as _Mapping, + Optional as _Optional, + Union as _Union, +) + +DESCRIPTOR: _descriptor.FileDescriptor + +class Payload(_message.Message): + __slots__ = ("keys", "value", "event_time", "watermark", "id", "headers") + + class HeadersEntry(_message.Message): + __slots__ = ("key", "value") + KEY_FIELD_NUMBER: _ClassVar[int] + 
VALUE_FIELD_NUMBER: _ClassVar[int] + key: str + value: str + def __init__(self, key: _Optional[str] = ..., value: _Optional[str] = ...) -> None: ... + KEYS_FIELD_NUMBER: _ClassVar[int] + VALUE_FIELD_NUMBER: _ClassVar[int] + EVENT_TIME_FIELD_NUMBER: _ClassVar[int] + WATERMARK_FIELD_NUMBER: _ClassVar[int] + ID_FIELD_NUMBER: _ClassVar[int] + HEADERS_FIELD_NUMBER: _ClassVar[int] + keys: _containers.RepeatedScalarFieldContainer[str] + value: bytes + event_time: _timestamp_pb2.Timestamp + watermark: _timestamp_pb2.Timestamp + id: str + headers: _containers.ScalarMap[str, str] + def __init__( + self, + keys: _Optional[_Iterable[str]] = ..., + value: _Optional[bytes] = ..., + event_time: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ..., + watermark: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ..., + id: _Optional[str] = ..., + headers: _Optional[_Mapping[str, str]] = ..., + ) -> None: ... + +class AccumulatorRequest(_message.Message): + __slots__ = ("payload", "operation", "handshake") + + class WindowOperation(_message.Message): + __slots__ = ("event", "keyedWindow") + + class Event(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = () + OPEN: _ClassVar[AccumulatorRequest.WindowOperation.Event] + CLOSE: _ClassVar[AccumulatorRequest.WindowOperation.Event] + APPEND: _ClassVar[AccumulatorRequest.WindowOperation.Event] + OPEN: AccumulatorRequest.WindowOperation.Event + CLOSE: AccumulatorRequest.WindowOperation.Event + APPEND: AccumulatorRequest.WindowOperation.Event + EVENT_FIELD_NUMBER: _ClassVar[int] + KEYEDWINDOW_FIELD_NUMBER: _ClassVar[int] + event: AccumulatorRequest.WindowOperation.Event + keyedWindow: KeyedWindow + def __init__( + self, + event: _Optional[_Union[AccumulatorRequest.WindowOperation.Event, str]] = ..., + keyedWindow: _Optional[_Union[KeyedWindow, _Mapping]] = ..., + ) -> None: ... 
+ PAYLOAD_FIELD_NUMBER: _ClassVar[int] + OPERATION_FIELD_NUMBER: _ClassVar[int] + HANDSHAKE_FIELD_NUMBER: _ClassVar[int] + payload: Payload + operation: AccumulatorRequest.WindowOperation + handshake: Handshake + def __init__( + self, + payload: _Optional[_Union[Payload, _Mapping]] = ..., + operation: _Optional[_Union[AccumulatorRequest.WindowOperation, _Mapping]] = ..., + handshake: _Optional[_Union[Handshake, _Mapping]] = ..., + ) -> None: ... + +class KeyedWindow(_message.Message): + __slots__ = ("start", "end", "slot", "keys") + START_FIELD_NUMBER: _ClassVar[int] + END_FIELD_NUMBER: _ClassVar[int] + SLOT_FIELD_NUMBER: _ClassVar[int] + KEYS_FIELD_NUMBER: _ClassVar[int] + start: _timestamp_pb2.Timestamp + end: _timestamp_pb2.Timestamp + slot: str + keys: _containers.RepeatedScalarFieldContainer[str] + def __init__( + self, + start: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ..., + end: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ..., + slot: _Optional[str] = ..., + keys: _Optional[_Iterable[str]] = ..., + ) -> None: ... + +class AccumulatorResponse(_message.Message): + __slots__ = ("payload", "window", "tags", "handshake", "EOF") + PAYLOAD_FIELD_NUMBER: _ClassVar[int] + WINDOW_FIELD_NUMBER: _ClassVar[int] + TAGS_FIELD_NUMBER: _ClassVar[int] + HANDSHAKE_FIELD_NUMBER: _ClassVar[int] + EOF_FIELD_NUMBER: _ClassVar[int] + payload: Payload + window: KeyedWindow + tags: _containers.RepeatedScalarFieldContainer[str] + handshake: Handshake + EOF: bool + def __init__( + self, + payload: _Optional[_Union[Payload, _Mapping]] = ..., + window: _Optional[_Union[KeyedWindow, _Mapping]] = ..., + tags: _Optional[_Iterable[str]] = ..., + handshake: _Optional[_Union[Handshake, _Mapping]] = ..., + EOF: bool = ..., + ) -> None: ... + +class ReadyResponse(_message.Message): + __slots__ = ("ready",) + READY_FIELD_NUMBER: _ClassVar[int] + ready: bool + def __init__(self, ready: bool = ...) -> None: ... 
+ +class Handshake(_message.Message): + __slots__ = ("sot",) + SOT_FIELD_NUMBER: _ClassVar[int] + sot: bool + def __init__(self, sot: bool = ...) -> None: ... diff --git a/pynumaflow/proto/accumulator/accumulator_pb2_grpc.py b/pynumaflow/proto/accumulator/accumulator_pb2_grpc.py new file mode 100644 index 00000000..f41606dd --- /dev/null +++ b/pynumaflow/proto/accumulator/accumulator_pb2_grpc.py @@ -0,0 +1,134 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +from . import accumulator_pb2 as accumulator__pb2 +from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 + + +class AccumulatorStub(object): + """AccumulatorWindow describes a special kind of SessionWindow (similar to Global Window) where output should + always have monotonically increasing WM but it can be manipulated through event-time by reordering the messages. + NOTE: Quite powerful, should not be abused; it can cause stalling of pipelines and leaks + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.AccumulateFn = channel.stream_stream( + "/accumulator.v1.Accumulator/AccumulateFn", + request_serializer=accumulator__pb2.AccumulatorRequest.SerializeToString, + response_deserializer=accumulator__pb2.AccumulatorResponse.FromString, + ) + self.IsReady = channel.unary_unary( + "/accumulator.v1.Accumulator/IsReady", + request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + response_deserializer=accumulator__pb2.ReadyResponse.FromString, + ) + + +class AccumulatorServicer(object): + """AccumulatorWindow describes a special kind of SessionWindow (similar to Global Window) where output should + always have monotonically increasing WM but it can be manipulated through event-time by reordering the messages. 
+ NOTE: Quite powerful, should not be abused; it can cause stalling of pipelines and leaks + """ + + def AccumulateFn(self, request_iterator, context): + """AccumulateFn applies a accumulate function to a request stream.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def IsReady(self, request, context): + """IsReady is the heartbeat endpoint for gRPC.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + +def add_AccumulatorServicer_to_server(servicer, server): + rpc_method_handlers = { + "AccumulateFn": grpc.stream_stream_rpc_method_handler( + servicer.AccumulateFn, + request_deserializer=accumulator__pb2.AccumulatorRequest.FromString, + response_serializer=accumulator__pb2.AccumulatorResponse.SerializeToString, + ), + "IsReady": grpc.unary_unary_rpc_method_handler( + servicer.IsReady, + request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + response_serializer=accumulator__pb2.ReadyResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + "accumulator.v1.Accumulator", rpc_method_handlers + ) + server.add_generic_rpc_handlers((generic_handler,)) + + +# This class is part of an EXPERIMENTAL API. +class Accumulator(object): + """AccumulatorWindow describes a special kind of SessionWindow (similar to Global Window) where output should + always have monotonically increasing WM but it can be manipulated through event-time by reordering the messages. 
+ NOTE: Quite powerful, should not be abused; it can cause stalling of pipelines and leaks + """ + + @staticmethod + def AccumulateFn( + request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.stream_stream( + request_iterator, + target, + "/accumulator.v1.Accumulator/AccumulateFn", + accumulator__pb2.AccumulatorRequest.SerializeToString, + accumulator__pb2.AccumulatorResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def IsReady( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/accumulator.v1.Accumulator/IsReady", + google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + accumulator__pb2.ReadyResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) From 96b8c739576b7a6cd8c05037cc0c55270433a8ca Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Wed, 30 Apr 2025 10:38:53 -0700 Subject: [PATCH 2/4] init accumulator Signed-off-by: Sidhant Kohli --- pynumaflow/_constants.py | 2 + pynumaflow/accumulator/__init__.py | 21 + pynumaflow/accumulator/_dtypes.py | 388 ++++++++++++++++++ pynumaflow/accumulator/async_server.py | 204 +++++++++ pynumaflow/accumulator/servicer/__init__.py | 0 .../accumulator/servicer/async_servicer.py | 123 ++++++ .../accumulator/servicer/task_manager.py | 327 +++++++++++++++ pynumaflow/info/types.py | 2 + 8 files changed, 1067 insertions(+) create mode 100644 pynumaflow/accumulator/__init__.py create mode 100644 pynumaflow/accumulator/_dtypes.py create mode 100644 
pynumaflow/accumulator/async_server.py create mode 100644 pynumaflow/accumulator/servicer/__init__.py create mode 100644 pynumaflow/accumulator/servicer/async_servicer.py create mode 100644 pynumaflow/accumulator/servicer/task_manager.py diff --git a/pynumaflow/_constants.py b/pynumaflow/_constants.py index c0b8bf12..7270913d 100644 --- a/pynumaflow/_constants.py +++ b/pynumaflow/_constants.py @@ -24,6 +24,7 @@ MULTIPROC_MAP_SOCK_ADDR = "/var/run/numaflow/multiproc" FALLBACK_SINK_SOCK_PATH = "/var/run/numaflow/fb-sink.sock" BATCH_MAP_SOCK_PATH = "/var/run/numaflow/batchmap.sock" +ACCUMULATOR_SOCK_PATH = "/var/run/numaflow/accumulator.sock" # Server information file configs MAP_SERVER_INFO_FILE_PATH = "/var/run/numaflow/mapper-server-info" @@ -34,6 +35,7 @@ SIDE_INPUT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sideinput-server-info" SOURCE_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sourcer-server-info" FALLBACK_SINK_SERVER_INFO_FILE_PATH = "/var/run/numaflow/fb-sinker-server-info" +ACCUMULATOR_SERVER_INFO_FILE_PATH = "/var/run/numaflow/accumulator-server-info" ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE" UD_CONTAINER_FALLBACK_SINK = "fb-udsink" diff --git a/pynumaflow/accumulator/__init__.py b/pynumaflow/accumulator/__init__.py new file mode 100644 index 00000000..85cf335d --- /dev/null +++ b/pynumaflow/accumulator/__init__.py @@ -0,0 +1,21 @@ +from pynumaflow.accumulator._dtypes import ( + Message, + Datum, + IntervalWindow, + Metadata, + DROP, + Accumulator, + KeyedWindow, +) +from pynumaflow.accumulator.async_server import AccumulatorAsyncServer + +__all__ = [ + "Message", + "Datum", + "IntervalWindow", + "Metadata", + "DROP", + "AccumulatorAsyncServer", + "Accumulator", + "KeyedWindow", +] diff --git a/pynumaflow/accumulator/_dtypes.py b/pynumaflow/accumulator/_dtypes.py new file mode 100644 index 00000000..00e4446f --- /dev/null +++ b/pynumaflow/accumulator/_dtypes.py @@ -0,0 +1,388 @@ +from abc import ABCMeta, abstractmethod +from asyncio import 
Task +from dataclasses import dataclass +from datetime import datetime +from enum import IntEnum +from typing import TypeVar, Callable, Union, Optional +from collections.abc import AsyncIterable + +from pynumaflow.shared.asynciter import NonBlockingIterator +from pynumaflow._constants import DROP + +M = TypeVar("M", bound="Message") + + +class WindowOperation(IntEnum): + """ + Enumerate the type of Window operation received. + Values mirror AccumulatorRequest.WindowOperation.Event in accumulator.proto. + """ + + OPEN = (0,) + CLOSE = (1,) + APPEND = (2,) + + +@dataclass(init=False) +class Datum: + """ + Class to define the important information for the event. + Args: + keys: the keys of the event. + value: the payload of the event. + event_time: the event time of the event. + watermark: the watermark of the event. + >>> # Example usage + >>> from pynumaflow.accumulator import Datum + >>> from datetime import datetime, timezone + >>> payload = bytes("test_mock_message", encoding="utf-8") + >>> t1 = datetime.fromtimestamp(1662998400, timezone.utc) + >>> t2 = datetime.fromtimestamp(1662998460, timezone.utc) + >>> msg_headers = {"key1": "value1", "key2": "value2"} + >>> d = Datum( + ... keys=["test_key"], + ... value=payload, + ... event_time=t1, + ... watermark=t2, + ... id_="test_id", + ... headers=msg_headers + ... 
) + """ + + __slots__ = ("_keys", "_value", "_event_time", "_watermark", "_headers", "_id") + + _keys: list[str] + _value: bytes + _event_time: datetime + _watermark: datetime + _headers: dict[str, str] + _id: str + + def __init__( + self, + keys: list[str], + value: bytes, + event_time: datetime, + watermark: datetime, + id_: str, + headers: Optional[dict[str, str]] = None, + ): + self._keys = keys or list() + self._value = value or b"" + if not isinstance(event_time, datetime): + raise TypeError(f"Wrong data type: {type(event_time)} for Datum.event_time") + self._event_time = event_time + if not isinstance(watermark, datetime): + raise TypeError(f"Wrong data type: {type(watermark)} for Datum.watermark") + self._watermark = watermark + self._headers = headers or {} + self._id = id_ + + def keys(self) -> list[str]: + """Returns the keys of the event""" + return self._keys + + @property + def value(self) -> bytes: + """Returns the value of the event.""" + return self._value + + @property + def event_time(self) -> datetime: + """Returns the event time of the event.""" + return self._event_time + + @property + def watermark(self) -> datetime: + """Returns the watermark of the event.""" + return self._watermark + + @property + def headers(self) -> dict[str, str]: + """Returns the headers of the event.""" + return self._headers + + @property + def id(self) -> str: + """Returns the id of the event.""" + return self._id + + +@dataclass(init=False) +class IntervalWindow: + """Defines the start and end of the interval window for the event.""" + + __slots__ = ("_start", "_end") + + _start: datetime + _end: datetime + + def __init__(self, start: datetime, end: datetime): + self._start = start + self._end = end + + @property + def start(self): + """Returns the start point of the interval window.""" + return self._start + + @property + def end(self): + """Returns the end point of the interval window.""" + return self._end + + +@dataclass(init=False) +class KeyedWindow: + """ + 
Defines the window for an accumulator operation which includes the + interval window along with the slot. + """ + + __slots__ = ("_window", "_slot", "_keys") + + _window: IntervalWindow + _slot: str + _keys: list[str] + + def __init__(self, start: datetime, end: datetime, slot: str = "", keys: list[str] = None): + self._window = IntervalWindow(start=start, end=end) + self._slot = slot + # A fresh list per instance: a shared mutable default would leak keys between windows. + self._keys = keys if keys is not None else [] + + @property + def start(self): + """Returns the start point of the interval window.""" + return self._window.start + + @property + def end(self): + """Returns the end point of the interval window.""" + return self._window.end + + @property + def slot(self): + """Returns the slot from the window""" + return self._slot + + @property + def window(self): + """Return the interval window""" + return self._window + + @property + def keys(self): + """Return the keys for window""" + return self._keys + + +@dataclass(init=False) +class Metadata: + """Defines the metadata for the event.""" + + __slots__ = ("_interval_window",) + + _interval_window: IntervalWindow + + def __init__(self, interval_window: IntervalWindow): + self._interval_window = interval_window + + @property + def interval_window(self): + """Returns the interval window for the event.""" + return self._interval_window + + +@dataclass +class AccumulatorResult: + """Defines the object to hold the result of accumulator computation.""" + + __slots__ = ( + "_future", + "_iterator", + "_key", + "_result_queue", + "_consumer_future", + "_latest_watermark", + ) + + _future: Task + _iterator: NonBlockingIterator + _key: list[str] + _result_queue: NonBlockingIterator + _consumer_future: Task + _latest_watermark: datetime + + @property + def future(self): + """Returns the future result of computation.""" + return self._future + + @property + def iterator(self): + """Returns the handle to the producer queue.""" + return self._iterator + + @property + def keys(self) -> list[str]: + """Returns the keys of the partition.""" + 
return self._key + + @property + def result_queue(self): + """Returns the async queue used to write the output for the tasks""" + return self._result_queue + + @property + def consumer_future(self): + """Returns the async consumer task for the result queue""" + return self._consumer_future + + @property + def latest_watermark(self): + """Returns the latest watermark for task""" + return self._latest_watermark + + def update_watermark(self, new_watermark: datetime): + """Updates the latest watermark value.""" + if not isinstance(new_watermark, datetime): + raise TypeError("new_watermark must be a datetime object") + self._latest_watermark = new_watermark + + +@dataclass +class AccumulatorRequest: + """Defines the object to hold a request for the accumulator operation.""" + + __slots__ = ("_operation", "_windows", "_payload") + + _operation: WindowOperation + _windows: list[KeyedWindow] + _payload: Datum + + def __init__(self, operation: WindowOperation, windows: list[KeyedWindow], payload: Datum): + self._operation = operation + self._windows = windows + self._payload = payload + + @property + def operation(self) -> WindowOperation: + """Returns the window operation of the request.""" + return self._operation + + @property + def windows(self) -> list[KeyedWindow]: + """Returns the keyed windows of the request.""" + return self._windows + + @property + def payload(self) -> Datum: + """Returns the payload of the request.""" + return self._payload + + +@dataclass(init=False) +class Message: + """ + Basic datatype for data passing to the next vertex/vertices. + + Args: + value: data in bytes + keys: []string keys for vertex (optional) + tags: []string tags for conditional forwarding (optional) + """ + + __slots__ = ("_value", "_keys", "_tags") + + _value: bytes + _keys: list[str] + _tags: list[str] + + def __init__( + self, + value: bytes, + keys: list[str] = None, + tags: list[str] = None, + ): + """ + Creates a Message object to send value to a vertex. 
+ """ + self._keys = keys or [] + self._tags = tags or [] + self._value = value or b"" + # self._window = window or None + + # returns the Message Object which will be dropped + @classmethod + def to_drop(cls: type[M]) -> M: + return cls(b"", None, [DROP]) + + @property + def value(self) -> bytes: + return self._value + + @property + def keys(self) -> list[str]: + return self._keys + + @property + def tags(self) -> list[str]: + return self._tags + + +AccumulatorAsyncCallable = Callable[ + [list[str], AsyncIterable[Datum], NonBlockingIterator, Metadata], None +] + + +class Accumulator(metaclass=ABCMeta): + """ + Accumulate can read unordered from the input stream and emit the ordered + data to the output stream. Once the watermark (WM) of the output stream progresses, + the data in WAL until that WM will be garbage collected. + NOTE: A message can be silently dropped if need be, + and it will be cleared from the WAL when the WM progresses. + """ + + def __call__(self, *args, **kwargs): + """ + Allow to call handler function directly if class instance is sent + as the accumulator_instance. + """ + return self.handler(*args, **kwargs) + + @abstractmethod + async def handler( + self, + datums: AsyncIterable[Datum], + output: NonBlockingIterator, + ): + """ + Implement this handler function which implements the AccumulatorStreamCallable interface. + """ + pass + + +class _AccumulatorBuilderClass: + """ + Class to build an Accumulator class instance. + Used Internally + + Args: + accumulator_class: the Accumulator class to be used for Accumulator UDF + args: the arguments to be passed to the accumulator class + kwargs: the keyword arguments to be passed to the accumulator class + """ + + def __init__(self, accumulator_class: type[Accumulator], args: tuple, kwargs: dict): + self._accumulator_class: type[Accumulator] = accumulator_class + # Callers may pass None (get_handler defaults init_kwargs to None); normalise so create() can unpack. + self._args = args or () + self._kwargs = kwargs or {} + + def create(self) -> Accumulator: + """ + Create a new Accumulator instance. 
+ """ + return self._accumulator_class(*self._args, **self._kwargs) + + +# AccumulatorStreamCallable is a callable which can be used as a handler for the Reduce UDF. +AccumulatorStreamCallable = Union[AccumulatorAsyncCallable, type[Accumulator]] diff --git a/pynumaflow/accumulator/async_server.py b/pynumaflow/accumulator/async_server.py new file mode 100644 index 00000000..16569ad4 --- /dev/null +++ b/pynumaflow/accumulator/async_server.py @@ -0,0 +1,204 @@ +import inspect +from typing import Optional + +import aiorun +import grpc + +from pynumaflow.accumulator.servicer.async_servicer import AsyncAccumulatorServicer +from pynumaflow.info.types import ServerInfo, ContainerType, MINIMUM_NUMAFLOW_VERSION +from pynumaflow.proto.accumulator import accumulator_pb2_grpc + + +from pynumaflow._constants import ( + MAX_MESSAGE_SIZE, + NUM_THREADS_DEFAULT, + _LOGGER, + MAX_NUM_THREADS, + ACCUMULATOR_SOCK_PATH, + ACCUMULATOR_SERVER_INFO_FILE_PATH, +) + +from pynumaflow.accumulator._dtypes import ( + AccumulatorStreamCallable, + _AccumulatorBuilderClass, + Accumulator, +) + +from pynumaflow.shared.server import NumaflowServer, check_instance, start_async_server + + +def get_handler( + accumulator_handler: AccumulatorStreamCallable, + init_args: tuple = (), + init_kwargs: Optional[dict] = None, +): + """ + Get the correct handler type based on the arguments passed + """ + if inspect.isfunction(accumulator_handler): + if init_args or init_kwargs: + # if the init_args or init_kwargs are passed, then the accumulator_instance + # can only be of class Accumulator type + raise TypeError("Cannot pass function handler with init args or kwargs") + # return the function handler + return accumulator_handler + elif not check_instance(accumulator_handler, Accumulator) and issubclass( + accumulator_handler, Accumulator + ): + # if handler is type of Class Accumulator, create a new instance of + # a AccumulatorBuilderClass + return _AccumulatorBuilderClass(accumulator_handler, init_args, 
init_kwargs) + else: + _LOGGER.error( + _error_msg := f"Invalid Class Type {accumulator_handler}: " + f"Please make sure the class type is passed, and it is a subclass of Accumulator" + ) + raise TypeError(_error_msg) + + +class AccumulatorAsyncServer(NumaflowServer): + """ + Class for a new Accumulator Server instance. + A new servicer instance is created and attached to the server. + The server instance is returned. + Args: + accumulator_instance: The reducer instance to be used for + Reduce Streaming UDF + init_args: The arguments to be passed to the accumulator_handler + init_kwargs: The keyword arguments to be passed to the + accumulator_handler + sock_path: The UNIX socket path to be used for the server + max_message_size: The max message size in bytes the server can receive and send + max_threads: The max number of threads to be spawned; + defaults to 4 and max capped at 16 + server_info_file: The path to the server info file + Example invocation: + import os + from collections.abc import AsyncIterable + from pynumaflow.accumulator import Messages, Message, Datum, Metadata, + AccumulatorAsyncServer, Accumulator + + class ReduceCounter(Accumulator): + def __init__(self, counter): + self.counter = counter + + async def handler( + self, + keys: list[str], + datums: AsyncIterable[Datum], + output: NonBlockingIterator, + md: Metadata, + ): + async for _ in datums: + self.counter += 1 + if self.counter > 20: + msg = f"counter:{self.counter}" + await output.put(Message(str.encode(msg), keys=keys)) + self.counter = 0 + msg = f"counter:{self.counter}" + await output.put(Message(str.encode(msg), keys=keys)) + + async def reduce_handler( + keys: list[str], + datums: AsyncIterable[Datum], + output: NonBlockingIterator, + md: Metadata, + ): + counter = 0 + async for _ in datums: + counter += 1 + if counter > 20: + msg = f"counter:{counter}" + await output.put(Message(str.encode(msg), keys=keys)) + counter = 0 + msg = f"counter:{counter}" + await 
output.put(Message(str.encode(msg), keys=keys)) + + if __name__ == "__main__": + invoke = os.getenv("INVOKE", "func_handler") + if invoke == "class": + # Here we are using the class instance as the reducer_instance + # which will be used to invoke the handler function. + # We are passing the init_args for the class instance. + grpc_server = AccumulatorAsyncServer(ReduceCounter, init_args=(0,)) + else: + # Here we are using the handler function directly as the reducer_instance. + grpc_server = AccumulatorAsyncServer(reduce_handler) + grpc_server.start() + + """ + + def __init__( + self, + accumulator_instance: AccumulatorStreamCallable, + init_args: tuple = (), + init_kwargs: dict = None, + sock_path=ACCUMULATOR_SOCK_PATH, + max_message_size=MAX_MESSAGE_SIZE, + max_threads=NUM_THREADS_DEFAULT, + server_info_file=ACCUMULATOR_SERVER_INFO_FILE_PATH, + ): + """ + Create a new grpc Accumulator Server instance. + A new servicer instance is created and attached to the server. + The server instance is returned. 
+ Args: + accumulator_instance: The Accumulator instance to be used for + Accumulator UDF + init_args: The arguments to be passed to the accumulator_handler + init_kwargs: The keyword arguments to be passed to the + accumulator_handler + sock_path: The UNIX socket path to be used for the server + max_message_size: The max message size in bytes the server can receive and send + max_threads: The max number of threads to be spawned; + defaults to 4 and max capped at 16 + server_info_file: The path to the server info file + """ + if init_kwargs is None: + init_kwargs = {} + self.accumulator_handler = get_handler(accumulator_instance, init_args, init_kwargs) + self.sock_path = f"unix://{sock_path}" + self.max_message_size = max_message_size + self.max_threads = min(max_threads, MAX_NUM_THREADS) + self.server_info_file = server_info_file + + self._server_options = [ + ("grpc.max_send_message_length", self.max_message_size), + ("grpc.max_receive_message_length", self.max_message_size), + ] + # Get the servicer instance for the async server + self.servicer = AsyncAccumulatorServicer(self.accumulator_handler) + + def start(self): + """ + Starter function for the Async server class, need a separate caller + so that all the async coroutines can be started from a single context + """ + _LOGGER.info( + "Starting Async Accumulator Server", + ) + aiorun.run(self.aexec(), use_uvloop=True) + + async def aexec(self): + """ + Starts the Async gRPC server on the given UNIX socket with + given max threads. 
+ """ + # As the server is async, we need to create a new server instance in the + # same thread as the event loop so that all the async calls are made in the + # same context + # Create a new async server instance and add the servicer to it + server = grpc.aio.server(options=self._server_options) + server.add_insecure_port(self.sock_path) + accumulator_pb2_grpc.add_AccumulatorServicer_to_server(self.servicer, server) + + serv_info = ServerInfo.get_default_server_info() + serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[ContainerType.Accumulator] + await start_async_server( + server_async=server, + sock_path=self.sock_path, + max_threads=self.max_threads, + cleanup_coroutines=list(), + server_info_file=self.server_info_file, + server_info=serv_info, + ) diff --git a/pynumaflow/accumulator/servicer/__init__.py b/pynumaflow/accumulator/servicer/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pynumaflow/accumulator/servicer/async_servicer.py b/pynumaflow/accumulator/servicer/async_servicer.py new file mode 100644 index 00000000..726711a5 --- /dev/null +++ b/pynumaflow/accumulator/servicer/async_servicer.py @@ -0,0 +1,123 @@ +import asyncio +from collections.abc import AsyncIterable +from typing import Union + +from google.protobuf import empty_pb2 as _empty_pb2 + +from pynumaflow._constants import ERR_UDF_EXCEPTION_STRING +from pynumaflow.proto.accumulator import accumulator_pb2, accumulator_pb2_grpc +from pynumaflow.accumulator._dtypes import ( + Datum, + AccumulatorAsyncCallable, + _AccumulatorBuilderClass, + AccumulatorRequest, +) +from pynumaflow.accumulator.servicer.task_manager import TaskManager +from pynumaflow.shared.server import handle_async_error +from pynumaflow.types import NumaflowServicerContext + + +async def datum_generator( + request_iterator: AsyncIterable[accumulator_pb2.AccumulatorRequest], +) -> AsyncIterable[AccumulatorRequest]: + """Generate a AccumulatorRequest from a AccumulatorRequest proto message.""" + 
async for d in request_iterator: + reduce_request = AccumulatorRequest( + operation=d.operation.event, + windows=d.operation.windows, + payload=Datum( + keys=list(d.payload.keys), + value=d.payload.value, + event_time=d.payload.event_time.ToDatetime(), + watermark=d.payload.watermark.ToDatetime(), + headers=dict(d.payload.headers), + ), + ) + yield reduce_request + + +class AsyncAccumulatorServicer(accumulator_pb2_grpc.AccumulatorServicer): + """ + This class is used to create a new grpc Reduce servicer instance. + Provides the functionality for the required rpc methods. + """ + + def __init__( + self, + handler: Union[AccumulatorAsyncCallable, _AccumulatorBuilderClass], + ): + # The accumulator handler can be a function or a builder class instance. + self.__accumulator_handler: Union[ + AccumulatorAsyncCallable, _AccumulatorBuilderClass + ] = handler + + async def AccumulateFn( + self, + request_iterator: AsyncIterable[accumulator_pb2.AccumulatorRequest], + context: NumaflowServicerContext, + ) -> accumulator_pb2.AccumulatorResponse: + """ + Applies a accumulator function to a datum stream. + The pascal case function name comes from the proto accumulator_pb2_grpc.py file. 
+ """ + # Create a task manager instance + task_manager = TaskManager(handler=self.__accumulator_handler) + + # Create a consumer task to read from the result queue + # All the results from the accumulator function will be sent to the result queue + # We will read from the result queue and send the results to the client + consumer = task_manager.global_result_queue.read_iterator() + + # Create an async iterator from the request iterator + # datum_iterator = datum_generator(request_iterator=request_iterator) + + # Create a process_input_stream task in the task manager, + # this would read from the datum iterator + # and then create the required tasks to process the data requests + # The results from these tasks are then sent to the result queue + producer = asyncio.create_task(task_manager.process_input_stream(request_iterator)) + + # Start the consumer task where we read from the result queue + # and send the results to the client + # The task manager can write the following to the result queue: + # 1. A accumulator_pb2.ReduceResponse message + # This is the result of the reduce function, it contains the window and the + # result of the reduce function + # The result of the reduce function is a accumulator_pb2.ReduceResponse message and can be + # directly sent to the client + # + # 2. An Exception + # Any exceptions that occur during the processing reduce function tasks are + # sent to the result queue. We then forward these exception to the client + # + # 3. 
A accumulator_pb2.ReduceResponse message with EOF=True + # This is a special message that indicates the end of the processing for a window + # When we get this message, we send an EOF message to the client + try: + async for msg in consumer: + # If the message is an exception, we raise the exception + if isinstance(msg, BaseException): + await handle_async_error(context, msg, ERR_UDF_EXCEPTION_STRING) + return + # Send window EOF response or Window result response + # back to the client + else: + yield msg + except BaseException as e: + await handle_async_error(context, e, ERR_UDF_EXCEPTION_STRING) + return + # Wait for the process_input_stream task to finish for a clean exit + try: + await producer + except BaseException as e: + await handle_async_error(context, e, ERR_UDF_EXCEPTION_STRING) + return + + async def IsReady( + self, request: _empty_pb2.Empty, context: NumaflowServicerContext + ) -> accumulator_pb2.ReadyResponse: + """ + IsReady is the heartbeat endpoint for gRPC. + The pascal case function name comes from the proto accumulator_pb2_grpc.py file. + """ + return accumulator_pb2.ReadyResponse(ready=True) diff --git a/pynumaflow/accumulator/servicer/task_manager.py b/pynumaflow/accumulator/servicer/task_manager.py new file mode 100644 index 00000000..60301f22 --- /dev/null +++ b/pynumaflow/accumulator/servicer/task_manager.py @@ -0,0 +1,327 @@ +import asyncio +from collections.abc import AsyncIterable +from datetime import datetime +from typing import Union + +from pynumaflow._constants import ( + STREAM_EOF, + DELIMITER, + _LOGGER, +) +from pynumaflow.accumulator._dtypes import ( + AccumulatorResult, + Datum, + _AccumulatorBuilderClass, + AccumulatorAsyncCallable, + WindowOperation, +) +from pynumaflow.proto.accumulator import accumulator_pb2 +from pynumaflow.shared.asynciter import NonBlockingIterator + + +def build_unique_key_name(keys): + """ + Builds a unique key name for the given keys and window. + The key name is used to identify the Reduce task. 
+ The format is: start_time:end_time:key1:key2:... + """ + return f"{DELIMITER.join(keys)}" + + +def build_window_hash(window): + """ + Builds a hash for the given window. + The hash is used to identify the Reduce Window + The format is: start_time:end_time + """ + return f"{window.start.ToMilliseconds()}:{window.end.ToMilliseconds()}" + + +def create_window_eof_response(window): + """Create a Reduce response with EOF=True for a given window""" + return accumulator_pb2.ReduceResponse(window=window, EOF=True) + + +class TaskManager: + """ + TaskManager is responsible for managing the Reduce tasks. + It is created whenever a new reduce operation is requested. + """ + + def __init__(self, handler: Union[AccumulatorAsyncCallable, _AccumulatorBuilderClass]): + # A dictionary to store the task information + self.tasks: dict[str, AccumulatorResult] = {} + # Collection for storing strong references to all running tasks. + # Event loop only keeps a weak reference, which can cause it to + # get lost during execution. + self.background_tasks = set() + # Handler for the reduce operation + self.__accumulator_handler = handler + # Queue to store the results of the reduce operation + # This queue is used to send the results to the client + # once the reduce operation is completed. + # This queue is also used to send the error/exceptions to the client + # if the reduce operation fails. 
+ self.global_result_queue = NonBlockingIterator() + + def get_unique_windows(self): + """ + Returns the unique windows that are currently being processed + """ + # Dict to store unique windows + windows = dict() + # Iterate over all the tasks and add the windows + for task in self.tasks.values(): + window_hash = build_window_hash(task.window) + window_found = windows.get(window_hash, None) + # if window not seen yet, add to the dict + if not window_found: + windows[window_hash] = task.window + return windows + + def get_tasks(self): + """ + Returns the list of reduce tasks that are + currently being processed + """ + return list(self.tasks.values()) + + async def stream_send_eof(self): + """ + Sends EOF to input streams of all the Reduce + tasks that are currently being processed. + This is called when the input grpc stream is closed. + """ + for unified_key in self.tasks: + await self.tasks[unified_key].iterator.put(STREAM_EOF) + self.tasks.pop(unified_key) + + async def close_task(self, req): + d = req.payload + keys = d.keys() + unified_key = build_unique_key_name(keys) + curr_task = self.tasks.get(unified_key, None) + + if curr_task: + await self.tasks[unified_key].iterator.put(STREAM_EOF) + self.tasks.pop(unified_key) + else: + _LOGGER.critical("accumulator task not found", exc_info=True) + err = BaseException("accumulator task not found") + # Put the exception in the result queue + await self.global_result_queue.put(err) + + async def create_task(self, req): + """ + Creates a new accumulator task for the given request. + Based on the request we compute a unique key, and then + it creates a new task or appends the request to the existing task. 
+ """ + d = req.payload + keys = d.keys() + unified_key = build_unique_key_name(keys) + curr_task = self.tasks.get(unified_key, None) + + # If the task does not exist, create a new task + if not curr_task: + niter = NonBlockingIterator() + riter = niter.read_iterator() + # Create a new result queue for the current task + # We create a new result queue for each task, so that + # the results of the reduce operation can be sent to the + # the global result queue, which in turn sends the results + # to the client. + res_queue = NonBlockingIterator() + + # Create a new write_to_global_queue task for the current, this will read from the + # result queue specifically for the current task and update the + # global result queue + consumer = asyncio.create_task( + self.write_to_global_queue(res_queue, self.global_result_queue, unified_key) + ) + # Save a reference to the result of this function, to avoid a + # task disappearing mid-execution. + self.background_tasks.add(consumer) + consumer.add_done_callback(self.clean_background) + + # Create a new task for the accumulator operation, this will invoke the + # Reduce handler with the given keys, request iterator, and window. + task = asyncio.create_task(self.__invoke_accumulator(riter, res_queue)) + # Save a reference to the result of this function, to avoid a + # task disappearing mid-execution. + self.background_tasks.add(task) + task.add_done_callback(self.clean_background) + + # Create a new AccumulatorResult object to store the task information + curr_task = AccumulatorResult( + task, niter, keys, res_queue, consumer, datetime.fromtimestamp(-1) + ) + + # Save the result of the reduce operation to the task list + self.tasks[unified_key] = curr_task + + # Put the request in the iterator + await curr_task.iterator.put(d) + + async def send_datum_to_task(self, req): + """ + Appends the request to the existing window reduce task. + If the task does not exist, create it. 
+ """ + d = req.payload + keys = d.keys() + unified_key = build_unique_key_name(keys) + result = self.tasks.get(unified_key, None) + if not result: + await self.create_task(req) + else: + await result.iterator.put(d) + + async def __invoke_accumulator( + self, + request_iterator: AsyncIterable[Datum], + output: NonBlockingIterator, + ): + """ + Invokes the UDF reduce handler with the given keys, + request iterator, and window. Returns the result of the + reduce operation. + """ + new_instance = self.__accumulator_handler + + # If the accumulator handler is a class instance, create a new instance of it. + # It is required for a new key to be processed by a + # new instance of the reducer for a given window + # Otherwise the function handler can be called directly + if isinstance(self.__accumulator_handler, _AccumulatorBuilderClass): + new_instance = self.__accumulator_handler.create() + try: + _ = await new_instance(request_iterator, output) + # send EOF to the output stream + await output.put(STREAM_EOF) + # If there is an error in the reduce operation, log and + # then send the error to the result queue + except BaseException as err: + _LOGGER.critical("panic inside accumulator handle", exc_info=True) + # Put the exception in the result queue + await self.global_result_queue.put(err) + + async def process_input_stream( + self, request_iterator: AsyncIterable[accumulator_pb2.AccumulatorRequest] + ): + # Start iterating through the request iterator and create tasks + # based on the operation type received. 
+ try: + async for request in request_iterator: + # check whether the request is an open or append operation + if request.operation is int(WindowOperation.OPEN): + # create a new task for the open operation and + # put the request in the task iterator + await self.create_task(request) + elif request.operation is int(WindowOperation.APPEND): + # append the task data to the existing task + # if the task does not exist, create a new task + await self.send_datum_to_task(request) + elif request.operation is int(WindowOperation.CLOSE): + # close the current task for req + await self.close_task(request) + # If there is an error in the reduce operation, log and + # then send the error to the result queue + except BaseException as e: + err_msg = f"Accumulator Error: {repr(e)}" + _LOGGER.critical(err_msg, exc_info=True) + # Put the exception in the global result queue + await self.global_result_queue.put(e) + return + + try: + # send EOF to all the tasks once the request iterator is exhausted + # This will signal the tasks to stop reading the data on their + # respective iterators. + await self.stream_send_eof() + + # get the list of reduce tasks that are currently being processed + # iterate through the tasks and wait for them to complete + for task in self.get_tasks(): + # Once this is done, we know that the task has written all the results + # to the local result queue + fut = task.future + await fut + + # # Send an EOF message to the local result queue + # # This will signal that the task has completed processing + # await task.result_queue.put(STREAM_EOF) + + # Wait for the local queue to write + # all the results of this task to the global result queue + con_future = task.consumer_future + await con_future + + # # Once all tasks are completed, send EOF to all windows that + # # were processed in the Task Manager. We send a single + # # EOF message per window. 
+ # current_windows = self.get_unique_windows() + # for window in current_windows.values(): + # # Send an EOF message to the global result queue + # # This will signal that window has been processed + # eof_window_msg = create_window_eof_response(window=window) + # await self.global_result_queue.put(eof_window_msg) + + # Once all tasks are completed, senf EOF the global result queue + await self.global_result_queue.put(STREAM_EOF) + except BaseException as e: + err_msg = f"Reduce Streaming Error: {repr(e)}" + _LOGGER.critical(err_msg, exc_info=True) + await self.global_result_queue.put(e) + + async def write_to_global_queue( + self, input_queue: NonBlockingIterator, output_queue: NonBlockingIterator, unified_key: str + ): + """ + This task is for given Reduce task. + This would from the local result queue for the task and then write + to the global result queue + """ + reader = input_queue.read_iterator() + task = self.tasks[unified_key] + + wm: datetime = task.latest_watermark + async for msg in reader: + # Convert the window to a datetime object + if wm < msg.watermark: + task.update_watermark(msg.watermark) + self.tasks[unified_key] = task + wm = msg.watermark + + start_dt = datetime.fromtimestamp(0) + end_dt = wm + res = accumulator_pb2.AccumulatorResponse( + payload=accumulator_pb2.Payload( + keys=msg.keys, + value=msg.value, + event_time=msg.event_time, + watermark=msg.watermark, + headers=msg.headers, + id=msg.id, + ), + window=accumulator_pb2.KeyedWindow( + start=start_dt, end=end_dt, slot="slot-0", keys=task.keys + ), + EOF=False, + tags=msg.tags, + ) + await output_queue.put(res) + # send EOF + res = accumulator_pb2.AccumulatorResponse( + window=accumulator_pb2.KeyedWindow( + start=datetime.fromtimestamp(0), end=wm, slot="slot-0", keys=task.keys + ), + EOF=True, + ) + await output_queue.put(res) + + def clean_background(self, task): + """ + Remove the task from the background tasks collection + """ + self.background_tasks.remove(task) diff --git 
a/pynumaflow/info/types.py b/pynumaflow/info/types.py index 2845c264..12375a70 100644 --- a/pynumaflow/info/types.py +++ b/pynumaflow/info/types.py @@ -71,6 +71,7 @@ class ContainerType(str, Enum): Sessionreducer = "sessionreducer" Sideinput = "sideinput" Fbsinker = "fb-sinker" + Accumulator = "accumulator" # Minimum version of Numaflow required by the current SDK version @@ -86,6 +87,7 @@ class ContainerType(str, Enum): ContainerType.Sessionreducer: "1.4.0-z", ContainerType.Sideinput: "1.4.0-z", ContainerType.Fbsinker: "1.4.0-z", + ContainerType.Accumulator: "1.5.0-z", } From 8b58b63761f48066c37df5f9781194525bf4ee8b Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Wed, 7 May 2025 16:33:09 -0700 Subject: [PATCH 3/4] minor fixes Signed-off-by: Sidhant Kohli --- pynumaflow/accumulator/__init__.py | 4 +- pynumaflow/accumulator/_dtypes.py | 69 ++++++++++++++----- .../accumulator/servicer/task_manager.py | 52 +++++++++----- 3 files changed, 87 insertions(+), 38 deletions(-) diff --git a/pynumaflow/accumulator/__init__.py b/pynumaflow/accumulator/__init__.py index 85cf335d..76690768 100644 --- a/pynumaflow/accumulator/__init__.py +++ b/pynumaflow/accumulator/__init__.py @@ -4,7 +4,7 @@ IntervalWindow, Metadata, DROP, - ReduceStreamer, + Accumulator, KeyedWindow, ) from pynumaflow.accumulator.async_server import AccumulatorAsyncServer @@ -16,6 +16,6 @@ "Metadata", "DROP", "AccumulatorAsyncServer", - "ReduceStreamer", + "Accumulator", "KeyedWindow", ] diff --git a/pynumaflow/accumulator/_dtypes.py b/pynumaflow/accumulator/_dtypes.py index 00e4446f..18a9b199 100644 --- a/pynumaflow/accumulator/_dtypes.py +++ b/pynumaflow/accumulator/_dtypes.py @@ -3,7 +3,7 @@ from dataclasses import dataclass from datetime import datetime from enum import IntEnum -from typing import TypeVar, Callable, Union, Optional +from typing import TypeVar, Callable, Union, Optional, Type from collections.abc import AsyncIterable from pynumaflow.shared.asynciter import NonBlockingIterator @@ -282,38 
+282,55 @@ def payload(self) -> Datum: @dataclass(init=False) class Message: """ - Basic datatype for data passing to the next vertex/vertices. - - Args: - value: data in bytes - keys: []string keys for vertex (optional) - tags: []string tags for conditional forwarding (optional) + Represents a unit of data passed to the next vertex in the pipeline. """ - __slots__ = ("_value", "_keys", "_tags") + __slots__ = ( + "_value", "_keys", "_tags", "_event_time", "_watermark", "_id", "_headers" + ) _value: bytes _keys: list[str] _tags: list[str] + _event_time: datetime + _watermark: datetime + _id: str + _headers: dict[str, str] def __init__( self, value: bytes, - keys: list[str] = None, - tags: list[str] = None, + keys: Optional[list[str]] = None, + tags: Optional[list[str]] = None, + event_time: Optional[datetime] = None, + watermark: Optional[datetime] = None, + id: Optional[str] = "", + headers: Optional[dict[str, str]] = None, ): - """ - Creates a Message object to send value to a vertex. - """ + self._value = value or b"" self._keys = keys or [] self._tags = tags or [] - self._value = value or b"" - # self._window = window or None + self._event_time = event_time or datetime.fromtimestamp(0) + self._watermark = watermark or datetime.fromtimestamp(0) + self._id = id or "" + self._headers = headers or {} + + @classmethod + def to_drop(cls: Type[M]) -> M: + return cls(b"", None, ["DROP"]) - # returns the Message Object which will be dropped @classmethod - def to_drop(cls: type[M]) -> M: - return cls(b"", None, [DROP]) + def from_datum(cls: Type[M], datum: "Datum") -> M: + """Creates a Message from a Datum.""" + return cls( + value=datum.value, + keys=datum.keys, + tags=[], + event_time=datum.event_time, + watermark=datum.watermark, + id=datum.id, + headers=datum.headers, + ) @property def value(self) -> bytes: @@ -327,6 +344,22 @@ def keys(self) -> list[str]: def tags(self) -> list[str]: return self._tags + @property + def event_time(self) -> datetime: + return 
self._event_time + + @property + def watermark(self) -> datetime: + return self._watermark + + @property + def id(self) -> str: + return self._id + + @property + def headers(self) -> dict[str, str]: + return self._headers + AccumulatorAsyncCallable = Callable[ [list[str], AsyncIterable[Datum], NonBlockingIterator, Metadata], None diff --git a/pynumaflow/accumulator/servicer/task_manager.py b/pynumaflow/accumulator/servicer/task_manager.py index 60301f22..6e9aab2b 100644 --- a/pynumaflow/accumulator/servicer/task_manager.py +++ b/pynumaflow/accumulator/servicer/task_manager.py @@ -17,6 +17,7 @@ ) from pynumaflow.proto.accumulator import accumulator_pb2 from pynumaflow.shared.asynciter import NonBlockingIterator +from google.protobuf import timestamp_pb2 as _timestamp_pb2 def build_unique_key_name(keys): @@ -94,11 +95,11 @@ async def stream_send_eof(self): """ for unified_key in self.tasks: await self.tasks[unified_key].iterator.put(STREAM_EOF) - self.tasks.pop(unified_key) + self.tasks.clear() async def close_task(self, req): d = req.payload - keys = d.keys() + keys = d.keys unified_key = build_unique_key_name(keys) curr_task = self.tasks.get(unified_key, None) @@ -118,7 +119,7 @@ async def create_task(self, req): it creates a new task or appends the request to the existing task. 
""" d = req.payload - keys = d.keys() + keys = d.keys unified_key = build_unique_key_name(keys) curr_task = self.tasks.get(unified_key, None) @@ -178,9 +179,9 @@ async def send_datum_to_task(self, req): await result.iterator.put(d) async def __invoke_accumulator( - self, - request_iterator: AsyncIterable[Datum], - output: NonBlockingIterator, + self, + request_iterator: AsyncIterable[Datum], + output: NonBlockingIterator, ): """ Invokes the UDF reduce handler with the given keys, @@ -207,22 +208,24 @@ async def __invoke_accumulator( await self.global_result_queue.put(err) async def process_input_stream( - self, request_iterator: AsyncIterable[accumulator_pb2.AccumulatorRequest] + self, request_iterator: AsyncIterable[accumulator_pb2.AccumulatorRequest] ): # Start iterating through the request iterator and create tasks # based on the operation type received. try: async for request in request_iterator: + # print("IM HERE", request.payload.keys) # check whether the request is an open or append operation - if request.operation is int(WindowOperation.OPEN): + if request.operation.event is int(WindowOperation.OPEN): + # create a new task for the open operation and # put the request in the task iterator await self.create_task(request) - elif request.operation is int(WindowOperation.APPEND): + elif request.operation.event is int(WindowOperation.APPEND): # append the task data to the existing task # if the task does not exist, create a new task await self.send_datum_to_task(request) - elif request.operation is int(WindowOperation.CLOSE): + elif request.operation.event is int(WindowOperation.CLOSE): # close the current task for req await self.close_task(request) # If there is an error in the reduce operation, log and @@ -275,7 +278,7 @@ async def process_input_stream( await self.global_result_queue.put(e) async def write_to_global_queue( - self, input_queue: NonBlockingIterator, output_queue: NonBlockingIterator, unified_key: str + self, input_queue: NonBlockingIterator, 
output_queue: NonBlockingIterator, unified_key: str ): """ This task is for given Reduce task. @@ -285,7 +288,7 @@ async def write_to_global_queue( reader = input_queue.read_iterator() task = self.tasks[unified_key] - wm: datetime = task.latest_watermark + wm = task.latest_watermark async for msg in reader: # Convert the window to a datetime object if wm < msg.watermark: @@ -293,8 +296,13 @@ async def write_to_global_queue( self.tasks[unified_key] = task wm = msg.watermark - start_dt = datetime.fromtimestamp(0) - end_dt = wm + event_time_timestamp = _timestamp_pb2.Timestamp() + t = datetime.fromtimestamp(0) + event_time_timestamp.FromDatetime(dt=t) + + event_time_timestamp_end = _timestamp_pb2.Timestamp() + event_time_timestamp_end.FromDatetime(dt=wm) + res = accumulator_pb2.AccumulatorResponse( payload=accumulator_pb2.Payload( keys=msg.keys, @@ -305,17 +313,25 @@ async def write_to_global_queue( id=msg.id, ), window=accumulator_pb2.KeyedWindow( - start=start_dt, end=end_dt, slot="slot-0", keys=task.keys + start=event_time_timestamp, end=event_time_timestamp_end, slot="slot-0", keys=task.keys ), EOF=False, tags=msg.tags, ) await output_queue.put(res) # send EOF + event_time_timestamp = _timestamp_pb2.Timestamp() + t = datetime.fromtimestamp(0) + event_time_timestamp.FromDatetime(dt=t) + + event_time_timestamp_end = _timestamp_pb2.Timestamp() + event_time_timestamp_end.FromDatetime(dt=wm) + + window = accumulator_pb2.KeyedWindow( + start=event_time_timestamp, end=event_time_timestamp_end, slot="slot-0", keys=task.keys + ) res = accumulator_pb2.AccumulatorResponse( - window=accumulator_pb2.KeyedWindow( - start=datetime.fromtimestamp(0), end=wm, slot="slot-0", keys=task.keys - ), + window=window, EOF=True, ) await output_queue.put(res) From fa8fa6d3c1ab05d091c0494d7b95f049b15f321d Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Wed, 16 Jul 2025 01:17:39 -0700 Subject: [PATCH 4/4] chore: add example Signed-off-by: Sidhant Kohli --- 
examples/accumulator/counter/Dockerfile      | 55 ++++++++++++++
examples/accumulator/counter/Makefile        | 22 ++++++
examples/accumulator/counter/entry.sh        |  4 +
examples/accumulator/counter/example.py      | 46 ++++++++++++
examples/accumulator/counter/pipeline.yaml   | 50 +++++++++++++
examples/accumulator/counter/pyproject.toml  | 15 ++++
examples/accumulator/streamsorter/Dockerfile | 55 ++++++++++++++
examples/accumulator/streamsorter/Makefile   | 22 ++++++
examples/accumulator/streamsorter/entry.sh   |  4 +
examples/accumulator/streamsorter/example.py | 84 +++++++++++++++++++
.../accumulator/streamsorter/pipeline.yaml   | 51 +++++++++++++
.../accumulator/streamsorter/pyproject.toml  | 15 ++++
12 files changed, 423 insertions(+)
create mode 100644 examples/accumulator/counter/Dockerfile
create mode 100644 examples/accumulator/counter/Makefile
create mode 100644 examples/accumulator/counter/entry.sh
create mode 100644 examples/accumulator/counter/example.py
create mode 100644 examples/accumulator/counter/pipeline.yaml
create mode 100644 examples/accumulator/counter/pyproject.toml
create mode 100644 examples/accumulator/streamsorter/Dockerfile
create mode 100644 examples/accumulator/streamsorter/Makefile
create mode 100644 examples/accumulator/streamsorter/entry.sh
create mode 100644 examples/accumulator/streamsorter/example.py
create mode 100644 examples/accumulator/streamsorter/pipeline.yaml
create mode 100644 examples/accumulator/streamsorter/pyproject.toml

diff --git a/examples/accumulator/counter/Dockerfile b/examples/accumulator/counter/Dockerfile
new file mode 100644
index 00000000..de1756fd
--- /dev/null
+++ b/examples/accumulator/counter/Dockerfile
@@ -0,0 +1,55 @@
+####################################################################################################
+# builder: install needed dependencies
+####################################################################################################
+
+FROM python:3.10-slim-bullseye AS builder
+
+ENV PYTHONFAULTHANDLER=1 \
+    PYTHONUNBUFFERED=1 \
+    PYTHONHASHSEED=random \
+    PIP_NO_CACHE_DIR=on \
+    PIP_DISABLE_PIP_VERSION_CHECK=on \
+    PIP_DEFAULT_TIMEOUT=100 \
+    POETRY_VERSION=1.2.2 \
+    POETRY_HOME="/opt/poetry" \
+    POETRY_VIRTUALENVS_IN_PROJECT=true \
+    POETRY_NO_INTERACTION=1 \
+    PYSETUP_PATH="/opt/pysetup"
+
+ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/accumulator/counter"
+ENV VENV_PATH="$EXAMPLE_PATH/.venv"
+ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"
+
+RUN apt-get update \
+    && apt-get install --no-install-recommends -y \
+        curl \
+        wget \
+        # deps for building python deps
+        build-essential \
+    && apt-get install -y git \
+    && apt-get clean && rm -rf /var/lib/apt/lists/* \
+    \
+    # install dumb-init
+    && wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
+    && chmod +x /dumb-init \
+    && curl -sSL https://install.python-poetry.org | python3 -
+
+####################################################################################################
+# udf: used for running the udf vertices
+####################################################################################################
+FROM builder AS udf
+
+WORKDIR $PYSETUP_PATH
+COPY ./ ./
+
+WORKDIR $EXAMPLE_PATH
+RUN poetry lock
+RUN poetry install --no-cache --no-root && \
+    rm -rf ~/.cache/pypoetry/
+
+RUN chmod +x entry.sh
+
+ENTRYPOINT ["/dumb-init", "--"]
+CMD ["sh", "-c", "$EXAMPLE_PATH/entry.sh"]
+
+EXPOSE 5000
diff --git a/examples/accumulator/counter/Makefile b/examples/accumulator/counter/Makefile
new file mode 100644
index 00000000..ba3e8793
--- /dev/null
+++ b/examples/accumulator/counter/Makefile
@@ -0,0 +1,22 @@
+TAG ?= stable
+PUSH ?= false
+IMAGE_REGISTRY = quay.io/numaio/numaflow-python/reduce-stream-counter:${TAG}
+DOCKER_FILE_PATH = examples/accumulator/counter/Dockerfile
+
+.PHONY: update
+update:
+	poetry update -vv
+
+.PHONY: image-push
+image-push: update
+	cd ../../../ && docker buildx build \
+	-f ${DOCKER_FILE_PATH} \
+	-t ${IMAGE_REGISTRY} \
+	--platform linux/amd64,linux/arm64 . --push
+
+.PHONY: image
+image: update
+	cd ../../../ && docker build \
+	-f ${DOCKER_FILE_PATH} \
+	-t ${IMAGE_REGISTRY} .
+	@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
diff --git a/examples/accumulator/counter/entry.sh b/examples/accumulator/counter/entry.sh
new file mode 100644
index 00000000..073b05e3
--- /dev/null
+++ b/examples/accumulator/counter/entry.sh
@@ -0,0 +1,4 @@
+#!/bin/sh
+set -eux
+
+python example.py
diff --git a/examples/accumulator/counter/example.py b/examples/accumulator/counter/example.py
new file mode 100644
index 00000000..405d7f7a
--- /dev/null
+++ b/examples/accumulator/counter/example.py
@@ -0,0 +1,46 @@
+import os
+from collections.abc import AsyncIterable
+
+from pynumaflow.reducestreamer import (
+    Message,
+    Datum,
+    Metadata,
+    ReduceStreamAsyncServer,
+    ReduceStreamer,
+)
+from pynumaflow.shared.asynciter import NonBlockingIterator
+
+
+class ReduceCounter(ReduceStreamer):
+    def __init__(self, counter):
+        self.counter = counter
+
+    async def handler(
+        self,
+        keys: list[str],
+        datums: AsyncIterable[Datum],
+        output: NonBlockingIterator,
+        md: Metadata,
+    ):
+        async for _ in datums:
+            self.counter += 1
+            if self.counter > 10:
+                msg = f"counter:{self.counter}"
+                # NOTE: this is returning results because we have seen all the data
+                # use this only if you really need this feature because your next vertex
+                # will get both early result and final results and it should be able to
+                # handle both the scenarios.
+                await output.put(Message(str.encode(msg), keys=keys))
+                self.counter = 0
+        msg = f"counter:{self.counter}"
+        await output.put(Message(str.encode(msg), keys=keys))
+
+
+if __name__ == "__main__":
+    invoke = os.getenv("INVOKE", "class")
+    if invoke == "class":
+        # Here we are using the class instance as the reducer_instance
+        # which will be used to invoke the handler function.
+        # We are passing the init_args for the class instance.
+        grpc_server = ReduceStreamAsyncServer(ReduceCounter, init_args=(0,))
+        grpc_server.start()
diff --git a/examples/accumulator/counter/pipeline.yaml b/examples/accumulator/counter/pipeline.yaml
new file mode 100644
index 00000000..5ac746c9
--- /dev/null
+++ b/examples/accumulator/counter/pipeline.yaml
@@ -0,0 +1,50 @@
+apiVersion: numaflow.numaproj.io/v1alpha1
+kind: Pipeline
+metadata:
+  name: even-odd-sum
+spec:
+  vertices:
+    - name: in
+      source:
+        http: {}
+    - name: atoi
+      scale:
+        min: 3
+      udf:
+        container:
+          # Tell the input number is even or odd, see https://github.com/numaproj/numaflow-go/tree/main/pkg/mapper/examples/even_odd
+          image: quay.io/numaio/numaflow-go/map-even-odd:stable
+          imagePullPolicy: Always
+    - name: compute-sum
+      udf:
+        container:
+          # compute the sum
+          image: quay.io/numaio/numaflow-python/reduce-stream-counter:stable
+          imagePullPolicy: Always
+          env:
+            - name: PYTHONDEBUG
+              value: "true"
+            - name: INVOKE
+              value: "class"
+        groupBy:
+          window:
+            fixed:
+              length: 60s
+          keyed: true
+          storage:
+            persistentVolumeClaim:
+              volumeSize: 10Gi
+              accessMode: ReadWriteOnce
+      partitions: 1
+    - name: sink
+      scale:
+        min: 1
+      sink:
+        log: {}
+  edges:
+    - from: in
+      to: atoi
+    - from: atoi
+      to: compute-sum
+    - from: compute-sum
+      to: sink
diff --git a/examples/accumulator/counter/pyproject.toml b/examples/accumulator/counter/pyproject.toml
new file mode 100644
index 00000000..aeeb4d30
--- /dev/null
+++ b/examples/accumulator/counter/pyproject.toml
@@ -0,0 +1,15 @@
+[tool.poetry]
+name = "reduce-stream-counter"
+version = "0.2.4"
+description = ""
+authors = ["Numaflow developers"]
+
+[tool.poetry.dependencies]
+python = "~3.10"
+pynumaflow = { path = "../../../"}
+
+[tool.poetry.dev-dependencies]
+
+[build-system]
+requires = ["poetry-core>=1.0.0"]
+build-backend = "poetry.core.masonry.api"
diff --git a/examples/accumulator/streamsorter/Dockerfile b/examples/accumulator/streamsorter/Dockerfile
new file mode 100644
index 00000000..c5c1bda6
--- /dev/null
+++ b/examples/accumulator/streamsorter/Dockerfile
@@ -0,0 +1,55 @@
+####################################################################################################
+# builder: install needed dependencies
+####################################################################################################
+
+FROM python:3.11-slim-bullseye AS builder
+
+ENV PYTHONFAULTHANDLER=1 \
+    PYTHONUNBUFFERED=1 \
+    PYTHONHASHSEED=random \
+    PIP_NO_CACHE_DIR=on \
+    PIP_DISABLE_PIP_VERSION_CHECK=on \
+    PIP_DEFAULT_TIMEOUT=100 \
+    POETRY_VERSION=1.2.2 \
+    POETRY_HOME="/opt/poetry" \
+    POETRY_VIRTUALENVS_IN_PROJECT=true \
+    POETRY_NO_INTERACTION=1 \
+    PYSETUP_PATH="/opt/pysetup"
+
+ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/accumulator/streamsorter"
+ENV VENV_PATH="$EXAMPLE_PATH/.venv"
+ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"
+
+RUN apt-get update \
+    && apt-get install --no-install-recommends -y \
+        curl \
+        wget \
+        # deps for building python deps
+        build-essential \
+    && apt-get install -y git \
+    && apt-get clean && rm -rf /var/lib/apt/lists/* \
+    \
+    # install dumb-init
+    && wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
+    && chmod +x /dumb-init \
+    && curl -sSL https://install.python-poetry.org | python3 -
+
+####################################################################################################
+# udf: used for running the udf vertices
+####################################################################################################
+FROM builder AS udf
+
+WORKDIR $PYSETUP_PATH
+COPY ./ ./
+
+WORKDIR $EXAMPLE_PATH
+RUN poetry lock
+RUN poetry install --no-cache --no-root && \
+    rm -rf ~/.cache/pypoetry/
+
+RUN chmod +x entry.sh
+
+ENTRYPOINT ["/dumb-init", "--"]
+CMD ["sh", "-c", "$EXAMPLE_PATH/entry.sh"]
+
+EXPOSE 5000
diff --git a/examples/accumulator/streamsorter/Makefile b/examples/accumulator/streamsorter/Makefile
new file mode 100644
index 00000000..f36656b2
--- /dev/null
+++ b/examples/accumulator/streamsorter/Makefile
@@ -0,0 +1,22 @@
+TAG ?= test1
+PUSH ?= false
+IMAGE_REGISTRY = quay.io/numaio/numaflow-python/accumulator-sorter:${TAG}
+DOCKER_FILE_PATH = examples/accumulator/streamsorter/Dockerfile
+
+.PHONY: update
+update:
+	poetry update -vv
+
+.PHONY: image-push
+image-push: update
+	cd ../../../ && docker buildx build \
+	-f ${DOCKER_FILE_PATH} \
+	-t ${IMAGE_REGISTRY} \
+	--platform linux/amd64,linux/arm64 . --push
+
+.PHONY: image
+image: update
+	cd ../../../ && docker build \
+	-f ${DOCKER_FILE_PATH} \
+	-t ${IMAGE_REGISTRY} .
+	@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
diff --git a/examples/accumulator/streamsorter/entry.sh b/examples/accumulator/streamsorter/entry.sh
new file mode 100644
index 00000000..073b05e3
--- /dev/null
+++ b/examples/accumulator/streamsorter/entry.sh
@@ -0,0 +1,4 @@
+#!/bin/sh
+set -eux
+
+python example.py
diff --git a/examples/accumulator/streamsorter/example.py b/examples/accumulator/streamsorter/example.py
new file mode 100644
index 00000000..dbab4953
--- /dev/null
+++ b/examples/accumulator/streamsorter/example.py
@@ -0,0 +1,84 @@
+import logging
+import os
+from collections.abc import AsyncIterable
+from datetime import datetime
+from typing import List
+
+from pynumaflow import setup_logging
+from pynumaflow.accumulator import Accumulator, AccumulatorAsyncServer
+from pynumaflow.reducestreamer import (
+    Message,
+    Datum,
+)
+from pynumaflow.shared.asynciter import NonBlockingIterator
+
+_LOGGER = setup_logging(__name__)
+if os.getenv("PYTHONDEBUG"):
+    _LOGGER.setLevel(logging.DEBUG)
+
+
+class StreamSorter(Accumulator):
+    """Keeps datums in a buffer sorted by event time and emits them as the watermark advances."""
+
+    def __init__(self):
+        _LOGGER.debug("StreamSorter initialized")
+        self.latest_wm = datetime.fromtimestamp(-1)
+        self.sorted_buffer: List[Datum] = []
+
+    async def handler(
+        self,
+        datums: AsyncIterable[Datum],
+        output: NonBlockingIterator,
+    ):
+        """Flush the buffer whenever a datum carries a newer watermark, then buffer that datum."""
+        _LOGGER.debug("StreamSorter handler started")
+        async for datum in datums:
+            _LOGGER.info(f"Received datum with watermark: {datum.watermark}")
+            _LOGGER.info(f"Current latest watermark: {self.latest_wm}")
+
+            # If watermark has moved forward
+            if datum.watermark.ToDatetime() and datum.watermark.ToDatetime() > self.latest_wm:
+                self.latest_wm = datum.watermark.ToDatetime()
+                await self.flush_buffer(output)
+
+            self.insert_sorted(datum)
+
+    def insert_sorted(self, datum: Datum):
+        """Insert datum into sorted_buffer, after any entries with an equal event time."""
+        # Binary insert to keep sorted buffer in order
+        left, right = 0, len(self.sorted_buffer)
+        while left < right:
+            mid = (left + right) // 2
+            if self.sorted_buffer[mid].event_time.ToDatetime() > datum.event_time.ToDatetime():
+                right = mid
+            else:
+                left = mid + 1
+        self.sorted_buffer.insert(left, datum)
+
+    async def flush_buffer(self, output: NonBlockingIterator):
+        """Emit buffered datums with event time <= latest_wm, in order, and drop them."""
+        _LOGGER.info(f"Watermark updated, flushing sortedBuffer: {self.latest_wm}")
+        i = 0
+        for datum in self.sorted_buffer:
+            # Convert with ToDatetime(), as insert_sorted does, so that the
+            # comparison against latest_wm is datetime vs datetime.
+            if datum.event_time.ToDatetime() > self.latest_wm:
+                break
+            await output.put(Message.from_datum(datum))
+            _LOGGER.info(f"Sent datum with event time: {datum.event_time.ToDatetime()}")
+            i += 1
+        # Remove flushed items
+        self.sorted_buffer = self.sorted_buffer[i:]
+
+
+if __name__ == "__main__":
+    invoke = os.getenv("INVOKE", "class")
+    grpc_server = None
+    if invoke == "class":
+        # Here we are passing the Accumulator class to the server,
+        # which will be used to invoke the handler function.
+        # StreamSorter takes no constructor arguments, so no init_args are passed.
+        grpc_server = AccumulatorAsyncServer(StreamSorter)
+    if grpc_server is None:
+        raise ValueError(f"Unsupported INVOKE mode: {invoke}")
+    grpc_server.start()
diff --git a/examples/accumulator/streamsorter/pipeline.yaml b/examples/accumulator/streamsorter/pipeline.yaml
new file mode 100644
index 00000000..604e6997
--- /dev/null
+++ b/examples/accumulator/streamsorter/pipeline.yaml
@@ -0,0 +1,51 @@
+apiVersion: numaflow.numaproj.io/v1alpha1
+kind: Pipeline
+metadata:
+  name: even-odd-sum
+spec:
+  vertices:
+    - name: in
+      source:
+        http: {}
+    - name: atoi
+      scale:
+        min: 3
+      udf:
+        container:
+          # Tell the input number is even or odd, see https://github.com/numaproj/numaflow-go/tree/main/pkg/mapper/examples/even_odd
+          image: quay.io/numaio/numaflow-go/map-even-odd:stable
+          imagePullPolicy: Always
+    - name: compute-sum
+      udf:
+        container:
+          # compute the sum
+          image: quay.io/numaio/numaflow-python/reduce-stream-sum:stable
+          imagePullPolicy: Always
+          env:
+            - name: PYTHONDEBUG
+              value: "true"
+            - name: INVOKE
+              value: "class"
+        groupBy:
+          window:
+            fixed:
+              length: 60s
+              streaming: true
+          keyed: true
+          storage:
+            persistentVolumeClaim:
+              volumeSize: 10Gi
+              accessMode: ReadWriteOnce
+      partitions: 1
+    - name: sink
+      scale:
+        min: 1
+      sink:
+        log: {}
+  edges:
+    - from: in
+      to: atoi
+    - from: atoi
+      to: compute-sum
+    - from: compute-sum
+      to: sink
diff --git a/examples/accumulator/streamsorter/pyproject.toml b/examples/accumulator/streamsorter/pyproject.toml
new file mode 100644
index 00000000..6557f78f
--- /dev/null
+++ b/examples/accumulator/streamsorter/pyproject.toml
@@ -0,0 +1,15 @@
+[tool.poetry]
+name = "stream-sorter"
+version = "0.2.4"
+description = ""
+authors = ["Numaflow developers"]
+
+[tool.poetry.dependencies]
+python = ">=3.11,<3.13"
+pynumaflow = { path = "../../../"}
+
+[tool.poetry.dev-dependencies]
+
+[build-system]
+requires = ["poetry-core>=1.0.0"]
+build-backend = "poetry.core.masonry.api"