Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions core/connectors/runtime/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ pub enum RuntimeError {
FailedToSerializeRawMessages,
#[error("Failed to serialize headers")]
FailedToSerializeHeaders,
#[error(
"Sink connector with ID: {plugin_id} failed to consume {processed_count} processed messages from stream: {stream}, topic: {topic}, partition: {partition_id}, current offset: {current_offset}, schema: {schema}, status: {status}"
)]
SinkConsumeFailed {
plugin_id: u32,
status: i32,
stream: String,
topic: String,
partition_id: u32,
current_offset: u64,
schema: String,
processed_count: usize,
},
#[error("Connector SDK error")]
ConnectorSdkError(#[from] iggy_connector_sdk::Error),
#[error("Iggy client error")]
Expand Down Expand Up @@ -75,6 +88,7 @@ impl RuntimeError {
RuntimeError::MissingIggyCredentials => "invalid_configuration",
RuntimeError::InvalidConfiguration(_) => "invalid_configuration",
RuntimeError::HttpRequestFailed(_) => "http_request_failed",
RuntimeError::SinkConsumeFailed { .. } => "sink_consume_failed",
RuntimeError::TokenFileNotFound(_) => "invalid_configuration",
RuntimeError::TokenFileReadError(_, _) => "invalid_configuration",
RuntimeError::TokenFileEmpty(_) => "invalid_configuration",
Expand Down
96 changes: 95 additions & 1 deletion core/connectors/runtime/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ async fn process_messages(
RuntimeError::FailedToSerializeRawMessages
})?;

(consume)(
let status = (consume)(
plugin_id,
topic_meta.as_ptr(),
topic_meta.len(),
Expand All @@ -591,6 +591,100 @@ async fn process_messages(
messages.as_ptr(),
messages.len(),
);
if status != 0 {
return Err(RuntimeError::SinkConsumeFailed {
plugin_id,
status,
stream: topic_metadata.stream.clone(),
topic: topic_metadata.topic.clone(),
partition_id: messages_metadata.partition_id,
current_offset: messages_metadata.current_offset,
schema: messages_metadata.schema.to_string(),
processed_count,
});
}

Ok(processed_count)
}

#[cfg(test)]
mod tests {
    use super::*;
    use iggy::prelude::IggyMessageHeader;
    use iggy_connector_sdk::{Error, Payload, Schema};

    /// Minimal decoder used by the tests: reports `Schema::Raw` and wraps the
    /// incoming bytes in `Payload::Raw` without touching them.
    struct TestDecoder;

    impl StreamDecoder for TestDecoder {
        fn schema(&self) -> Schema {
            Schema::Raw
        }

        fn decode(&self, payload: Vec<u8>) -> Result<Payload, Error> {
            Ok(Payload::Raw(payload))
        }
    }

    /// Consume callback stub that ignores every argument and always returns
    /// status `1`, i.e. a non-zero (failure) status.
    extern "C" fn failing_consume(
        _plugin_id: u32,
        _topic_meta_ptr: *const u8,
        _topic_meta_len: usize,
        _messages_meta_ptr: *const u8,
        _messages_meta_len: usize,
        _messages_ptr: *const u8,
        _messages_len: usize,
    ) -> i32 {
        1
    }

    /// A non-zero status from the consume callback must surface as
    /// `RuntimeError::SinkConsumeFailed` carrying the plugin ID, the status and
    /// the stream/topic/partition/offset/schema context of the failed batch.
    #[tokio::test]
    async fn process_messages_returns_error_when_consume_callback_fails() {
        let plugin_id = 42;
        let consume: ConsumeCallback = failing_consume;
        let decoder: Arc<dyn StreamDecoder> = Arc::new(TestDecoder);
        let result = process_messages(
            plugin_id,
            MessagesMetadata {
                partition_id: 1,
                current_offset: 0,
                schema: Schema::Raw,
            },
            &TopicMetadata {
                stream: "stream".to_string(),
                topic: "topic".to_string(),
            },
            vec![IggyMessage {
                header: IggyMessageHeader {
                    checksum: 1,
                    id: 2,
                    offset: 0,
                    timestamp: 3,
                    origin_timestamp: 4,
                    user_headers_length: 0,
                    payload_length: 7,
                    reserved: 0,
                },
                payload: "payload".into(),
                user_headers: None,
            }],
            &consume,
            &Vec::new(),
            &decoder,
        )
        .await;

        // Destructure instead of `assert!(matches!(..))` so that a failing run
        // reports the actual value (or the unexpected variant) rather than
        // only the stringified pattern.
        match result {
            Err(RuntimeError::SinkConsumeFailed {
                plugin_id,
                status,
                stream,
                topic,
                partition_id,
                current_offset,
                schema,
                processed_count,
            }) => {
                assert_eq!(plugin_id, 42);
                assert_eq!(status, 1);
                assert_eq!(stream, "stream");
                assert_eq!(topic, "topic");
                assert_eq!(partition_id, 1);
                assert_eq!(current_offset, 0);
                assert_eq!(schema, "raw");
                assert_eq!(processed_count, 1);
            }
            other => panic!(
                "expected RuntimeError::SinkConsumeFailed, got: {:?}",
                other
            ),
        }
    }
}
6 changes: 3 additions & 3 deletions core/connectors/sinks/http_sink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ cargo test -p integration --test connectors -- http_sink

## Delivery Semantics

All retry logic lives inside `consume()`. The connector runtime invokes `consume()` via an FFI callback that returns an `i32` status code. The runtime does not inspect this return value (see `process_messages()` in `runtime/src/sink.rs`), so errors logged by the sink are not propagated to the runtime's retry or alerting mechanisms. Additionally, consumer group offsets are committed before processing ([runtime issue #1](#known-limitations)). This means:
All retry logic lives inside `consume()`. The connector runtime invokes `consume()` via an FFI callback that returns an `i32` status code. A non-zero return value is treated as a processing error by `process_messages()` in `runtime/src/sink.rs`, stopping the sink task and surfacing the failure to the runtime. Additionally, consumer group offsets are committed before processing ([runtime issue #1](#known-limitations)). This means:

- Failed messages are **not retried by the runtime** — only by the sink's internal retry loop
- Messages are committed **before delivery** — a crash after commit but before delivery loses messages
Expand All @@ -795,9 +795,9 @@ The effective delivery guarantee is **at-most-once** at the runtime level. The s

## Known Limitations

1. **Runtime ignores `consume()` status**: The connector runtime invokes `consume()` via an FFI callback returning `i32`. The `process_messages()` function in `runtime/src/sink.rs` does not inspect the return value. Errors are logged internally by the sink but do not trigger runtime-level retry or alerting. ([#2927](https://github.com/apache/iggy/issues/2927))
1. **No runtime retry after `consume()` failure**: The connector runtime treats a non-zero `consume()` FFI status as a processing error, but the failed batch is not retried by the runtime. ([#2927](https://github.com/apache/iggy/issues/2927))

2. **Offsets committed before processing**: The `PollingMessages` auto-commit strategy commits consumer group offsets before `consume()` is called. Combined with limitation 1, at-least-once delivery is not achievable. ([#2928](https://github.com/apache/iggy/issues/2928))
2. **Offsets committed before processing**: The `PollingMessages` auto-commit strategy commits consumer group offsets before `consume()` is called. Because a failed batch is already committed, at-least-once delivery is not achievable. ([#2928](https://github.com/apache/iggy/issues/2928))

3. **`Retry-After` header not used for backoff**: The `reqwest-middleware` retry layer uses computed exponential backoff. `Retry-After` headers are logged as warnings but do not influence retry timing.

Expand Down
6 changes: 3 additions & 3 deletions core/connectors/sinks/http_sink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1187,8 +1187,8 @@ impl Sink for HttpSink {
///
/// **Runtime note**: The FFI boundary in `sdk/src/sink.rs` maps `consume()`'s `Result` to
/// `i32` (0=ok, 1=err), but the runtime's `process_messages()` in `runtime/src/sink.rs`
/// discards that return code. All retry logic lives inside this method — returning `Err`
/// does not trigger a runtime-level retry.
/// treats a non-zero return code as a processing error. All retry logic lives inside this
/// method — returning `Err` stops the sink task but does not retry the already-polled batch.
async fn consume(
&self,
topic_metadata: &TopicMetadata,
Expand Down Expand Up @@ -1229,7 +1229,7 @@ impl Sink for HttpSink {

if let Err(ref e) = result {
error!(
"HTTP sink ID: {} — consume() returning error (runtime ignores FFI status code): {}",
"HTTP sink ID: {} — consume() returning error (runtime will receive non-zero FFI status): {}",
self.id, e
);
}
Expand Down
6 changes: 3 additions & 3 deletions core/integration/tests/connectors/http/http_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@
//!
//! ## Known Limitations
//!
//! 1. **FFI return value ignored**: The runtime's `process_messages()` discards `consume()`'s
//! `i32` return code. Errors are logged by the sink but invisible to the runtime.
//! 1. **No runtime retry after `consume()` failure**: The runtime treats a non-zero `consume()`
//! FFI status as a processing error, but it does not retry the already-polled batch.
//! See [#2927](https://github.com/apache/iggy/issues/2927).
//! 2. **Offsets committed before processing**: `PollingMessages` auto-commit strategy commits
//! offsets before `consume()`. Combined with (1), effective guarantee is at-most-once.
//! offsets before `consume()`, so effective guarantee is at-most-once.
//! See [#2928](https://github.com/apache/iggy/issues/2928).
//!
//! ## Test History
Expand Down
Loading