Describe the bug
When a message is redelivered to the map-stream client while the original delivery is still being processed, the stream method invoked for the redelivered message evicts the sender already stored in senders_map for that message ID and inserts its own sender in its place. If some of the UDF's responses for the original request were already delivered to the previous sender, the remaining responses are delivered through the new sender instead.
Only the responses sent to the new sender are forwarded downstream.
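The overwrite described above can be reproduced in isolation with a minimal Rust sketch. It assumes a plain HashMap keyed by message ID that holds std::sync::mpsc senders; the key format, the Resp type and the register helper are hypothetical stand-ins, not numaflow_core's actual senders_map or channel types.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};

/// Hypothetical stand-in for one UDF response on a map-stream.
#[derive(Debug, PartialEq)]
enum Resp {
    Data(&'static str),
    Eot,
}

/// Registers a fresh channel for `id`, mimicking a stream call that stores its
/// sender in a senders map keyed by message ID. Returns the receiver plus
/// whether an existing sender was replaced.
fn register(map: &mut HashMap<String, Sender<Resp>>, id: &str) -> (Receiver<Resp>, bool) {
    let (tx, rx) = channel();
    // `insert` replaces the old value and hands it back; dropping it here
    // closes the first execute task's channel.
    let replaced = map.insert(id.to_string(), tx).is_some();
    (rx, replaced)
}

fn main() {
    let mut senders: HashMap<String, Sender<Resp>> = HashMap::new();
    let id = "179336-0"; // hypothetical key derived from the message offset

    let (rx_first, replaced) = register(&mut senders, id);
    assert!(!replaced);

    // The redelivery of the same message arrives while the first stream is in flight.
    let (rx_second, replaced) = register(&mut senders, id);
    assert!(replaced);

    // A response the UDF produced for the *first* request is routed by ID,
    // so it now lands on the second request's channel.
    senders[id].send(Resp::Data("first-request-response")).unwrap();

    // The first execute task sees a closed channel instead of data or an EOT.
    assert_eq!(rx_first.try_recv(), Err(TryRecvError::Disconnected));
    assert_eq!(rx_second.try_recv(), Ok(Resp::Data("first-request-response")));
    let _unused = Resp::Eot; // EOT is not exercised in this sketch
    println!("first receiver disconnected; its response went to the second receiver");
}
```

The key point is that HashMap::insert never fails: it returns the previous sender, and dropping that value is what disconnects the first execute task.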
Evidence through logs
{"timestamp":"2026-05-11T17:33:58.041190Z","level":"INFO","message":"Received message id","self.msg_handle.message.id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0\", index: 0 }","target":"numaflow_core::mapper::map::stream"}
{"timestamp":"2026-05-11T17:33:58.061256Z","level":"INFO","message":"Received message id","self.msg_handle.message.id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0\", index: 0 }","target":"numaflow_core::mapper::map::stream"}
{"timestamp":"2026-05-11T17:33:58.079744Z","level":"INFO","message":"Sending message id","parent_id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0\", index: 0 }","child_id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0-0\", index: 0 }","target":"numaflow_core::mapper::map::stream"}
{"timestamp":"2026-05-11T17:33:58.079750Z","level":"ERROR","message":"failed to map message","parent_id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0\", index: 0 }","e":"Mapper(\"Did not receive EOT from stream receiver task\")","target":"numaflow_core::mapper::map::stream"}
{"timestamp":"2026-05-11T17:33:58.079773Z","level":"INFO","message":"Sending message id","parent_id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0\", index: 0 }","child_id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0-0\", index: 0 }","target":"numaflow_core::mapper::map::stream"}
{"timestamp":"2026-05-11T17:33:58.079782Z","level":"INFO","message":"stream complete","parent_id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0\", index: 0 }","produced":1,"sent":1,"target":"numaflow_core::mapper::map::stream"}
{"timestamp":"2026-05-11T17:33:58.079787Z","level":"INFO","message":"marking message as successful","parent_id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0\", index: 0 }","produced":1,"sent":1,"target":"numaflow_core::mapper::map::stream"}
{"timestamp":"2026-05-11T17:33:58.088761Z","level":"WARN","message":"Duplicate message detected","message_id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0-0\", index: 0 }","stream":"Stream { name: \"oss-analytics-numaflowperfharness-use2-e2e-map-validation-sink-0\", vertex: \"sink\", partition: 0 }","target":"numaflow_core::pipeline::isb::writer"}
{"timestamp":"2026-05-11T17:33:58.099060Z","level":"INFO","message":"Nak received for offset","params.offset":"Int(IntOffset { offset: 179336, partition_idx: 0 })","target":"numaflow_core::pipeline::isb::reader"}
{"timestamp":"2026-05-11T17:33:58.099067Z","level":"ERROR","message":"Cancellation received, stopping Nak retry loop","e":"ISB(OffsetNotFound(\"179336-0\"))","offset":"Int(IntOffset { offset: 179336, partition_idx: 0 })","target":"numaflow_core::pipeline::isb::reader"}
{"timestamp":"2026-05-11T17:33:58.099073Z","level":"ERROR","message":"Failed to nack message after retries","e":"ISB(OffsetNotFound(\"179336-0\"))","params.offset":"Int(IntOffset { offset: 179336, partition_idx: 0 })","target":"numaflow_core::pipeline::isb::reader"}
{"timestamp":"2026-05-11T17:33:58.118852Z","level":"WARN","message":"received error while processing stream response: Mapper Error - No such req/resp ID found in StreamResponseSenderMap: stream-map-179336-0-0","target":"numaflow_core::mapper::map::stream"}
{"timestamp":"2026-05-11T17:33:58.118868Z","level":"WARN","message":"received error while processing stream response: Mapper Error - No such req/resp ID found in StreamResponseSenderMap: stream-map-179336-0-0","target":"numaflow_core::mapper::map::stream"}
{"timestamp":"2026-05-11T17:33:58.118871Z","level":"WARN","message":"received error while processing stream response: Mapper Error - No such req/resp ID found in StreamResponseSenderMap: stream-map-179336-0-0","target":"numaflow_core::mapper::map::stream"}
- The "Received message id" log is emitted at the start of each stream method invocation, so the first two entries show two messages with the same ID arriving about 20 ms apart (17:33:58.041 and 17:33:58.061).
- The "Sending message id" log is emitted on the receiver-task side when it sends a UDF response back to the execute task.
- The "failed to map message" log was added together with an explicit EOT from the receiver task to the execute task; it fires when the execute task's channel closes without an EOT. Here, the first execute task's sender on the receiver side had been dropped because the second message's sender overwrote it.
- The "marking message as successful" log (produced: 1, sent: 1) shows that only one response was received through the second sender, and that response was originally meant for the first sender. As a result, only one response is forwarded downstream.
- There are three "No such req/resp ID found in StreamResponseSenderMap" warnings: two for the data responses meant for the second sender, followed by one for the EOT meant for the second sender.
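The log sequence above can be replayed with a small sketch under the same assumptions: std mpsc channels, hypothetical dispatch (receiver task) and collect (execute task) helpers, and an assumed rule that the receiver task removes a map entry once it has routed that ID's EOT. The error strings mirror the log messages, but this is not numaflow_core's implementation.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical UDF response: a data message or the end-of-transmission marker.
#[derive(Debug, Clone, PartialEq)]
enum Resp {
    Data(String),
    Eot,
}

/// Hypothetical senders map: request ID -> sender owned by that request's execute task.
type SendersMap = HashMap<String, Sender<Resp>>;

/// Receiver-task side: route a response by request ID and remove the entry
/// once that ID's EOT has been forwarded (assumed behaviour).
fn dispatch(map: &mut SendersMap, id: &str, resp: Resp) -> Result<(), String> {
    let is_eot = resp == Resp::Eot;
    let tx = map
        .get(id)
        .ok_or_else(|| format!("No such req/resp ID found in senders map: {id}"))?;
    // A receiver that has already gone away is ignored in this sketch.
    let _ = tx.send(resp);
    if is_eot {
        map.remove(id);
    }
    Ok(())
}

/// Execute-task side: gather data until EOT; a channel that closes first
/// means the sender was dropped before the stream finished.
fn collect(rx: &Receiver<Resp>) -> Result<Vec<String>, String> {
    let mut out = Vec::new();
    loop {
        match rx.recv() {
            Ok(Resp::Data(d)) => out.push(d),
            Ok(Resp::Eot) => return Ok(out),
            Err(_) => return Err("Did not receive EOT from stream receiver task".to_string()),
        }
    }
}

fn main() {
    let mut map = SendersMap::new();
    let id = "179336-0"; // hypothetical request ID

    // The original delivery registers its sender...
    let (tx_first, rx_first) = channel();
    map.insert(id.to_string(), tx_first);
    // ...then the redelivery overwrites it; the first sender is dropped here.
    let (tx_second, rx_second) = channel();
    map.insert(id.to_string(), tx_second);

    // The first request's output (one data message + EOT) reaches the second
    // receiver, and its EOT removes the map entry.
    dispatch(&mut map, id, Resp::Data("first-0".to_string())).unwrap();
    dispatch(&mut map, id, Resp::Eot).unwrap();

    // The second request's output (two data messages + EOT) finds no entry.
    let mut misses = 0;
    for resp in [Resp::Data("second-0".to_string()), Resp::Data("second-1".to_string()), Resp::Eot] {
        if let Err(e) = dispatch(&mut map, id, resp) {
            eprintln!("WARN {e}");
            misses += 1;
        }
    }

    assert!(collect(&rx_first).is_err()); // "failed to map message"
    assert_eq!(collect(&rx_second), Ok(vec!["first-0".to_string()])); // produced 1, sent 1
    assert_eq!(misses, 3); // three "No such req/resp ID" warnings
}
```

Under these assumptions the sketch ends in the same state the logs describe: one failed stream, one stream that completes with a single response, and three unroutable responses.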
To Reproduce
Observed during high-throughput scenarios.
Expected behavior
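A redelivered message should not replace the sender of a stream that is still in flight for the same message ID. Every response the UDF produces for a request should be delivered through that request's own sender and forwarded downstream.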
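One hypothetical way to meet this expectation is sketched below: register senders through the HashMap entry API, so a second registration for an ID that is still in flight is rejected instead of silently replacing the existing sender. The try_register helper and the String payload are assumptions for illustration, not numaflow's fix; how a rejected redelivery should then be handled (waiting, retrying later, or treating it as a duplicate) is left open.

```rust
use std::collections::hash_map::{Entry, HashMap};
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical guard: register a sender for `id` only if no stream for that
/// ID is still in flight. Unlike `HashMap::insert`, this never drops an
/// existing sender.
fn try_register(
    map: &mut HashMap<String, Sender<String>>,
    id: &str,
) -> Result<Receiver<String>, String> {
    match map.entry(id.to_string()) {
        // A stream for this message ID is already registered: refuse to displace it.
        Entry::Occupied(_) => Err(format!("stream for {id} is already in flight")),
        Entry::Vacant(slot) => {
            let (tx, rx) = channel();
            slot.insert(tx);
            Ok(rx)
        }
    }
}

fn main() {
    let mut senders = HashMap::new();
    let first = try_register(&mut senders, "179336-0").expect("first delivery registers");

    // The redelivery is rejected; the first stream keeps its sender.
    assert!(try_register(&mut senders, "179336-0").is_err());

    senders["179336-0"].send("first-0".to_string()).unwrap();
    assert_eq!(first.recv().unwrap(), "first-0");

    // Once the first stream finishes and its entry is removed, the ID can be registered again.
    senders.remove("179336-0");
    assert!(try_register(&mut senders, "179336-0").is_ok());
}
```

The difference from HashMap::insert is that entry() lets the caller observe the occupied slot before anything is replaced, so the first stream's sender is never dropped out from under it.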