Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pynumaflow/sourcer/servicer/async_servicer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
from collections.abc import AsyncIterable
from collections.abc import AsyncIterator

from google.protobuf import timestamp_pb2 as _timestamp_pb2
from google.protobuf import empty_pb2 as _empty_pb2
Expand Down Expand Up @@ -80,9 +80,9 @@ def __initialize_handlers(self):

async def ReadFn(
self,
request_iterator: AsyncIterable[source_pb2.ReadRequest],
request_iterator: AsyncIterator[source_pb2.ReadRequest],
context: NumaflowServicerContext,
) -> AsyncIterable[source_pb2.ReadResponse]:
) -> AsyncIterator[source_pb2.ReadResponse]:
"""
Handles the Read function, processing incoming requests and sending responses.
"""
Expand All @@ -108,7 +108,7 @@ async def ReadFn(

async for resp in riter:
if isinstance(resp, BaseException):
await handle_async_error(context, resp)
await handle_async_error(context, resp, ERR_UDF_EXCEPTION_STRING)
return

yield _create_read_response(resp)
Expand Down Expand Up @@ -139,9 +139,9 @@ async def __invoke_read(self, req, niter):

async def AckFn(
self,
request_iterator: AsyncIterable[source_pb2.AckRequest],
request_iterator: AsyncIterator[source_pb2.AckRequest],
context: NumaflowServicerContext,
) -> AsyncIterable[source_pb2.AckResponse]:
) -> AsyncIterator[source_pb2.AckResponse]:
"""
Handles the Ack function for user-defined source.
"""
Expand Down
Loading