Skip to content

Commit 9b043d4

Browse files
BulkBeingvigith
andauthored
fix: Sourcer - Change pending/partitions handlers of Source to async (#301)
Signed-off-by: Sreekanth <prsreekanth920@gmail.com> Signed-off-by: Vigith Maurice <vigith@gmail.com> Co-authored-by: Vigith Maurice <vigith@gmail.com>
1 parent eae7222 commit 9b043d4

2 files changed

Lines changed: 6 additions & 12 deletions

File tree

packages/pynumaflow/pynumaflow/sourcer/_dtypes.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -273,12 +273,6 @@ class Sourcer(metaclass=ABCMeta):
273273
which will be exposed over an gRPC server.
274274
"""
275275

276-
def __call__(self, *args, **kwargs):
277-
"""
278-
Allow to call handler function directly if class instance is sent
279-
"""
280-
return self.handler(*args, **kwargs)
281-
282276
@abstractmethod
283277
async def read_handler(self, datum: ReadRequest, output: NonBlockingIterator):
284278
"""
@@ -290,28 +284,28 @@ async def read_handler(self, datum: ReadRequest, output: NonBlockingIterator):
290284
pass
291285

292286
@abstractmethod
293-
def ack_handler(self, ack_request: AckRequest):
287+
async def ack_handler(self, ack_request: AckRequest):
294288
"""
295289
The ack handler is used to acknowledge the offsets that have been read
296290
"""
297291
pass
298292

299293
@abstractmethod
300-
def nack_handler(self, nack_request: NackRequest):
294+
async def nack_handler(self, nack_request: NackRequest):
301295
"""
302296
The nack handler is used to negatively acknowledge the offsets on the source
303297
"""
304298
pass
305299

306300
@abstractmethod
307-
def pending_handler(self) -> PendingResponse:
301+
async def pending_handler(self) -> PendingResponse:
308302
"""
309303
The simple source always returns zero to indicate there is no pending record.
310304
"""
311305
pass
312306

313307
@abstractmethod
314-
def partitions_handler(self) -> PartitionsResponse:
308+
async def partitions_handler(self) -> PartitionsResponse:
315309
"""
316310
The simple source always returns zero to indicate there is no pending record.
317311
"""

packages/pynumaflow/pynumaflow/sourcer/servicer/async_servicer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ async def PendingFn(
213213
except BaseException as err:
214214
_LOGGER.critical("PendingFn Error", exc_info=True)
215215
await handle_async_error(context, err, ERR_UDF_EXCEPTION_STRING)
216-
return
216+
raise
217217
resp = source_pb2.PendingResponse.Result(count=count.count)
218218
return source_pb2.PendingResponse(result=resp)
219219

@@ -228,7 +228,7 @@ async def PartitionsFn(
228228
except BaseException as err:
229229
_LOGGER.critical("PartitionsFn Error", exc_info=True)
230230
await handle_async_error(context, err, ERR_UDF_EXCEPTION_STRING)
231-
return
231+
raise
232232
resp = source_pb2.PartitionsResponse.Result(partitions=partitions.partitions)
233233
return source_pb2.PartitionsResponse(result=resp)
234234

0 commit comments

Comments
 (0)