Skip to content

Commit 27ad810

Browse files
authored
fix(typing): pyi, examples, and manifest (#309)
Signed-off-by: Vigith Maurice <vigith@gmail.com>
1 parent e1c38d6 commit 27ad810

25 files changed

Lines changed: 59 additions & 33 deletions

packages/pynumaflow-lite/manifests/batchmap/batchmap_cat.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import signal
33
from collections.abc import AsyncIterable
4+
from typing import Awaitable, Callable
45

56
from pynumaflow_lite import batchmapper
67
from pynumaflow_lite.batchmapper import Message
@@ -28,7 +29,7 @@ async def handler(self, batch: AsyncIterable[batchmapper.Datum]) -> batchmapper.
2829
pass
2930

3031

31-
async def start(f: callable):
32+
async def start(f: Callable[[AsyncIterable[batchmapper.Datum]], Awaitable[batchmapper.BatchResponses]]):
3233
server = batchmapper.BatchMapAsyncServer()
3334

3435
# Register loop-level signal handlers so we control shutdown and avoid asyncio.run

packages/pynumaflow-lite/manifests/map/map_cat.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import asyncio
22
import signal
3+
from typing import Awaitable, Callable
4+
35
from pynumaflow_lite import mapper
46

57

@@ -26,7 +28,7 @@ async def handler(
2628
pass
2729

2830

29-
async def start(f: callable):
31+
async def start(f: Callable[[list[str], mapper.Datum], Awaitable[mapper.Messages]]):
3032
server = mapper.MapAsyncServer()
3133

3234
# Register loop-level signal handlers so we control shutdown and avoid asyncio.run

packages/pynumaflow-lite/manifests/mapstream/mapstream_cat.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import signal
33
from collections.abc import AsyncIterator
4+
from typing import Callable
45

56
from pynumaflow_lite import mapstreamer
67
from pynumaflow_lite.mapstreamer import Message
@@ -24,7 +25,7 @@ async def handler(self, keys: list[str], datum: mapstreamer.Datum) -> AsyncItera
2425
pass
2526

2627

27-
async def start(f: callable):
28+
async def start(f: Callable[[list[str], mapstreamer.Datum], AsyncIterator[Message]]):
2829
# Use default socket/info file locations; no explicit sock file passed
2930
server = mapstreamer.MapStreamAsyncServer()
3031

packages/pynumaflow-lite/manifests/reduce/reduce_counter_class.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ async def handler(
3232
pass
3333

3434

35-
async def start(creator: type, init_args: tuple):
35+
async def start(creator: type[reducer.Reducer], init_args: tuple):
3636
sock_file = "/var/run/numaflow/reduce.sock"
3737
server_info_file = "/var/run/numaflow/reducer-server-info"
3838
server = reducer.ReduceAsyncServer(sock_file, server_info_file)

packages/pynumaflow-lite/manifests/sink/sink_log.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
import collections
33
import logging
44
import signal
5-
from collections.abc import AsyncIterable
5+
from collections.abc import AsyncIterable, AsyncIterator
6+
from typing import Awaitable, Callable
67

78
from pynumaflow_lite import sinker
89
from pynumaflow_lite._sink_dtypes import Sinker
@@ -35,7 +36,7 @@ async def handler(self, datums: AsyncIterable[sinker.Datum]) -> sinker.Responses
3536
pass
3637

3738

38-
async def start(f: collections.abc.Callable):
39+
async def start(f: Callable[[AsyncIterator[sinker.Datum]], Awaitable[sinker.Responses]]):
3940
server = sinker.SinkAsyncServer()
4041

4142
# Register loop-level signal handlers so we control shutdown and avoid asyncio.run

packages/pynumaflow-lite/manifests/sourcetransform/sourcetransform_event_filter.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import asyncio
22
import signal
33
from datetime import datetime, timezone
4+
from typing import Callable
5+
46
from pynumaflow_lite import sourcetransformer
57

68
# Define epoch timestamps for filtering
@@ -59,7 +61,7 @@ async def handler(
5961
pass
6062

6163

62-
async def start(f: callable):
64+
async def start(f: Callable[[list[str], sourcetransformer.Datum], sourcetransformer.Messages]):
6365
server = sourcetransformer.SourceTransformAsyncServer()
6466

6567
# Register loop-level signal handlers so we control shutdown and avoid asyncio.run

packages/pynumaflow-lite/pynumaflow_lite/accumulator.pyi

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from datetime import datetime
2-
from typing import AsyncIterator, Optional
2+
from typing import AsyncIterator, Optional, Callable, Awaitable
3+
from collections.abc import AsyncIterable
34

45
class Message:
56
"""
@@ -74,13 +75,13 @@ class AccumulatorAsyncServer:
7475
info_file: str | None = "/var/run/numaflow/accumulator-server-info",
7576
) -> None: ...
7677
async def start(
77-
self, py_creator: object, init_args: object | None = None
78+
self, py_creator: type[Accumulator] | Callable[[AsyncIterable[Datum]], AsyncIterator[Message]], init_args: tuple | None = None
7879
) -> None:
7980
"""
80-
Start the server with the given Python class (creator).
81+
Start the server with the given Python class (creator) or function.
8182
8283
Args:
83-
py_creator: The Python class to instantiate per key
84+
py_creator: The Python class to instantiate per key or a function
8485
init_args: Optional tuple of positional arguments for class instantiation
8586
"""
8687
...

packages/pynumaflow-lite/pynumaflow_lite/batchmapper.pyi

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class BatchMapAsyncServer:
5757
info_file: str | None = ...,
5858
) -> None: ...
5959

60-
def start(self, py_func: Callable[..., Any]) -> Awaitable[None]: ...
60+
def start(self, py_func: Callable[[AsyncIterator[Datum]], Awaitable[BatchResponses]]) -> Awaitable[None]: ...
6161

6262
def stop(self) -> None: ...
6363

packages/pynumaflow-lite/pynumaflow_lite/mapper.pyi

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ class MapAsyncServer:
113113
info_file: str | None = ...,
114114
) -> None: ...
115115

116-
def start(self, py_func: Callable[..., Any]) -> Awaitable[None]: ...
116+
def start(self, py_func: Callable[[list[str], Datum], Awaitable[Messages]]) -> Awaitable[None]: ...
117117

118118
def stop(self) -> None: ...
119119

packages/pynumaflow-lite/pynumaflow_lite/mapstreamer.pyi

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from __future__ import annotations
22

3-
from typing import Optional, List, Dict, Callable, Awaitable, Any, AsyncIterator
3+
from typing import Optional, List, Dict, Callable, Awaitable, AsyncIterator
44
import datetime as _dt
55

66
# Re-export the Python ABC for user convenience and typing
@@ -45,7 +45,7 @@ class MapStreamAsyncServer:
4545
info_file: str | None = ...,
4646
) -> None: ...
4747

48-
def start(self, py_func: Callable[..., Any]) -> Awaitable[None]: ...
48+
def start(self, py_func: Callable[[list[str], Datum], AsyncIterator[Message]]) -> Awaitable[None]: ...
4949

5050
def stop(self) -> None: ...
5151

0 commit comments

Comments
 (0)