@@ -133,10 +133,34 @@ async def _invoke_map_stream(
133133 headers = dict (req .request .headers ),
134134 )
135135
136- # Stream results from the user handler as they are produced
136+ # Stream results from the user handler as they are produced.
137+ # The asyncio.sleep(0) after each put yields control to the event loop,
138+ # allowing MapFn to consume and stream the response to gRPC immediately.
139+ # Without it, await Queue.put() on an unbounded queue completes without
140+ # suspending (Queue.full() is always False), starving other tasks.
141+ # The starvation can happen if the UDF code yields messages using regular
142+ # for-loop (non async). See the sample code in https://github.com/numaproj/numaflow-python/issues/342
143+ # With asyncio.sleep(0), this makes our below 'async for' loop equivalent to:
144+ #
145+ # while True:
146+ # msg = await handler.__anext__() # await point
147+ # await result_queue.put(...)
148+ #
149+ # The "await result_queue.put()" isn't a real await point yielding control back to
150+ # eventloop in the case of an unbounded queue. When queue is not full, it simply calls
151+ # a non-async function https://github.com/python/cpython/blob/f4c9bc899b982b9742b45cff0643fa34de3dc84d/Lib/asyncio/queues.py#L125-L154
152+ # Or you can refer the source code with:
153+ # python -c "import asyncio, inspect; print(inspect.getsource(asyncio.Queue.put))"
154+ # This results in a tight loop, blocking other tasks on event loop from proceeding.
155+ # Like in the issue linked here, if the user yields 10 messages at 1 second a part,
156+ # the task that reads from the queue can only proceed this 'async for loop' ends as
157+ # it never yields control back to eventloop. So you will see all 10 messages at the
158+ # same time in the next vertex instead of in a true streaming fashion.
159+ # The asyncio.sleep(0) will yield the control back to event loop avoiding starvation.
137160 async for msg in self .__map_stream_handler (list (req .request .keys ), datum ):
138161 res = map_pb2 .MapResponse .Result (keys = msg .keys , value = msg .value , tags = msg .tags )
139162 await result_queue .put (map_pb2 .MapResponse (results = [res ], id = req .id ))
163+ await asyncio .sleep (0 )
140164
141165 # Emit EOT for this request id
142166 await result_queue .put (
0 commit comments