Redelivered message in map stream can lead to partial output #3423

@vaibhavtiwari33

Describe the bug

When a message is redelivered to the map-stream client, the stream method invoked for the redelivered message evicts the sender already stored in senders_map for that message ID and inserts its own sender in its place. If some of the UDF's responses were already delivered to the previous sender, the remaining responses are delivered through the new sender.

Only the responses sent to the new sender are forwarded downstream.
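The overwrite can be reproduced in isolation with a minimal sketch. It assumes the sender map behaves like a std HashMap<String, Sender<_>> keyed by request ID, and it uses std::sync::mpsc channels in place of the real async channels; Resp and simulate_overwrite are hypothetical names, not numaflow_core APIs. HashMap::insert replaces the existing value and returns it, so the first request's receiver disconnects without an EOT while its output lands on the second request's receiver.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};

/// Hypothetical, simplified stand-in for a UDF stream response.
#[derive(Debug, PartialEq)]
pub enum Resp {
    Data(String),
    Eot,
}

/// Replays the race: two stream invocations register a sender under the same ID.
/// Returns (did the first receiver see an EOT?, everything the second receiver got).
pub fn simulate_overwrite() -> (bool, Vec<Resp>) {
    let id = "stream-map-179336-0".to_string();
    let mut senders: HashMap<String, Sender<Resp>> = HashMap::new();

    // First delivery registers its sender.
    let (tx1, rx1) = channel();
    senders.insert(id.clone(), tx1);

    // Redelivery with the same ID: insert() replaces tx1 and returns it.
    // Dropping the returned sender closes the first receiver's channel.
    let (tx2, rx2) = channel();
    drop(senders.insert(id.clone(), tx2));

    // The UDF's output for the first request is routed by ID, so it reaches tx2.
    senders[&id].send(Resp::Data("first-0".into())).unwrap();
    senders[&id].send(Resp::Eot).unwrap();
    // Drop the map (and tx2) so the receivers below stop once drained.
    drop(senders);

    let first_saw_eot = rx1.iter().any(|r| r == Resp::Eot);
    let second_got: Vec<Resp> = rx2.iter().collect();
    (first_saw_eot, second_got)
}
```

Calling simulate_overwrite() returns (false, [Data("first-0"), Eot]), mirroring the "Did not receive EOT" error for the first request and the single-response completion on the second sender in the logs below.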

Evidence from logs

{"timestamp":"2026-05-11T17:33:58.041190Z","level":"INFO","message":"Received message id","self.msg_handle.message.id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0\", index: 0 }","target":"numaflow_core::mapper::map::stream"}
{"timestamp":"2026-05-11T17:33:58.061256Z","level":"INFO","message":"Received message id","self.msg_handle.message.id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0\", index: 0 }","target":"numaflow_core::mapper::map::stream"}
{"timestamp":"2026-05-11T17:33:58.079744Z","level":"INFO","message":"Sending message id","parent_id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0\", index: 0 }","child_id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0-0\", index: 0 }","target":"numaflow_core::mapper::map::stream"}
{"timestamp":"2026-05-11T17:33:58.079750Z","level":"ERROR","message":"failed to map message","parent_id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0\", index: 0 }","e":"Mapper(\"Did not receive EOT from stream receiver task\")","target":"numaflow_core::mapper::map::stream"}
{"timestamp":"2026-05-11T17:33:58.079773Z","level":"INFO","message":"Sending message id","parent_id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0\", index: 0 }","child_id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0-0\", index: 0 }","target":"numaflow_core::mapper::map::stream"}
{"timestamp":"2026-05-11T17:33:58.079782Z","level":"INFO","message":"stream complete","parent_id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0\", index: 0 }","produced":1,"sent":1,"target":"numaflow_core::mapper::map::stream"}
{"timestamp":"2026-05-11T17:33:58.079787Z","level":"INFO","message":"marking message as successful","parent_id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0\", index: 0 }","produced":1,"sent":1,"target":"numaflow_core::mapper::map::stream"}
{"timestamp":"2026-05-11T17:33:58.088761Z","level":"WARN","message":"Duplicate message detected","message_id":"MessageID { vertex_name: b\"stream-map\", offset: b\"179336-0-0\", index: 0 }","stream":"Stream { name: \"oss-analytics-numaflowperfharness-use2-e2e-map-validation-sink-0\", vertex: \"sink\", partition: 0 }","target":"numaflow_core::pipeline::isb::writer"}
{"timestamp":"2026-05-11T17:33:58.099060Z","level":"INFO","message":"Nak received for offset","params.offset":"Int(IntOffset { offset: 179336, partition_idx: 0 })","target":"numaflow_core::pipeline::isb::reader"}
{"timestamp":"2026-05-11T17:33:58.099067Z","level":"ERROR","message":"Cancellation received, stopping Nak retry loop","e":"ISB(OffsetNotFound(\"179336-0\"))","offset":"Int(IntOffset { offset: 179336, partition_idx: 0 })","target":"numaflow_core::pipeline::isb::reader"}
{"timestamp":"2026-05-11T17:33:58.099073Z","level":"ERROR","message":"Failed to nack message after retries","e":"ISB(OffsetNotFound(\"179336-0\"))","params.offset":"Int(IntOffset { offset: 179336, partition_idx: 0 })","target":"numaflow_core::pipeline::isb::reader"}
{"timestamp":"2026-05-11T17:33:58.118852Z","level":"WARN","message":"received error while processing stream response: Mapper Error - No such req/resp ID found in StreamResponseSenderMap: stream-map-179336-0-0","target":"numaflow_core::mapper::map::stream"}
{"timestamp":"2026-05-11T17:33:58.118868Z","level":"WARN","message":"received error while processing stream response: Mapper Error - No such req/resp ID found in StreamResponseSenderMap: stream-map-179336-0-0","target":"numaflow_core::mapper::map::stream"}
{"timestamp":"2026-05-11T17:33:58.118871Z","level":"WARN","message":"received error while processing stream response: Mapper Error - No such req/resp ID found in StreamResponseSenderMap: stream-map-179336-0-0","target":"numaflow_core::mapper::map::stream"}
  • The "Received message id" log was added at the start of the stream method invocation, so its two entries show two messages with the same ID arriving almost simultaneously (about 20 ms apart).
  • The "Sending message id" log was added on the receiver task side, where it sends the UDF response back to the execute task.
  • The "failed to map message" log was added together with an explicit EOT sent from the receiver task to the execute task. It fires here because the first message's sender was dropped when the second message's sender overwrote it in the map, so the first execute task never received its EOT.
  • The "marking message as successful" log shows that only one response (produced: 1, sent: 1) was received through the second sender, and that response was originally meant for the first sender. As a result, only one response is forwarded downstream.
  • The three "No such req/resp ID found in StreamResponseSenderMap" warnings correspond to two responses meant for the second sender, followed by the EOT meant for the second sender; by that point the map entry for the ID no longer existed.
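One possible mitigation, sketched under the same simplified assumptions (std HashMap and mpsc channels, and a hypothetical SenderMap type that is not the numaflow_core API), is to refuse a second registration for an ID that is still in flight instead of evicting the existing sender. The Entry API performs the check and the insert as a single operation on the map. How the caller handles the Err (for example, deferring or nacking the redelivered copy) is an assumption here, not the project's chosen fix.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical registry of in-flight stream requests, keyed by request ID.
pub struct SenderMap<T> {
    inner: HashMap<String, Sender<T>>,
}

impl<T> SenderMap<T> {
    pub fn new() -> Self {
        Self { inner: HashMap::new() }
    }

    /// Registers a new sender only if no request with this ID is in flight.
    /// Returns Err(id) instead of evicting the existing sender.
    pub fn register(&mut self, id: &str) -> Result<Receiver<T>, String> {
        match self.inner.entry(id.to_string()) {
            Entry::Occupied(_) => Err(id.to_string()),
            Entry::Vacant(slot) => {
                let (tx, rx) = channel();
                slot.insert(tx);
                Ok(rx)
            }
        }
    }

    /// Routes a response to the sender registered for `id`; false if none exists.
    pub fn send(&self, id: &str, item: T) -> bool {
        match self.inner.get(id) {
            Some(tx) => tx.send(item).is_ok(),
            None => false,
        }
    }

    /// Removes the entry once the request's EOT has been handled.
    pub fn complete(&mut self, id: &str) {
        self.inner.remove(id);
    }
}
```

With this shape, the redelivered message's register call fails while the original request is in flight, so the original request keeps receiving all of its responses and its EOT.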

To Reproduce
Observed in high-throughput scenarios.

Expected behavior
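A redelivered message should not evict the sender of an in-flight request with the same ID. All responses for each request, including its EOT, should be delivered through that request's own sender and forwarded downstream.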

Environment:

  • Numaflow: main-82e480a


Labels: bug