diff --git a/pkg/apis/proto/common/nack_options.proto b/pkg/apis/proto/common/nack_options.proto new file mode 100644 index 0000000000..2448d6ee06 --- /dev/null +++ b/pkg/apis/proto/common/nack_options.proto @@ -0,0 +1,30 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +syntax = "proto3"; +option go_package = "github.com/numaproj/numaflow/pkg/apis/proto/common"; + +package nack_options; + +// Used to expose options to users for nacking messages +message NackOptions { + // reason this message was nacked + optional string reason = 1; + // number of times this message should be redelivered + optional uint32 max_deliveries = 2; + // delay in millisecond + optional uint64 delay = 3; +} diff --git a/pkg/apis/proto/source/v1/source.proto b/pkg/apis/proto/source/v1/source.proto index 6ca54da4b5..ca17f4932f 100644 --- a/pkg/apis/proto/source/v1/source.proto +++ b/pkg/apis/proto/source/v1/source.proto @@ -21,6 +21,7 @@ option java_package = "io.numaproj.numaflow.source.v1"; import "google/protobuf/timestamp.proto"; import "google/protobuf/empty.proto"; +import "common/nack_options.proto"; import "metadata.proto"; package source.v1; @@ -169,6 +170,8 @@ message NackRequest { message Request { // Required field holding the offset to be nacked repeated Offset offsets = 1; + // Options passed to the source for nacking + optional nack_options.NackOptions nack_options = 2; } // Required field holding the request. The list will be ordered and will have the same order as the original Read response. Request request = 1; diff --git a/pkg/apis/proto/sourcetransform/v1/sourcetransform.proto b/pkg/apis/proto/sourcetransform/v1/sourcetransform.proto index f0e3691d40..c1d9cb03af 100644 --- a/pkg/apis/proto/sourcetransform/v1/sourcetransform.proto +++ b/pkg/apis/proto/sourcetransform/v1/sourcetransform.proto @@ -21,6 +21,7 @@ option java_package = "io.numaproj.numaflow.sourcetransformer.v1"; import "google/protobuf/timestamp.proto"; import "google/protobuf/empty.proto"; +import "common/nack_options.proto"; import "metadata.proto"; package sourcetransformer.v1; @@ -73,6 +74,8 @@ message SourceTransformResponse { repeated string tags = 4; // Metadata is the metadata of the message metadata.Metadata metadata = 5; + // Nack options specified while nacking a message + optional nack_options.NackOptions nack_options = 6; } repeated Result results = 1; // This ID is used to refer the responses to the request it corresponds to. diff --git a/rust/numaflow-core/src/mapper/map.rs b/rust/numaflow-core/src/mapper/map.rs index 43092314b0..2a5151b040 100644 --- a/rust/numaflow-core/src/mapper/map.rs +++ b/rust/numaflow-core/src/mapper/map.rs @@ -257,7 +257,7 @@ impl MapHandle { // if there are errors then we need to drain the stream and nack if self.shutting_down_on_err { warn!(offset = ?read_msg.message().offset, error = ?self.final_result, "Map component is shutting down because of an error, not accepting the message"); - read_msg.mark_failed(self.final_result.as_ref().unwrap_err()); + read_msg.mark_failed(self.final_result.as_ref().unwrap_err(), None); } else { let permit = Arc::clone(&ctx.semaphore).acquire_owned() .await.map_err(|e| Error::Mapper(format!("failed to acquire semaphore: {e}")))?; @@ -310,7 +310,7 @@ impl MapHandle { if self.shutting_down_on_err { for read_msg in read_batch { warn!(offset = ?read_msg.message().offset, error = ?self.final_result, "Map component is shutting down because of an error, not accepting the message"); - read_msg.mark_failed(self.final_result.as_ref().unwrap_err()); + read_msg.mark_failed(self.final_result.as_ref().unwrap_err(), None); } continue; } @@ -519,6 +519,8 @@ impl From> for Message { } Some(Arc::new(metadata)) }, + // TODO: update once nack option wiring in map is available + nack_options: None, } } } @@ -1014,7 +1016,7 @@ mod tests { for ack_rx in ack_rxs { let ack = ack_rx.await.unwrap(); - assert_eq!(ack, ReadAck::Nak); + assert_eq!(ack, ReadAck::Nak(None)); } tokio::time::sleep(Duration::from_millis(50)).await; @@ -1128,8 +1130,8 @@ mod tests { let ack1 = ack_rx1.await.unwrap(); let ack2 = ack_rx2.await.unwrap(); - assert_eq!(ack1, ReadAck::Nak); - assert_eq!(ack2, ReadAck::Nak); + assert_eq!(ack1, ReadAck::Nak(None)); + assert_eq!(ack2, ReadAck::Nak(None)); // Await the join handle and expect an error due to the panic let result = map_handle.await.unwrap(); @@ -1237,7 +1239,7 @@ mod tests { ); for ack_rx in ack_rxs { let ack = ack_rx.await.unwrap(); - assert_eq!(ack, ReadAck::Nak); + assert_eq!(ack, ReadAck::Nak(None)); } tokio::time::sleep(Duration::from_millis(50)).await; @@ -1311,7 +1313,7 @@ mod tests { for ack_rx in ack_rxs { let ack = ack_rx.await.unwrap(); - assert_eq!(ack, ReadAck::Nak); + assert_eq!(ack, ReadAck::Nak(None)); } Ok(()) @@ -1393,8 +1395,8 @@ mod tests { let ack1 = ack_rx1.await.unwrap(); let ack2 = ack_rx2.await.unwrap(); - assert_eq!(ack1, ReadAck::Nak); - assert_eq!(ack2, ReadAck::Nak); + assert_eq!(ack1, ReadAck::Nak(None)); + assert_eq!(ack2, ReadAck::Nak(None)); // Await the join handle and expect an error due to the panic let result = map_handle.await.unwrap(); @@ -1450,7 +1452,7 @@ mod tests { for ack_rx in ack_rxs { let ack = ack_rx.await.expect("Failed to await ack rx"); - assert_eq!(ack, ReadAck::Nak, "Expected Nak due to panic"); + assert_eq!(ack, ReadAck::Nak(None), "Expected Nak due to panic"); } Ok(()) diff --git a/rust/numaflow-core/src/mapper/map/batch.rs b/rust/numaflow-core/src/mapper/map/batch.rs index 74731f76a9..208e769643 100644 --- a/rust/numaflow-core/src/mapper/map/batch.rs +++ b/rust/numaflow-core/src/mapper/map/batch.rs @@ -166,7 +166,7 @@ impl MapBatchTask { } Err(e) => { error!(err=?e, "failed to map message"); - mark_failed!(msg_handle, &e); + mark_failed!(msg_handle, &e, None); return Err(e); } } diff --git a/rust/numaflow-core/src/mapper/map/stream.rs b/rust/numaflow-core/src/mapper/map/stream.rs index ec9bec7184..7012f9193c 100644 --- a/rust/numaflow-core/src/mapper/map/stream.rs +++ b/rust/numaflow-core/src/mapper/map/stream.rs @@ -105,6 +105,9 @@ impl MapStreamTask { // Convert raw results to Messages using parent info. // Strip tracing_udf from each result (map stage is done; no-op when no key // was injected). + + // TODO: Handle message nacking + for result in results { let mut mapped_message: Message = UserDefinedMessage(result, &parent_info, parent_info.current_index) @@ -152,7 +155,7 @@ impl MapStreamTask { } Some(Err(e)) => { error!(?e, "failed to map message"); - mark_failed!(self.msg_handle, &e); + mark_failed!(self.msg_handle, &e, None); let _ = self.shared_ctx.error_tx.send(e).await; return; } diff --git a/rust/numaflow-core/src/mapper/map/unary.rs b/rust/numaflow-core/src/mapper/map/unary.rs index 3ade159b41..7906d1eedd 100644 --- a/rust/numaflow-core/src/mapper/map/unary.rs +++ b/rust/numaflow-core/src/mapper/map/unary.rs @@ -93,7 +93,7 @@ impl MapUnaryTask { Ok(results) => results, Err(e) => { error!(?e, offset = ?parent_info.offset, "failed to map message"); - mark_failed!(self.msg_handle, &e); + mark_failed!(self.msg_handle, &e, None); let _ = self.shared_ctx.error_tx.send(e).await; return; } diff --git a/rust/numaflow-core/src/message.rs b/rust/numaflow-core/src/message.rs index 902623f536..02c92dd8ed 100644 --- a/rust/numaflow-core/src/message.rs +++ b/rust/numaflow-core/src/message.rs @@ -37,8 +37,8 @@ macro_rules! mark_success { /// ``` #[macro_export] macro_rules! mark_failed { - ($msg:expr, $err:expr) => {{ - $msg.mark_failed($err); + ($msg:expr, $err:expr, $opt:expr) => {{ + $msg.mark_failed($err, $opt); }}; } @@ -65,9 +65,9 @@ macro_rules! mark_success_batch { /// ``` #[macro_export] macro_rules! mark_failed_batch { - ($batch:expr, $err:expr) => {{ + ($batch:expr, $err:expr, $opt:expr) => {{ for msg in $batch { - msg.mark_failed($err); + msg.mark_failed($err, $opt); } }}; } @@ -76,8 +76,9 @@ use crate::Error; use std::cmp::{Ordering, PartialEq}; use std::collections::HashMap; use std::fmt; +use std::sync::Arc; +use std::sync::OnceLock; use std::sync::atomic::AtomicUsize; -use std::sync::{Arc, OnceLock}; use tracing::{error, warn}; use crate::metadata::Metadata; @@ -89,6 +90,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::oneshot; const DROP: &str = "U+005C__DROP__"; +const NACK: &str = "U+005C__NACK__"; /// The message that is passed from the source to the sink. /// NOTE: It is cheap to clone. @@ -119,6 +121,8 @@ pub(crate) struct Message { /// is_late is used to indicate if the message is a late data. Late data is data that arrives /// after the watermark has passed. This is set only at source. pub(crate) is_late: bool, + // TODO: Move to using Option> in a separate PR + pub(crate) nack_options: Option, } /// AckHandle is used to send the ack/nak to the source. It uses a reference count to track @@ -140,6 +144,8 @@ struct AckHandle { /// Uses OnceLock since the span is set exactly once after construction; subsequent calls /// to `set_pipeline_span` are no-ops. pipeline_span: OnceLock, + // options for nacking + nack_options: OnceLock>, } impl AckHandle { @@ -149,6 +155,7 @@ impl AckHandle { ref_count: AtomicUsize::new(1), failure_reason: OnceLock::new(), pipeline_span: OnceLock::new(), + nack_options: OnceLock::new(), } } @@ -172,7 +179,11 @@ impl Drop for AckHandle { if let Some(reason) = self.failure_reason.get() { error!(reason = reason.as_str(), "message nacked due to failure"); } - ReadAck::Nak + if let Some(nack) = self.nack_options.get() { + ReadAck::Nak(nack.clone()) + } else { + ReadAck::Nak(None) + } } else { ReadAck::Ack }; @@ -231,8 +242,9 @@ impl MessageHandle { /// Mark the message as failed (consumes the handle), recording the reason it will be nacked. /// ref_count is not decremented, so the message will be NAK'd when the AckHandle is dropped. /// The error is logged at NAK time. - pub(crate) fn mark_failed(self, reason: impl fmt::Display) { + pub(crate) fn mark_failed(self, reason: impl fmt::Display, nack_option: Option) { let _ = self.ack_handle.failure_reason.set(reason.to_string()); + let _ = self.ack_handle.nack_options.set(nack_option); } /// Creates a new MessageHandle with a different message but sharing this handle's ack tracking. @@ -277,6 +289,7 @@ impl From for MessageHandle { ref_count: AtomicUsize::new(0), // Already "success" state failure_reason: OnceLock::new(), pipeline_span: OnceLock::new(), + nack_options: OnceLock::new(), }), } } @@ -335,6 +348,7 @@ impl Default for Message { metadata: None, typ: Default::default(), is_late: false, + nack_options: None, } } } @@ -394,6 +408,13 @@ impl Message { .is_some_and(|tags| tags.contains(&DROP.to_string())) } + // Check if the message should be nacked. + pub(crate) fn nacked(&self) -> bool { + self.tags + .as_ref() + .is_some_and(|tags| tags.contains(&NACK.to_string())) + } + pub(crate) fn strip_tracing_udf(&mut self) { if let Some(ref mut metadata) = self.metadata { Arc::make_mut(metadata) @@ -487,12 +508,39 @@ impl fmt::Display for StringOffset { } } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub(crate) struct NackOptions { + pub(crate) delay: Option, + pub(crate) max_deliveries: Option, + pub(crate) reason: Option, +} + +impl From for numaflow_pb::common::nack_options::NackOptions { + fn from(value: NackOptions) -> Self { + Self { + reason: value.reason, + max_deliveries: value.max_deliveries, + delay: value.delay, + } + } +} + +impl From for NackOptions { + fn from(value: numaflow_pb::common::nack_options::NackOptions) -> Self { + Self { + reason: value.reason, + max_deliveries: value.max_deliveries, + delay: value.delay, + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub(crate) enum ReadAck { /// Message was successfully processed. Ack, /// Message will not be processed now and processing can move onto the next message, NAK’d message will be retried. - Nak, + Nak(Option), } /// Message ID which is used to uniquely identify a message. It cheap to clone this. @@ -591,14 +639,13 @@ impl TryFrom for BytesMut { #[cfg(test)] mod tests { + use super::*; + use crate::error::Result; use chrono::TimeZone; use numaflow_pb::objects::isb::{ Body, Header, Message as ProtoMessage, MessageId, MessageInfo, }; - use super::*; - use crate::error::Result; - #[test] fn test_offset_display() { let offset = Offset::String(StringOffset { @@ -721,7 +768,7 @@ mod tests { // Should receive NAK let result = ack_rx.await.unwrap(); - assert_eq!(result, ReadAck::Nak); + assert_eq!(result, ReadAck::Nak(None)); } #[tokio::test] @@ -776,7 +823,7 @@ mod tests { // Should receive NAK since not all were marked as success let result = ack_rx.await.unwrap(); - assert_eq!(result, ReadAck::Nak); + assert_eq!(result, ReadAck::Nak(None)); } fn message_with_metadata() -> Message { diff --git a/rust/numaflow-core/src/monovertex/bypass_router.rs b/rust/numaflow-core/src/monovertex/bypass_router.rs index 1e3a3ae5dc..77d35663e9 100644 --- a/rust/numaflow-core/src/monovertex/bypass_router.rs +++ b/rust/numaflow-core/src/monovertex/bypass_router.rs @@ -261,7 +261,7 @@ impl BypassRouterReceiver { | MessageToSink::Fallback(h) | MessageToSink::OnSuccess(h) => h, }; - handle.mark_failed(self.final_result.as_ref().unwrap_err()); + handle.mark_failed(self.final_result.as_ref().unwrap_err(), None); } continue; } @@ -325,7 +325,7 @@ impl BypassRouterReceiver { error!(?e, "Error writing to sink, initiating shutdown."); cln_token.cancel(); for msg in msg_handles { - msg.mark_failed(&e); + msg.mark_failed(&e, None); } self.final_result = Err(e); self.shutting_down_on_err = true; @@ -400,6 +400,7 @@ mod tests { headers: Arc::new(HashMap::new()), metadata: None, is_late: false, + nack_options: None, } } diff --git a/rust/numaflow-core/src/pipeline/forwarder/sink_forwarder.rs b/rust/numaflow-core/src/pipeline/forwarder/sink_forwarder.rs index f1abdc0a31..452995d4c6 100644 --- a/rust/numaflow-core/src/pipeline/forwarder/sink_forwarder.rs +++ b/rust/numaflow-core/src/pipeline/forwarder/sink_forwarder.rs @@ -421,6 +421,7 @@ mod simple_buffer_tests { headers: Arc::new(HashMap::new()), metadata: None, is_late: false, + nack_options: None, }; writer.write(msg).await.expect("write should succeed"); } diff --git a/rust/numaflow-core/src/pipeline/isb.rs b/rust/numaflow-core/src/pipeline/isb.rs index 8512517356..3a2d1f03a3 100644 --- a/rust/numaflow-core/src/pipeline/isb.rs +++ b/rust/numaflow-core/src/pipeline/isb.rs @@ -8,7 +8,7 @@ use std::pin::Pin; use std::time::Duration; use crate::error::Result; -use crate::message::{Message, Offset}; +use crate::message::{Message, NackOptions, Offset}; /// A boxed future representing a pending write operation. /// This is returned by `ISBWriter::async_write()` and can be awaited @@ -57,9 +57,7 @@ pub(crate) trait LocalISBReader: Sync { /// Negative acknowledgment - indicates the message should be redelivered. /// /// The implementation uses the offset to identify which message to nack. - /// Pass `delay = Some(...)` to defer redelivery by that duration when the - /// underlying ISB supports it; `None` redelivers as soon as the ISB allows. - async fn nack(&self, offset: &Offset, delay: Option) -> Result<()>; + async fn nack(&self, offset: &Offset, nack_options: Option) -> Result<()>; /// Returns the number of pending (unprocessed) messages, if available. /// diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/js_reader.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/js_reader.rs index 4fdfe352d1..f186be22fe 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream/js_reader.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream/js_reader.rs @@ -6,7 +6,7 @@ use crate::Result; use crate::config::get_vertex_name; use crate::config::pipeline::isb::{CompressionType, ISBConfig, Stream}; use crate::error::Error; -use crate::message::{IntOffset, Message, MessageID, MessageType, Offset}; +use crate::message::{IntOffset, Message, MessageID, MessageType, NackOptions, Offset}; use crate::metadata::Metadata; use crate::pipeline::isb::compression; use crate::pipeline::isb::error::ISBError; @@ -89,6 +89,7 @@ impl JSWrappedMessage { watermark: None, metadata: header.metadata.map(|m| Arc::new(Metadata::from(m))), is_late: message_info.is_late, + nack_options: None, }) } } @@ -233,10 +234,17 @@ impl JetStreamReader { } /// Negatively acknowledge the offset, optionally deferring redelivery by `delay`. - pub(crate) async fn nack(&self, offset: &Offset, delay: Option) -> Result<()> { + pub(crate) async fn nack( + &self, + offset: &Offset, + nack_options: Option, + ) -> Result<()> { let msg = self .get_js_message(offset, true) .ok_or_else(|| Error::ISB(ISBError::OffsetNotFound(offset.to_string())))?; + let delay = nack_options + .and_then(|option| option.delay) + .map(Duration::from_millis); msg.ack_with(AckKind::Nak(delay)) .await .map_err(|e| Error::ISB(ISBError::Nack(format!("offset {}: {}", offset, e))))?; @@ -281,8 +289,8 @@ impl crate::pipeline::isb::ISBReader for JetStreamReader { JetStreamReader::ack(self, offset).await } - async fn nack(&self, offset: &Offset, delay: Option) -> Result<()> { - JetStreamReader::nack(self, offset, delay).await + async fn nack(&self, offset: &Offset, nack_options: Option) -> Result<()> { + JetStreamReader::nack(self, offset, nack_options).await } async fn pending(&self) -> Result> { diff --git a/rust/numaflow-core/src/pipeline/isb/reader.rs b/rust/numaflow-core/src/pipeline/isb/reader.rs index ee34d2f65d..9a98c097db 100644 --- a/rust/numaflow-core/src/pipeline/isb/reader.rs +++ b/rust/numaflow-core/src/pipeline/isb/reader.rs @@ -20,7 +20,9 @@ use crate::error::Error; use crate::mark_success; #[cfg(test)] use crate::mark_success_batch; -use crate::message::{IntOffset, Message, MessageHandle, MessageType, Offset, ReadAck}; +use crate::message::{ + IntOffset, Message, MessageHandle, MessageType, NackOptions, Offset, ReadAck, +}; use crate::metrics::pipeline_drop_metric_labels; use crate::metrics::{ PIPELINE_PARTITION_NAME_LABEL, jetstream_isb_error_metrics_labels, @@ -219,7 +221,7 @@ impl ISBReaderOrchestrator { let _ = params.reader.mark_wip(¶ms.offset).await; }, res = &mut params.ack_rx => { - match res.unwrap_or(ReadAck::Nak) { + match res.unwrap_or(ReadAck::Nak(None)) { ReadAck::Ack => { let ack_start = Instant::now(); if let Err(e) = Self::ack_with_retry(¶ms.reader, ¶ms.offset, ¶ms.cancel).await { @@ -233,9 +235,9 @@ impl ISBReaderOrchestrator { ); } }, - ReadAck::Nak => { + ReadAck::Nak(option) => { info!(?params.offset, "Nak received for offset"); - if let Err(e) = Self::nak_with_retry(¶ms.reader, ¶ms.offset, ¶ms.cancel).await { + if let Err(e) = Self::nak_with_retry(¶ms.reader, ¶ms.offset, option, ¶ms.cancel).await { error!(?e, ?params.offset, "Failed to nack message after retries"); } }, @@ -282,12 +284,13 @@ impl ISBReaderOrchestrator { async fn nak_with_retry( reader: &Arc, offset: &Offset, + option: Option, cancel: &CancellationToken, ) -> Result<()> { let interval = fixed::Interval::from_millis(ACK_RETRY_INTERVAL).take(ACK_RETRY_ATTEMPTS); let result = Retry::new( interval, - async || reader.nack(offset, None).await, + async || reader.nack(offset, option.clone()).await, |e: &Error| { if cancel.is_cancelled() { error!( @@ -1210,6 +1213,7 @@ mod tests { let result = ISBReaderOrchestrator::::nak_with_retry( &js_reader, &offset, + None, &cancel_token, ) .await; @@ -1266,6 +1270,7 @@ mod tests { let result = ISBReaderOrchestrator::::nak_with_retry( &js_reader, &missing_offset, + None, &cancel_token, ) .await; @@ -1507,6 +1512,7 @@ mod simplebuffer_tests { headers: Arc::new(HashMap::new()), metadata: None, is_late: false, + nack_options: None, }; writer.write(msg).await.expect("write should succeed"); } @@ -1657,6 +1663,7 @@ mod simplebuffer_tests { let nack_result = ISBReaderOrchestrator::::nak_with_retry( &reader, &missing_offset, + None, &cancel, ) .await; @@ -1947,7 +1954,7 @@ mod duplicate_inflight_tests { batch: Arc>>>, acks: Arc>>, #[allow(clippy::type_complexity)] - nacks: Arc)>>>, + nacks: Arc)>>>, } impl ScriptedReader { @@ -1970,8 +1977,11 @@ mod duplicate_inflight_tests { Ok(()) } - async fn nack(&self, offset: &Offset, delay: Option) -> Result<()> { - self.nacks.lock().unwrap().push((offset.clone(), delay)); + async fn nack(&self, offset: &Offset, nack_options: Option) -> Result<()> { + self.nacks + .lock() + .unwrap() + .push((offset.clone(), nack_options)); Ok(()) } @@ -2013,6 +2023,7 @@ mod duplicate_inflight_tests { headers: Arc::new(std::collections::HashMap::new()), metadata: None, is_late: false, + nack_options: None, } } diff --git a/rust/numaflow-core/src/pipeline/isb/simplebuffer.rs b/rust/numaflow-core/src/pipeline/isb/simplebuffer.rs index a3bb1df709..bd5effd172 100644 --- a/rust/numaflow-core/src/pipeline/isb/simplebuffer.rs +++ b/rust/numaflow-core/src/pipeline/isb/simplebuffer.rs @@ -16,7 +16,7 @@ use numaflow_testing::simplebuffer::{ use numaflow_throttling::NoOpRateLimiter; use crate::error::Error; -use crate::message::{IntOffset, Message, MessageID, Offset}; +use crate::message::{IntOffset, Message, MessageID, NackOptions, Offset}; use crate::pipeline::isb::error::ISBError; use crate::pipeline::isb::{ISBReader, ISBWriter, PendingWrite, WriteError, WriteResult}; use crate::typ::NumaflowTypeConfig; @@ -138,6 +138,7 @@ fn convert_message(read_msg: ReadMessage) -> Message { headers: Arc::new(read_msg.headers), metadata: None, is_late: false, + nack_options: None, } } @@ -155,7 +156,7 @@ impl ISBReader for SimpleReaderAdapter { self.inner.ack(&simple_offset).await.map_err(|e| e.into()) } - async fn nack(&self, offset: &Offset, _delay: Option) -> crate::Result<()> { + async fn nack(&self, offset: &Offset, _nack_options: Option) -> crate::Result<()> { // SimpleBuffer has no broker-side delay; the optional `delay` is ignored. let simple_offset = offset.into(); self.inner.nack(&simple_offset).await.map_err(|e| e.into()) @@ -311,6 +312,7 @@ mod tests { headers: Arc::new(HashMap::new()), metadata: None, is_late: false, + nack_options: None, } } diff --git a/rust/numaflow-core/src/pipeline/isb/writer.rs b/rust/numaflow-core/src/pipeline/isb/writer.rs index 4b1472ea5a..76c59dbbe2 100644 --- a/rust/numaflow-core/src/pipeline/isb/writer.rs +++ b/rust/numaflow-core/src/pipeline/isb/writer.rs @@ -93,6 +93,12 @@ impl ISBWriteTask { return; } + if message.nacked() { + // TODO: send nacked message metric + mark_failed!(self.msg_handle, "message nacked", message.nack_options); + return; + } + // Route and write to appropriate streams let write_results = self .orchestrator @@ -119,7 +125,8 @@ impl ISBWriteTask { self.msg_handle, format!( "Token cancelled during ISB write: {cancelled_count} write(s) did not complete" - ) + ), + None ); return; } @@ -145,7 +152,8 @@ impl ISBWriteTask { "PAF resolution failed: {}/{} writes succeeded", resolved_offsets.len(), expected_writes - ) + ), + None ); return; } @@ -1104,7 +1112,7 @@ mod tests { mod simple_buffer_tests { use super::*; use crate::config::pipeline::isb::BufferWriterConfig; - use crate::message::{IntOffset, MessageHandle, MessageID, ReadAck}; + use crate::message::{IntOffset, MessageHandle, MessageID, NackOptions, ReadAck}; use crate::pipeline::isb::simplebuffer::{SimpleBufferAdapter, WithSimpleBuffer}; use bytes::Bytes; use chrono::Utc; @@ -1135,6 +1143,7 @@ mod simple_buffer_tests { headers: Arc::new(HashMap::new()), metadata: None, is_late: false, + nack_options: None, }; (MessageHandle::new(message, ack_tx), ack_rx) } @@ -1408,7 +1417,7 @@ mod simple_buffer_tests { .await .expect("Should receive nack") .unwrap(); - assert_eq!(ack, ReadAck::Nak); + assert_eq!(ack, ReadAck::Nak(None)); handle.await.unwrap().unwrap(); assert_eq!(adapter.pending_count(), 0); @@ -1791,7 +1800,7 @@ mod simple_buffer_tests { .await .expect("Should receive ack response") .unwrap(); - assert_eq!(ack, ReadAck::Nak); + assert_eq!(ack, ReadAck::Nak(None)); drop(tx); @@ -1959,6 +1968,7 @@ mod simple_buffer_tests { headers: Arc::new(HashMap::new()), metadata: None, is_late: false, + nack_options: None, } } @@ -2134,7 +2144,7 @@ mod simple_buffer_tests { async fn ack(&self, _offset: &Offset) -> Result<()> { Ok(()) } - async fn nack(&self, _offset: &Offset, _delay: Option) -> Result<()> { + async fn nack(&self, _offset: &Offset, _nack_options: Option) -> Result<()> { Ok(()) } async fn pending(&self) -> Result> { @@ -2223,7 +2233,7 @@ mod simple_buffer_tests { // EXPECTED: cancelled-during-retry NAKs so the source can redeliver. assert_eq!( ack, - ReadAck::Nak, + ReadAck::Nak(None), "Cancellation during retry must NAK to preserve at-least-once. Got ACK" ); diff --git a/rust/numaflow-core/src/reduce/reducer/unaligned/user_defined/accumulator.rs b/rust/numaflow-core/src/reduce/reducer/unaligned/user_defined/accumulator.rs index 7ef9a6bba7..57af77a2b7 100644 --- a/rust/numaflow-core/src/reduce/reducer/unaligned/user_defined/accumulator.rs +++ b/rust/numaflow-core/src/reduce/reducer/unaligned/user_defined/accumulator.rs @@ -106,6 +106,7 @@ impl From for Message { headers: Arc::new(result.headers), metadata: None, is_late: false, + nack_options: None, } } } diff --git a/rust/numaflow-core/src/reduce/reducer/unaligned/user_defined/session.rs b/rust/numaflow-core/src/reduce/reducer/unaligned/user_defined/session.rs index f5e00f53cd..ff0fa30618 100644 --- a/rust/numaflow-core/src/reduce/reducer/unaligned/user_defined/session.rs +++ b/rust/numaflow-core/src/reduce/reducer/unaligned/user_defined/session.rs @@ -155,6 +155,7 @@ impl From for Message { headers: Arc::new(HashMap::new()), // reset headers since it is a new message metadata: None, is_late: false, + nack_options: None, } } } diff --git a/rust/numaflow-core/src/reduce/wal/segment/append.rs b/rust/numaflow-core/src/reduce/wal/segment/append.rs index 5ab4d8d6b2..c064ee8671 100644 --- a/rust/numaflow-core/src/reduce/wal/segment/append.rs +++ b/rust/numaflow-core/src/reduce/wal/segment/append.rs @@ -20,6 +20,7 @@ use tracing::{debug, error, info}; /// Duration after which the WAL Segment is considered stale. const ROTATE_IF_STALE_DURATION: chrono::Duration = chrono::Duration::seconds(30); +#[allow(clippy::large_enum_variant)] /// The Command that has to be operated on the Segment. pub(crate) enum SegmentWriteMessage { /// Writes a message to the WAL. The message will be converted to bytes internally. @@ -151,7 +152,7 @@ impl SegmentWriteActor { } Err(e) => { error!(?e, "Failed to write message to WAL"); - read_message.mark_failed(&e); + read_message.mark_failed(&e, None); Err(e) } } diff --git a/rust/numaflow-core/src/sinker/sink.rs b/rust/numaflow-core/src/sinker/sink.rs index 9f169916dd..fc030ada22 100644 --- a/rust/numaflow-core/src/sinker/sink.rs +++ b/rust/numaflow-core/src/sinker/sink.rs @@ -29,9 +29,9 @@ use tracing::{error, info}; use crate::config::monovertex::BypassConditions; use crate::sinker::builder::HealthCheckClients; -use serve::{ServingStore, StoreEntry}; // Re-export SinkWriterBuilder for external use pub(crate) use crate::sinker::builder::SinkWriterBuilder; +use serve::{ServingStore, StoreEntry}; /// A [Blackhole] sink which reads but never writes to anywhere, semantic equivalent of `/dev/null`. /// @@ -215,7 +215,7 @@ impl SinkWriter { // We are in shutting down mode, NAK all messages if self.shutting_down_on_err { for msg in read_batch { - msg.mark_failed(self.final_result.as_ref().unwrap_err()); + msg.mark_failed(self.final_result.as_ref().unwrap_err(), None); } continue; } @@ -227,7 +227,7 @@ impl SinkWriter { mark_success_batch!(read_batch); } Err(e) => { - mark_failed_batch!(read_batch, &e); + mark_failed_batch!(read_batch, &e, None); // Critical error, cancel upstream and initiate shutdown error!(?e, "Error writing to sink, initiating shutdown."); cln_token.cancel(); @@ -1067,7 +1067,7 @@ mod tests { let _ = handle.await.unwrap(); for ack_rx in ack_rxs { - assert_eq!(ack_rx.await.unwrap(), ReadAck::Nak); + assert_eq!(ack_rx.await.unwrap(), ReadAck::Nak(None)); } // check if the tracker is empty diff --git a/rust/numaflow-core/src/sinker/sink/sqs.rs b/rust/numaflow-core/src/sinker/sink/sqs.rs index a0a9272edb..70a2131a1e 100644 --- a/rust/numaflow-core/src/sinker/sink/sqs.rs +++ b/rust/numaflow-core/src/sinker/sink/sqs.rs @@ -162,6 +162,7 @@ mod unit_tests { headers: Arc::new(headers.clone()), metadata: None, is_late: false, + nack_options: None, }; let sink_msg: SqsSinkMessage = msg.try_into().unwrap(); @@ -220,6 +221,7 @@ mod unit_tests { headers: Arc::new(headers), metadata: Some(Arc::new(metadata)), is_late: false, + nack_options: None, }; let sink_msg: SqsSinkMessage = msg.try_into().unwrap(); @@ -283,6 +285,7 @@ mod unit_tests { headers: Arc::new(headers), metadata: Some(Arc::new(metadata)), is_late: false, + nack_options: None, }; let sink_msg: SqsSinkMessage = msg.try_into().unwrap(); diff --git a/rust/numaflow-core/src/source.rs b/rust/numaflow-core/src/source.rs index e17e8b3bf6..dcf950eada 100644 --- a/rust/numaflow-core/src/source.rs +++ b/rust/numaflow-core/src/source.rs @@ -7,7 +7,7 @@ use crate::config::pipeline::VERTEX_TYPE_SOURCE; use crate::config::{get_vertex_name, is_mono_vertex}; use crate::error::{Error, Result}; -use crate::message::{MessageHandle, ReadAck}; +use crate::message::{MessageHandle, NackOptions, ReadAck}; use crate::metrics::{ PIPELINE_PARTITION_NAME_LABEL, SOURCE_PARTITION_NAME_LABEL, monovertex_metrics, mvtx_forward_metric_labels, pipeline_drop_metric_labels, pipeline_metric_labels, @@ -124,7 +124,7 @@ pub(crate) trait LocalSourceAcker { /// negatively acknowledge an offset. The implementor might choose to do it in an asynchronous way. /// For sources that don't support nack, this should be a no-op. - async fn nack(&mut self, offsets: Vec) -> Result<()>; + async fn nack(&mut self, offsets: Vec, options: Option) -> Result<()>; } pub(crate) enum SourceType { @@ -161,6 +161,7 @@ enum ActorMessage { Nack { respond_to: oneshot::Sender>, offsets: Vec, + options: Option, }, Pending { respond_to: oneshot::Sender>>, @@ -212,8 +213,9 @@ where ActorMessage::Nack { respond_to, offsets, + options, } => { - let nack = self.acker.nack(offsets).await; + let nack = self.acker.nack(offsets, options).await; let _ = respond_to.send(nack); } ActorMessage::Pending { respond_to } => { @@ -433,11 +435,16 @@ impl Source { } /// nack the offsets by communicating with the nack actor. - async fn nack(source_handle: mpsc::Sender, offsets: Vec) -> Result<()> { + async fn nack( + source_handle: mpsc::Sender, + offsets: Vec, + options: Option, + ) -> Result<()> { let (sender, receiver) = oneshot::channel(); let msg = ActorMessage::Nack { respond_to: sender, offsets, + options, }; // Ignore send errors. If send fails, so does the recv.await below. There's no reason // to check for the same failure twice. @@ -779,9 +786,9 @@ impl Source { Ok(ReadAck::Ack) => { offsets_to_ack.push(offset.clone()); } - Ok(ReadAck::Nak) => { + Ok(ReadAck::Nak(options)) => { warn!(?offset, "Nak received for offset"); - offsets_to_nack.push(offset.clone()); + offsets_to_nack.push((offset.clone(), options)); } Err(e) => { error!(?offset, err=?e, "Error receiving ack for offset"); @@ -798,7 +805,13 @@ impl Source { Self::ack_with_retry(source_handle.clone(), offsets_to_ack, &cancel_token).await?; } if !offsets_to_nack.is_empty() { - Self::nack_with_retry(source_handle, offsets_to_nack, &cancel_token).await?; + let mut nack_map: HashMap, Vec> = HashMap::new(); + offsets_to_nack + .into_iter() + .for_each(|(offset, option)| nack_map.entry(option).or_default().push(offset)); + for (option, offsets) in nack_map { + Self::nack_with_retry(source_handle.clone(), offsets, option, &cancel_token).await? + } } Self::send_ack_metrics(e2e_start_time, n, start); @@ -840,13 +853,14 @@ impl Source { async fn nack_with_retry( source_handle: mpsc::Sender, offsets: Vec, + options: Option, cancel_token: &CancellationToken, ) -> Result<()> { // In practice, this retry should exit early since it is invoked during ISB/map/sink errors, which results in CancellationToken cancellation let interval = fixed::Interval::from_millis(ACK_RETRY_INTERVAL).take(NACK_RETRY_ATTEMPTS); let _ = Retry::new( interval, - async || Self::nack(source_handle.clone(), offsets.clone()).await, + async || Self::nack(source_handle.clone(), offsets.clone(), options.clone()).await, |error: &Error| { error!(?error, "Failed to send nack to source, retrying..."); // Don't retry non-retryable errors diff --git a/rust/numaflow-core/src/source/generator.rs b/rust/numaflow-core/src/source/generator.rs index 870021d000..3c90c5899e 100644 --- a/rust/numaflow-core/src/source/generator.rs +++ b/rust/numaflow-core/src/source/generator.rs @@ -1,7 +1,7 @@ use crate::config::components::source::GeneratorConfig; use crate::config::get_vertex_replica; -use crate::message::{Message, Offset}; +use crate::message::{Message, NackOptions, Offset}; use crate::reader; use crate::source; use tokio_stream::StreamExt; @@ -198,6 +198,7 @@ mod stream_generator { // Set default metadata so that metadata is always present. metadata: Some(Arc::new(crate::metadata::Metadata::default())), is_late: false, + nack_options: None, } } @@ -398,7 +399,11 @@ impl source::SourceAcker for GeneratorAck { Ok(()) } - async fn nack(&mut self, _: Vec) -> crate::error::Result<()> { + async fn nack( + &mut self, + _: Vec, + _options: Option, + ) -> crate::error::Result<()> { // Generator source doesn't support nack - no-op Ok(()) } diff --git a/rust/numaflow-core/src/source/http.rs b/rust/numaflow-core/src/source/http.rs index 05509364b9..b1b37bdd2f 100644 --- a/rust/numaflow-core/src/source/http.rs +++ b/rust/numaflow-core/src/source/http.rs @@ -2,7 +2,7 @@ use crate::config::{get_vertex_name, get_vertex_replica}; use crate::error::Result; -use crate::message::{Message, MessageID, Offset, StringOffset}; +use crate::message::{Message, MessageID, NackOptions, Offset, StringOffset}; use crate::metadata::Metadata; use crate::source; use crate::source::{SourceAcker, SourceReader}; @@ -41,6 +41,7 @@ impl From for Message { // Set default metadata so that metadata is always present. metadata: Some(Arc::new(Metadata::default())), is_late: false, + nack_options: None, } } } @@ -106,7 +107,7 @@ impl SourceAcker for CoreHttpSource { .map_err(|e| e.into()) } - async fn nack(&mut self, offsets: Vec) -> Result<()> { + async fn nack(&mut self, offsets: Vec, _options: Option) -> Result<()> { // extract the ids from the offsets, id was used to create the offset let ids = offsets .into_iter() diff --git a/rust/numaflow-core/src/source/jetstream.rs b/rust/numaflow-core/src/source/jetstream.rs index 7378746df0..d26d5de7b3 100644 --- a/rust/numaflow-core/src/source/jetstream.rs +++ b/rust/numaflow-core/src/source/jetstream.rs @@ -6,7 +6,7 @@ use std::time::Duration; use tokio_util::sync::CancellationToken; use crate::config::{get_vertex_name, get_vertex_replica}; -use crate::message::{IntOffset, MessageID, Offset}; +use crate::message::{IntOffset, MessageID, NackOptions, Offset}; use crate::metadata::Metadata; use crate::source::SourceReader; use crate::{Error, Result, message::Message}; @@ -37,6 +37,7 @@ impl From for Message { // Set default metadata so that metadata is always present. metadata: Some(Arc::new(Metadata::default())), is_late: false, + nack_options: None, } } } @@ -91,7 +92,7 @@ impl SourceAcker for JetstreamSource { Ok(()) } - async fn nack(&mut self, offsets: Vec) -> Result<()> { + async fn nack(&mut self, offsets: Vec, _options: Option) -> Result<()> { let mut jetstream_offsets = Vec::with_capacity(offsets.len()); for offset in offsets { let Offset::Int(seq_num) = offset else { @@ -114,7 +115,6 @@ impl super::LagReader for JetstreamSource { #[cfg(test)] mod tests { - use std::collections::HashMap; use bytes::Bytes; diff --git a/rust/numaflow-core/src/source/kafka.rs b/rust/numaflow-core/src/source/kafka.rs index 8418a59b03..fee0a4b15f 100644 --- a/rust/numaflow-core/src/source/kafka.rs +++ b/rust/numaflow-core/src/source/kafka.rs @@ -7,7 +7,7 @@ use tracing::info; use crate::config::get_vertex_name; use crate::error::Error; -use crate::message::{Message, MessageID, Offset, StringOffset}; +use crate::message::{Message, MessageID, NackOptions, Offset, StringOffset}; use crate::metadata::Metadata; use crate::source; @@ -53,6 +53,7 @@ impl TryFrom for Message { // Set default metadata so that metadata is always present. metadata: Some(Arc::new(Metadata::default())), is_late: false, + nack_options: None, }) } } @@ -151,7 +152,11 @@ impl source::SourceAcker for KafkaSource { self.ack_messages(kafka_offsets).await.map_err(Into::into) } - async fn nack(&mut self, offsets: Vec) -> crate::error::Result<()> { + async fn nack( + &mut self, + offsets: Vec, + _options: Option, + ) -> crate::error::Result<()> { info!(?offsets, "Nack invoked for offsets (no-op for Kafka)"); // Kafka doesn't support nack - no-op Ok(()) diff --git a/rust/numaflow-core/src/source/nats.rs b/rust/numaflow-core/src/source/nats.rs index 068b2d3922..3b3d065bd2 100644 --- a/rust/numaflow-core/src/source/nats.rs +++ b/rust/numaflow-core/src/source/nats.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use std::time::Duration; use crate::config::{get_vertex_name, get_vertex_replica}; -use crate::message::Message; +use crate::message::{Message, NackOptions}; use crate::message::{MessageID, Offset, StringOffset}; use crate::metadata::Metadata; use crate::source::SourceReader; @@ -39,6 +39,7 @@ impl From for Message { // Set default metadata so that metadata is always present. metadata: Some(Arc::new(Metadata::default())), is_late: false, + nack_options: None, } } } @@ -70,7 +71,11 @@ impl SourceAcker for NatsSource { Ok(()) } - async fn nack(&mut self, _offsets: Vec) -> crate::Result<()> { + async fn nack( + &mut self, + _offsets: Vec, + _options: Option, + ) -> crate::Result<()> { // NATS nack is a no-op (plain NATS doesn't support nack) Ok(()) } @@ -85,7 +90,6 @@ impl super::LagReader for NatsSource { #[cfg(test)] mod tests { - use crate::reader::LagReader; use super::*; diff --git a/rust/numaflow-core/src/source/pulsar.rs b/rust/numaflow-core/src/source/pulsar.rs index fec0efd8ae..4547d1ddef 100644 --- a/rust/numaflow-core/src/source/pulsar.rs +++ b/rust/numaflow-core/src/source/pulsar.rs @@ -5,7 +5,7 @@ use numaflow_pulsar::source::{PulsarMessage, PulsarSource, PulsarSourceConfig}; use crate::config::{get_vertex_name, get_vertex_replica}; use crate::error::Error; -use crate::message::{IntOffset, Message, MessageID, Offset}; +use crate::message::{IntOffset, Message, MessageID, NackOptions, Offset}; use crate::metadata::Metadata; use crate::source; @@ -32,6 +32,7 @@ impl TryFrom for Message { // Set default metadata so that metadata is always present. metadata: Some(Arc::new(Metadata::default())), is_late: false, + nack_options: None, }) } } @@ -102,7 +103,11 @@ impl source::SourceAcker for PulsarSource { self.ack_offsets(pulsar_offsets).await.map_err(Into::into) } - async fn nack(&mut self, offsets: Vec) -> crate::error::Result<()> { + async fn nack( + &mut self, + offsets: Vec, + _options: Option, + ) -> crate::error::Result<()> { let mut pulsar_offsets = Vec::with_capacity(offsets.len()); for offset in offsets { let Offset::Int(int_offset) = offset else { diff --git a/rust/numaflow-core/src/source/sqs.rs b/rust/numaflow-core/src/source/sqs.rs index 337202ac26..90b0316598 100644 --- a/rust/numaflow-core/src/source/sqs.rs +++ b/rust/numaflow-core/src/source/sqs.rs @@ -1,11 +1,10 @@ +use numaflow_sqs::source::{SqsMessage, SqsSource, SqsSourceBuilder, SqsSourceConfig}; use std::sync::Arc; use std::time::Duration; -use numaflow_sqs::source::{SqsMessage, SqsSource, SqsSourceBuilder, SqsSourceConfig}; - use crate::config::{get_vertex_name, get_vertex_replica}; use crate::error::Error; -use crate::message::{Message, MessageID, Offset, StringOffset}; +use crate::message::{Message, MessageID, NackOptions, Offset, StringOffset}; use crate::source; use crate::metadata::{KeyValueGroup, Metadata}; @@ -49,6 +48,7 @@ impl TryFrom for Message { headers: Arc::new(message.system_attributes), metadata, is_late: false, + nack_options: None, }) } } @@ -125,7 +125,11 @@ impl source::SourceAcker for SqsSource { self.ack_offsets(sqs_offsets).await.map_err(Into::into) } - async fn nack(&mut self, _offsets: Vec) -> crate::error::Result<()> { + async fn nack( + &mut self, + _offsets: Vec, + _options: Option, + ) -> crate::error::Result<()> { // SQS doesn't support nack - no-op Ok(()) } diff --git a/rust/numaflow-core/src/source/user_defined.rs b/rust/numaflow-core/src/source/user_defined.rs index 22518ad4d9..ec72850a8f 100644 --- a/rust/numaflow-core/src/source/user_defined.rs +++ b/rust/numaflow-core/src/source/user_defined.rs @@ -14,7 +14,7 @@ use tokio_util::sync::CancellationToken; use tonic::transport::Channel; use tonic::{Request, Streaming}; -use crate::message::{Message, MessageID, Offset, StringOffset}; +use crate::message::{Message, MessageID, NackOptions, Offset, StringOffset}; use crate::metadata::Metadata; use crate::reader::LagReader; use crate::shared::grpc::utc_from_timestamp; @@ -179,6 +179,7 @@ impl TryFrom for Message { None => Metadata::default(), })), is_late: false, + nack_options: None, }) } } @@ -350,7 +351,7 @@ impl SourceAcker for UserDefinedSourceAck { /// This method checks if the SDK supports nack functionality using a pre-computed flag. /// For older SDK versions (< 0.11), it logs a warning and returns Ok() for backward compatibility. /// For newer SDK versions (>= 0.11), it calls the actual nack gRPC method. - async fn nack(&mut self, offsets: Vec) -> Result<()> { + async fn nack(&mut self, offsets: Vec, options: Option) -> Result<()> { if !self.supports_nack { warn!( offset_count = offsets.len(), @@ -368,6 +369,7 @@ impl SourceAcker for UserDefinedSourceAck { .nack_fn(NackRequest { request: Some(source::nack_request::Request { offsets: nack_offsets?, + nack_options: options.map(Into::into), }), }) .await @@ -618,7 +620,7 @@ mod tests { // nack the messages let response = src_ack - .nack(messages.iter().map(|m| m.offset.clone()).collect()) + .nack(messages.iter().map(|m| m.offset.clone()).collect(), None) .await; assert!(response.is_ok()); diff --git a/rust/numaflow-core/src/tracker.rs b/rust/numaflow-core/src/tracker.rs index dc22839378..8f27fd18ab 100644 --- a/rust/numaflow-core/src/tracker.rs +++ b/rust/numaflow-core/src/tracker.rs @@ -516,6 +516,7 @@ mod tests { ..Default::default() })), is_late: false, + nack_options: None, }; // Insert a new message diff --git a/rust/numaflow-core/src/transformer.rs b/rust/numaflow-core/src/transformer.rs index 53f076e521..ec34162888 100644 --- a/rust/numaflow-core/src/transformer.rs +++ b/rust/numaflow-core/src/transformer.rs @@ -275,8 +275,13 @@ impl Transformer { .iter() .filter(|h| h.message().dropped()) .count(); + // let nacked_message_handles = transformed_handles + // .iter() + // .filter(|h| h.message().nacked()) + // .collect(); let elapsed_time = batch_start_time.elapsed().as_micros() as f64; let write_messages_count = transformed_handles.len() - dropped_messages_count; + // TODO: emit nacked message metrics Self::send_transformer_metrics( dropped_messages_count, elapsed_time, @@ -343,6 +348,10 @@ impl Transformer { #[cfg(test)] mod tests { + use super::*; + use crate::message::StringOffset; + use crate::message::{Message, MessageHandle, MessageID, Offset}; + use crate::shared::grpc::create_rpc_channel; use chrono::Utc; use numaflow::shared::ServerExtras; use numaflow::sourcetransform; @@ -352,11 +361,6 @@ mod tests { use tempfile::TempDir; use tokio::sync::oneshot; - use super::*; - use crate::message::StringOffset; - use crate::message::{Message, MessageHandle, MessageID, Offset}; - use crate::shared::grpc::create_rpc_channel; - struct SimpleTransformer; #[tonic::async_trait] diff --git a/rust/numaflow-core/src/transformer/user_defined.rs b/rust/numaflow-core/src/transformer/user_defined.rs index 361956596b..62ea84829b 100644 --- a/rust/numaflow-core/src/transformer/user_defined.rs +++ b/rust/numaflow-core/src/transformer/user_defined.rs @@ -69,6 +69,7 @@ impl From> for Message { Some(Arc::new(metadata)) }, is_late: value.1.is_late, + nack_options: value.0.nack_options.map(Into::into), } } } diff --git a/rust/numaflow-pb/src/clients/source.v1.rs b/rust/numaflow-pb/src/clients/source.v1.rs index ba9b79586e..45b86fdb5b 100644 --- a/rust/numaflow-pb/src/clients/source.v1.rs +++ b/rust/numaflow-pb/src/clients/source.v1.rs @@ -221,6 +221,11 @@ pub mod nack_request { /// Required field holding the offset to be nacked #[prost(message, repeated, tag = "1")] pub offsets: ::prost::alloc::vec::Vec, + /// Options passed to the source for nacking + #[prost(message, optional, tag = "2")] + pub nack_options: ::core::option::Option< + crate::common::nack_options::NackOptions, + >, } } #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] diff --git a/rust/numaflow-pb/src/clients/sourcetransformer.v1.rs b/rust/numaflow-pb/src/clients/sourcetransformer.v1.rs index c7fb6243cc..7813744ecc 100644 --- a/rust/numaflow-pb/src/clients/sourcetransformer.v1.rs +++ b/rust/numaflow-pb/src/clients/sourcetransformer.v1.rs @@ -70,6 +70,11 @@ pub mod source_transform_response { /// Metadata is the metadata of the message #[prost(message, optional, tag = "5")] pub metadata: ::core::option::Option, + /// Nack options specified while nacking a message + #[prost(message, optional, tag = "6")] + pub nack_options: ::core::option::Option< + crate::common::nack_options::NackOptions, + >, } } /// * diff --git a/rust/numaflow-pb/src/common.rs b/rust/numaflow-pb/src/common.rs index 0a793b8bfd..ea1931c505 100644 --- a/rust/numaflow-pb/src/common.rs +++ b/rust/numaflow-pb/src/common.rs @@ -1,2 +1,5 @@ #[path = "common/metadata.rs"] pub mod metadata; + +#[path = "common/nack_options.rs"] +pub mod nack_options; diff --git a/rust/numaflow-pb/src/common/nack_options.rs b/rust/numaflow-pb/src/common/nack_options.rs new file mode 100644 index 0000000000..3cd11619d1 --- /dev/null +++ b/rust/numaflow-pb/src/common/nack_options.rs @@ -0,0 +1,14 @@ +// This file is @generated by prost-build. +/// Used to expose options to users for nacking messages +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct NackOptions { + /// reason this message was nacked + #[prost(string, optional, tag = "1")] + pub reason: ::core::option::Option<::prost::alloc::string::String>, + /// number of times this message should be redelivered + #[prost(uint32, optional, tag = "2")] + pub max_deliveries: ::core::option::Option, + /// delay in millisecond + #[prost(uint64, optional, tag = "3")] + pub delay: ::core::option::Option, +} diff --git a/rust/numaflow-pb/src/main.rs b/rust/numaflow-pb/src/main.rs index 550ee3d2f9..4af74fdd03 100644 --- a/rust/numaflow-pb/src/main.rs +++ b/rust/numaflow-pb/src/main.rs @@ -15,7 +15,10 @@ fn main() { fn build_common() { prost_build::Config::new() .out_dir("src/common") - .compile_protos(&["proto/metadata.proto"], &["proto"]) + .compile_protos( + &["proto/metadata.proto", "proto/common/nack_options.proto"], + &["proto"], + ) .expect("failed to compile common protos"); } @@ -25,6 +28,7 @@ fn build_client() { .build_server(false) .out_dir("src/clients") .extern_path(".metadata", "crate::common::metadata") + .extern_path(".nack_options", "crate::common::nack_options") .compile_protos( &[ "proto/source/v1/source.proto",