Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/semantic.yml
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ jobs:
blackhole sink
clickhouse sink
console sink
databricks_zerobus sink
Comment thread
pront marked this conversation as resolved.
datadog_archives sink
datadog_common sink
datadog_events sink
Expand Down
29 changes: 3 additions & 26 deletions src/sinks/databricks_zerobus/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,7 @@ use crate::sinks::{
util::{BatchConfig, RealtimeSizeBasedDefaultBatchSettings},
};

use vector_lib::codecs::encoding::{
BatchEncoder, BatchSerializerConfig, ProtoBatchSerializerConfig,
};

use super::{
error::ZerobusSinkError,
service::{StreamMode, ZerobusService},
sink::ZerobusSink,
};
use super::{error::ZerobusSinkError, service::ZerobusService, sink::ZerobusSink};

/// Authentication configuration for Databricks.
#[configurable_component]
Expand Down Expand Up @@ -163,27 +155,12 @@ impl SinkConfig for ZerobusSinkConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
self.validate()?;

let descriptor = ZerobusService::resolve_descriptor(self, cx.proxy()).await?;

// The zerobus sink always encodes in proto_batch form — the stream
// descriptor is the one we just resolved from Unity Catalog.
let descriptor_proto = std::sync::Arc::new(descriptor.descriptor_proto().clone());
let stream_mode = StreamMode::Proto { descriptor_proto };

let proto_config = ProtoBatchSerializerConfig {
descriptor: Some(descriptor),
};
let batch_serializer = BatchSerializerConfig::ProtoBatch(proto_config)
.build_batch_serializer()
.map_err(|e| format!("Failed to build batch serializer: {}", e))?;
let encoder = BatchEncoder::new(batch_serializer);

let service = ZerobusService::new(self.clone(), stream_mode, cx.proxy()).await?;
let service = ZerobusService::new(self.clone(), cx.proxy()).await?;
let healthcheck_service = service.clone();

let request_limits = self.request.into_settings();

let sink = ZerobusSink::new(service, request_limits, self.batch, encoder)?;
let sink = ZerobusSink::new(service, request_limits, self.batch)?;

let healthcheck = async move {
healthcheck_service
Expand Down
66 changes: 50 additions & 16 deletions src/sinks/databricks_zerobus/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,35 @@ pub enum ZerobusSinkError {
/// will create a fresh stream via `get_or_create_stream`.
#[snafu(display("Zerobus stream was closed concurrently"))]
StreamClosed,

/// Resolving the table schema from Unity Catalog failed. `retryable`
/// distinguishes transient failures (network, 5xx, 408, 429) from
/// permanent ones (404, 401, 403, ...).
#[snafu(display("Schema resolution failed: {}", message))]
SchemaError { message: String, retryable: bool },
}

impl ZerobusSinkError {
/// Whether this error should be retried.
pub fn is_retryable(&self) -> bool {
match self {
Self::ZerobusError { source }
| Self::StreamInitError { source }
| Self::IngestionError { source } => source.is_retryable(),
Self::StreamClosed => true,
Self::SchemaError { retryable, .. } => *retryable,
Self::ConfigError { .. } | Self::EncodingError { .. } | Self::MissingAckOffset => false,
}
}

/// Event status to apply to a batch's finalizers when this error occurs.
pub fn event_status(&self) -> EventStatus {
if self.is_retryable() {
EventStatus::Errored
} else {
EventStatus::Rejected
}
}
}

impl From<ZerobusError> for ZerobusSinkError {
Expand All @@ -46,24 +75,9 @@ impl From<ZerobusError> for ZerobusSinkError {
}
}

/// Convert Zerobus errors to Vector event status.
impl From<ZerobusSinkError> for EventStatus {
fn from(error: ZerobusSinkError) -> Self {
match error {
ZerobusSinkError::ConfigError { .. }
| ZerobusSinkError::EncodingError { .. }
| ZerobusSinkError::MissingAckOffset => EventStatus::Rejected,
ZerobusSinkError::StreamClosed => EventStatus::Errored,
ZerobusSinkError::ZerobusError { source }
| ZerobusSinkError::StreamInitError { source }
| ZerobusSinkError::IngestionError { source } => {
if source.is_retryable() {
EventStatus::Errored
} else {
EventStatus::Rejected
}
}
}
error.event_status()
}
}

Expand Down Expand Up @@ -170,4 +184,24 @@ mod tests {
};
assert!(!logic.is_retriable_error(&error));
}

#[test]
fn retryable_schema_error_maps_to_errored() {
let error = ZerobusSinkError::SchemaError {
message: "UC 503".to_string(),
retryable: true,
};
assert!(ZerobusRetryLogic.is_retriable_error(&error));
assert_eq!(EventStatus::from(error), EventStatus::Errored);
}

#[test]
fn non_retryable_schema_error_maps_to_rejected() {
let error = ZerobusSinkError::SchemaError {
message: "UC 404".to_string(),
retryable: false,
};
assert!(!ZerobusRetryLogic.is_retriable_error(&error));
assert_eq!(EventStatus::from(error), EventStatus::Rejected);
}
}
Loading
Loading