Skip to content

Commit ac59dc1

Browse files
feat: [pynumaflow-lite] Update Sourcer to propagate totalPartitions
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
1 parent 5c7d07f commit ac59dc1

5 files changed

Lines changed: 64 additions & 13 deletions

File tree

packages/pynumaflow-lite/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ name = "pynumaflow_lite"
99
crate-type = ["cdylib", "rlib"]
1010

1111
[dependencies]
12-
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "44ee3068fcf7088ff265df7ae7ce1881a40694ff" }
12+
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "15c46e8289943a639a46a475b7e0d286e928a8b0" }
1313
pyo3 = { version = "0.27.1", features = ["chrono", "experimental-inspect"] }
1414
tokio = "1.47.1"
1515
tonic = "0.14.2"

packages/pynumaflow-lite/manifests/source/simple_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ async def pending_handler(self) -> sourcer.PendingResponse:
7878
"""
7979
return sourcer.PendingResponse(count=0)
8080

81-
async def partitions_handler(self) -> sourcer.PartitionsResponse:
81+
async def active_partitions_handler(self) -> sourcer.PartitionsResponse:
8282
"""
8383
The simple source always returns default partitions.
8484
"""

packages/pynumaflow-lite/pynumaflow_lite/_source_dtypes.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
NackRequest,
99
PendingResponse,
1010
PartitionsResponse,
11+
TotalPartitionsResponse,
1112
)
1213

1314

@@ -19,10 +20,11 @@ class Sourcer(metaclass=ABCMeta):
1920
- read_handler: Read messages from the source
2021
- ack_handler: Acknowledge processed messages
2122
- pending_handler: Return the number of pending messages
22-
- partitions_handler: Return the partitions this source handles
23+
- active_partitions_handler: Return the active partitions this source handles
2324
2425
Optionally, you can implement:
2526
- nack_handler: Negatively acknowledge messages (default: no-op)
27+
- total_partitions_handler: Return the total number of partitions in the source (default: None)
2628
"""
2729

2830
def __call__(self, *args, **kwargs):
@@ -88,9 +90,9 @@ async def pending_handler(self) -> PendingResponse:
8890
pass
8991

9092
@abstractmethod
91-
async def partitions_handler(self) -> PartitionsResponse:
93+
async def active_partitions_handler(self) -> PartitionsResponse:
9294
"""
93-
Return the partitions associated with this source.
95+
Return the active partitions associated with this source.
9496
9597
This is used by the platform to determine the partitions to which
9698
the watermark should be published. If your source doesn't have the
@@ -105,6 +107,21 @@ async def partitions_handler(self) -> PartitionsResponse:
105107
"""
106108
pass
107109

110+
async def total_partitions_handler(self) -> int | None:
111+
"""
112+
Optional.
113+
114+
Returns the total number of partitions in the source.
115+
Used by the platform for watermark progression to know when all
116+
processors have reported in.
117+
118+
Returns None by default, indicating the source does not report total partitions.
119+
120+
:return:
121+
int | None: The total number of partitions, or None if the source does not report it
122+
"""
123+
return None
124+
108125
async def nack_handler(self, request: NackRequest) -> None:
109126
"""
110127
Negatively acknowledge messages (optional).

packages/pynumaflow-lite/src/source/server.rs

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -145,27 +145,27 @@ impl numaflow::source::Sourcer for PySourceRunner {
145145
}
146146
}
147147

148-
/// Returns the partitions associated with the source. This will be used by the platform to determine
148+
/// Returns the active partitions associated with the source. This will be used by the platform to determine
149149
/// the partitions to which the watermark should be published. Some sources might not have the concept of partitions.
150150
/// Kafka is an example of source where a reader can read from multiple partitions.
151151
/// If None is returned, Numaflow replica-id will be returned as the partition.
152-
async fn partitions(&self) -> Option<Vec<i32>> {
153-
// Call the Python partitions_handler
152+
async fn active_partitions(&self) -> Option<Vec<i32>> {
153+
// Call the Python active_partitions_handler
154154
let fut = Python::attach(|py| {
155155
let py_handler = self.py_handler.clone();
156156
let locals = pyo3_async_runtimes::TaskLocals::new(self.event_loop.bind(py).clone());
157157

158158
let coro = py_handler
159-
.call_method0(py, "partitions_handler")
160-
.expect("failed to call partitions_handler")
159+
.call_method0(py, "active_partitions_handler")
160+
.expect("failed to call active_partitions_handler")
161161
.into_bound(py);
162162

163163
pyo3_async_runtimes::into_future_with_locals(&locals, coro)
164-
.expect("failed to convert partitions_handler to future")
164+
.expect("failed to convert active_partitions_handler to future")
165165
});
166166

167167
// Await the Python coroutine and extract the result
168-
let result = fut.await.expect("partitions_handler failed");
168+
let result = fut.await.expect("active_partitions_handler failed");
169169

170170
let partitions_response = Python::attach(|py| {
171171
result
@@ -175,6 +175,34 @@ impl numaflow::source::Sourcer for PySourceRunner {
175175

176176
Some(partitions_response.partitions)
177177
}
178+
179+
/// Returns the total number of partitions in the source. This is used by the platform for
180+
/// watermark progression to know when all processors have reported in.
181+
/// If None is returned, the platform will not use total partitions for watermark tracking.
182+
async fn total_partitions(&self) -> Option<i32> {
183+
// Call the Python total_partitions_handler
184+
let fut = Python::attach(|py| {
185+
let py_handler = self.py_handler.clone();
186+
let locals = pyo3_async_runtimes::TaskLocals::new(self.event_loop.bind(py).clone());
187+
188+
let coro = py_handler
189+
.call_method0(py, "total_partitions_handler")
190+
.expect("failed to call total_partitions_handler")
191+
.into_bound(py);
192+
193+
pyo3_async_runtimes::into_future_with_locals(&locals, coro)
194+
.expect("failed to convert total_partitions_handler to future")
195+
});
196+
197+
// Await the Python coroutine and extract the result
198+
let result = fut.await.expect("total_partitions_handler failed");
199+
200+
Python::attach(|py| {
201+
result
202+
.extract::<Option<i32>>(py)
203+
.expect("failed to extract Option<i32> from total_partitions_handler")
204+
})
205+
}
178206
}
179207

180208
/// Start the source server by spinning up a dedicated Python asyncio loop and wiring shutdown.

packages/pynumaflow-lite/tests/examples/source_simple.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,18 @@ async def pending_handler(self) -> sourcer.PendingResponse:
8787
"""
8888
return sourcer.PendingResponse(count=0)
8989

90-
async def partitions_handler(self) -> sourcer.PartitionsResponse:
90+
async def active_partitions_handler(self) -> sourcer.PartitionsResponse:
9191
"""
9292
The simple source returns a single partition: its own partition index (self.partition_idx).
9393
"""
9494
return sourcer.PartitionsResponse(partitions=[self.partition_idx])
9595

96+
async def total_partitions_handler(self) -> int:
97+
"""
98+
The simple source has only one partition.
99+
"""
100+
return 1
101+
96102

97103
async def start():
98104
sock_file = "/tmp/var/run/numaflow/source.sock"

0 commit comments

Comments
 (0)