Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions pkg/apis/proto/common/nack_options.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
Copyright 2022 The Numaproj Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

syntax = "proto3";
option go_package = "github.com/numaproj/numaflow/pkg/apis/proto/common";

package nack_options;

// Used to expose options to users for nacking messages
message NackOptions {
// reason this message was nacked
optional string reason = 1;
// number of times this message should be redelivered
optional uint32 max_deliveries = 2;
// delay in millisecond
optional uint64 delay = 3;
}
3 changes: 3 additions & 0 deletions pkg/apis/proto/source/v1/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ option java_package = "io.numaproj.numaflow.source.v1";

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
import "common/nack_options.proto";
import "metadata.proto";

package source.v1;
Expand Down Expand Up @@ -169,6 +170,8 @@ message NackRequest {
message Request {
// Required field holding the offset to be nacked
repeated Offset offsets = 1;
// Options passed to the source for nacking
optional nack_options.NackOptions nack_options = 2;
}
// Required field holding the request. The list will be ordered and will have the same order as the original Read response.
Request request = 1;
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/proto/sourcetransform/v1/sourcetransform.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ option java_package = "io.numaproj.numaflow.sourcetransformer.v1";

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
import "common/nack_options.proto";
import "metadata.proto";

package sourcetransformer.v1;
Expand Down Expand Up @@ -73,6 +74,8 @@ message SourceTransformResponse {
repeated string tags = 4;
// Metadata is the metadata of the message
metadata.Metadata metadata = 5;
// Nack options specified while nacking a message
optional nack_options.NackOptions nack_options = 6;
}
repeated Result results = 1;
// This ID is used to refer the responses to the request it corresponds to.
Expand Down
22 changes: 12 additions & 10 deletions rust/numaflow-core/src/mapper/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ impl MapHandle {
// if there are errors then we need to drain the stream and nack
if self.shutting_down_on_err {
warn!(offset = ?read_msg.message().offset, error = ?self.final_result, "Map component is shutting down because of an error, not accepting the message");
read_msg.mark_failed(self.final_result.as_ref().unwrap_err());
read_msg.mark_failed(self.final_result.as_ref().unwrap_err(), None);
} else {
let permit = Arc::clone(&ctx.semaphore).acquire_owned()
.await.map_err(|e| Error::Mapper(format!("failed to acquire semaphore: {e}")))?;
Expand Down Expand Up @@ -310,7 +310,7 @@ impl MapHandle {
if self.shutting_down_on_err {
for read_msg in read_batch {
warn!(offset = ?read_msg.message().offset, error = ?self.final_result, "Map component is shutting down because of an error, not accepting the message");
read_msg.mark_failed(self.final_result.as_ref().unwrap_err());
read_msg.mark_failed(self.final_result.as_ref().unwrap_err(), None);
}
continue;
}
Expand Down Expand Up @@ -519,6 +519,8 @@ impl From<UserDefinedMessage<'_>> for Message {
}
Some(Arc::new(metadata))
},
// TODO: update once nack option wiring in map is available
nack_options: None,
}
}
}
Expand Down Expand Up @@ -1014,7 +1016,7 @@ mod tests {

for ack_rx in ack_rxs {
let ack = ack_rx.await.unwrap();
assert_eq!(ack, ReadAck::Nak);
assert_eq!(ack, ReadAck::Nak(None));
}

tokio::time::sleep(Duration::from_millis(50)).await;
Expand Down Expand Up @@ -1128,8 +1130,8 @@ mod tests {

let ack1 = ack_rx1.await.unwrap();
let ack2 = ack_rx2.await.unwrap();
assert_eq!(ack1, ReadAck::Nak);
assert_eq!(ack2, ReadAck::Nak);
assert_eq!(ack1, ReadAck::Nak(None));
assert_eq!(ack2, ReadAck::Nak(None));

// Await the join handle and expect an error due to the panic
let result = map_handle.await.unwrap();
Expand Down Expand Up @@ -1237,7 +1239,7 @@ mod tests {
);
for ack_rx in ack_rxs {
let ack = ack_rx.await.unwrap();
assert_eq!(ack, ReadAck::Nak);
assert_eq!(ack, ReadAck::Nak(None));
}

tokio::time::sleep(Duration::from_millis(50)).await;
Expand Down Expand Up @@ -1311,7 +1313,7 @@ mod tests {

for ack_rx in ack_rxs {
let ack = ack_rx.await.unwrap();
assert_eq!(ack, ReadAck::Nak);
assert_eq!(ack, ReadAck::Nak(None));
}

Ok(())
Expand Down Expand Up @@ -1393,8 +1395,8 @@ mod tests {

let ack1 = ack_rx1.await.unwrap();
let ack2 = ack_rx2.await.unwrap();
assert_eq!(ack1, ReadAck::Nak);
assert_eq!(ack2, ReadAck::Nak);
assert_eq!(ack1, ReadAck::Nak(None));
assert_eq!(ack2, ReadAck::Nak(None));

// Await the join handle and expect an error due to the panic
let result = map_handle.await.unwrap();
Expand Down Expand Up @@ -1450,7 +1452,7 @@ mod tests {

for ack_rx in ack_rxs {
let ack = ack_rx.await.expect("Failed to await ack rx");
assert_eq!(ack, ReadAck::Nak, "Expected Nak due to panic");
assert_eq!(ack, ReadAck::Nak(None), "Expected Nak due to panic");
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/mapper/map/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl MapBatchTask {
}
Err(e) => {
error!(err=?e, "failed to map message");
mark_failed!(msg_handle, &e);
mark_failed!(msg_handle, &e, None);
return Err(e);
}
}
Expand Down
5 changes: 4 additions & 1 deletion rust/numaflow-core/src/mapper/map/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ impl MapStreamTask {
// Convert raw results to Messages using parent info.
// Strip tracing_udf from each result (map stage is done; no-op when no key
// was injected).

// TODO: Handle message nacking

for result in results {
let mut mapped_message: Message =
UserDefinedMessage(result, &parent_info, parent_info.current_index)
Expand Down Expand Up @@ -152,7 +155,7 @@ impl MapStreamTask {
}
Some(Err(e)) => {
error!(?e, "failed to map message");
mark_failed!(self.msg_handle, &e);
mark_failed!(self.msg_handle, &e, None);
let _ = self.shared_ctx.error_tx.send(e).await;
return;
}
Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/mapper/map/unary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl MapUnaryTask {
Ok(results) => results,
Err(e) => {
error!(?e, offset = ?parent_info.offset, "failed to map message");
mark_failed!(self.msg_handle, &e);
mark_failed!(self.msg_handle, &e, None);
let _ = self.shared_ctx.error_tx.send(e).await;
return;
}
Expand Down
73 changes: 60 additions & 13 deletions rust/numaflow-core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ macro_rules! mark_success {
/// ```
#[macro_export]
macro_rules! mark_failed {
($msg:expr, $err:expr) => {{
$msg.mark_failed($err);
($msg:expr, $err:expr, $opt:expr) => {{
$msg.mark_failed($err, $opt);
}};
}

Expand All @@ -65,9 +65,9 @@ macro_rules! mark_success_batch {
/// ```
#[macro_export]
macro_rules! mark_failed_batch {
($batch:expr, $err:expr) => {{
($batch:expr, $err:expr, $opt:expr) => {{
for msg in $batch {
msg.mark_failed($err);
msg.mark_failed($err, $opt);
}
}};
}
Expand All @@ -76,8 +76,9 @@ use crate::Error;
use std::cmp::{Ordering, PartialEq};
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, OnceLock};
use tracing::{error, warn};

use crate::metadata::Metadata;
Expand All @@ -89,6 +90,7 @@ use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;

const DROP: &str = "U+005C__DROP__";
const NACK: &str = "U+005C__NACK__";

/// The message that is passed from the source to the sink.
/// NOTE: It is cheap to clone.
Expand Down Expand Up @@ -119,6 +121,8 @@ pub(crate) struct Message {
/// is_late is used to indicate if the message is a late data. Late data is data that arrives
/// after the watermark has passed. This is set only at source.
pub(crate) is_late: bool,
// TODO: Move to using Option<Box<NackOptions>> in a separate PR
pub(crate) nack_options: Option<NackOptions>,
}

/// AckHandle is used to send the ack/nak to the source. It uses a reference count to track
Expand All @@ -140,6 +144,8 @@ struct AckHandle {
/// Uses OnceLock since the span is set exactly once after construction; subsequent calls
/// to `set_pipeline_span` are no-ops.
pipeline_span: OnceLock<tracing::Span>,
// options for nacking
nack_options: OnceLock<Option<NackOptions>>,
}

impl AckHandle {
Expand All @@ -149,6 +155,7 @@ impl AckHandle {
ref_count: AtomicUsize::new(1),
failure_reason: OnceLock::new(),
pipeline_span: OnceLock::new(),
nack_options: OnceLock::new(),
}
}

Expand All @@ -172,7 +179,11 @@ impl Drop for AckHandle {
if let Some(reason) = self.failure_reason.get() {
error!(reason = reason.as_str(), "message nacked due to failure");
}
ReadAck::Nak
if let Some(nack) = self.nack_options.get() {
ReadAck::Nak(nack.clone())
} else {
ReadAck::Nak(None)
}
} else {
ReadAck::Ack
};
Expand Down Expand Up @@ -231,8 +242,9 @@ impl MessageHandle {
/// Mark the message as failed (consumes the handle), recording the reason it will be nacked.
/// ref_count is not decremented, so the message will be NAK'd when the AckHandle is dropped.
/// The error is logged at NAK time.
pub(crate) fn mark_failed(self, reason: impl fmt::Display) {
pub(crate) fn mark_failed(self, reason: impl fmt::Display, nack_option: Option<NackOptions>) {
let _ = self.ack_handle.failure_reason.set(reason.to_string());
let _ = self.ack_handle.nack_options.set(nack_option);
}

/// Creates a new MessageHandle with a different message but sharing this handle's ack tracking.
Expand Down Expand Up @@ -277,6 +289,7 @@ impl From<Message> for MessageHandle {
ref_count: AtomicUsize::new(0), // Already "success" state
failure_reason: OnceLock::new(),
pipeline_span: OnceLock::new(),
nack_options: OnceLock::new(),
}),
}
}
Expand Down Expand Up @@ -335,6 +348,7 @@ impl Default for Message {
metadata: None,
typ: Default::default(),
is_late: false,
nack_options: None,
}
}
}
Expand Down Expand Up @@ -394,6 +408,13 @@ impl Message {
.is_some_and(|tags| tags.contains(&DROP.to_string()))
}

// Check if the message should be nacked.
pub(crate) fn nacked(&self) -> bool {
self.tags
.as_ref()
.is_some_and(|tags| tags.contains(&NACK.to_string()))
}

pub(crate) fn strip_tracing_udf(&mut self) {
if let Some(ref mut metadata) = self.metadata {
Arc::make_mut(metadata)
Expand Down Expand Up @@ -487,12 +508,39 @@ impl fmt::Display for StringOffset {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub(crate) struct NackOptions {
pub(crate) delay: Option<u64>,
pub(crate) max_deliveries: Option<u32>,
pub(crate) reason: Option<String>,
}

impl From<NackOptions> for numaflow_pb::common::nack_options::NackOptions {
fn from(value: NackOptions) -> Self {
Self {
reason: value.reason,
max_deliveries: value.max_deliveries,
delay: value.delay,
}
}
}

impl From<numaflow_pb::common::nack_options::NackOptions> for NackOptions {
fn from(value: numaflow_pb::common::nack_options::NackOptions) -> Self {
Self {
reason: value.reason,
max_deliveries: value.max_deliveries,
delay: value.delay,
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub(crate) enum ReadAck {
/// Message was successfully processed.
Ack,
/// Message will not be processed now and processing can move onto the next message, NAK’d message will be retried.
Nak,
Nak(Option<NackOptions>),
}

/// Message ID which is used to uniquely identify a message. It cheap to clone this.
Expand Down Expand Up @@ -591,14 +639,13 @@ impl TryFrom<Message> for BytesMut {

#[cfg(test)]
mod tests {
use super::*;
use crate::error::Result;
use chrono::TimeZone;
use numaflow_pb::objects::isb::{
Body, Header, Message as ProtoMessage, MessageId, MessageInfo,
};

use super::*;
use crate::error::Result;

#[test]
fn test_offset_display() {
let offset = Offset::String(StringOffset {
Expand Down Expand Up @@ -721,7 +768,7 @@ mod tests {

// Should receive NAK
let result = ack_rx.await.unwrap();
assert_eq!(result, ReadAck::Nak);
assert_eq!(result, ReadAck::Nak(None));
}

#[tokio::test]
Expand Down Expand Up @@ -776,7 +823,7 @@ mod tests {

// Should receive NAK since not all were marked as success
let result = ack_rx.await.unwrap();
assert_eq!(result, ReadAck::Nak);
assert_eq!(result, ReadAck::Nak(None));
}

fn message_with_metadata() -> Message {
Expand Down
Loading
Loading