diff --git a/.github/workflows/semantic.yml b/.github/workflows/semantic.yml index 6d801c49d5de9..b2ef241ec0c1b 100644 --- a/.github/workflows/semantic.yml +++ b/.github/workflows/semantic.yml @@ -222,6 +222,7 @@ jobs: blackhole sink clickhouse sink console sink + databricks_zerobus sink datadog_archives sink datadog_common sink datadog_events sink diff --git a/src/sinks/databricks_zerobus/config.rs b/src/sinks/databricks_zerobus/config.rs index 275d2c16a251e..1df2f62a6f1c3 100644 --- a/src/sinks/databricks_zerobus/config.rs +++ b/src/sinks/databricks_zerobus/config.rs @@ -9,15 +9,7 @@ use crate::sinks::{ util::{BatchConfig, RealtimeSizeBasedDefaultBatchSettings}, }; -use vector_lib::codecs::encoding::{ - BatchEncoder, BatchSerializerConfig, ProtoBatchSerializerConfig, -}; - -use super::{ - error::ZerobusSinkError, - service::{StreamMode, ZerobusService}, - sink::ZerobusSink, -}; +use super::{error::ZerobusSinkError, service::ZerobusService, sink::ZerobusSink}; /// Authentication configuration for Databricks. #[configurable_component] @@ -163,27 +155,12 @@ impl SinkConfig for ZerobusSinkConfig { async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { self.validate()?; - let descriptor = ZerobusService::resolve_descriptor(self, cx.proxy()).await?; - - // The zerobus sink always encodes in proto_batch form — the stream - // descriptor is the one we just resolved from Unity Catalog. - let descriptor_proto = std::sync::Arc::new(descriptor.descriptor_proto().clone()); - let stream_mode = StreamMode::Proto { descriptor_proto }; - - let proto_config = ProtoBatchSerializerConfig { - descriptor: Some(descriptor), - }; - let batch_serializer = BatchSerializerConfig::ProtoBatch(proto_config) - .build_batch_serializer() - .map_err(|e| format!("Failed to build batch serializer: {}", e))?; - let encoder = BatchEncoder::new(batch_serializer); - - let service = ZerobusService::new(self.clone(), stream_mode, cx.proxy()).await?; + let service = ZerobusService::new(self.clone(), cx.proxy()).await?; let healthcheck_service = service.clone(); let request_limits = self.request.into_settings(); - let sink = ZerobusSink::new(service, request_limits, self.batch, encoder)?; + let sink = ZerobusSink::new(service, request_limits, self.batch)?; let healthcheck = async move { healthcheck_service diff --git a/src/sinks/databricks_zerobus/error.rs b/src/sinks/databricks_zerobus/error.rs index e4dedddea00c8..1e9a630415724 100644 --- a/src/sinks/databricks_zerobus/error.rs +++ b/src/sinks/databricks_zerobus/error.rs @@ -38,6 +38,35 @@ pub enum ZerobusSinkError { /// will create a fresh stream via `get_or_create_stream`. #[snafu(display("Zerobus stream was closed concurrently"))] StreamClosed, + + /// Resolving the table schema from Unity Catalog failed. `retryable` + /// distinguishes transient failures (network, 5xx, 408, 429) from + /// permanent ones (404, 401, 403, ...). + #[snafu(display("Schema resolution failed: {}", message))] + SchemaError { message: String, retryable: bool }, +} + +impl ZerobusSinkError { + /// Whether this error should be retried. + pub fn is_retryable(&self) -> bool { + match self { + Self::ZerobusError { source } + | Self::StreamInitError { source } + | Self::IngestionError { source } => source.is_retryable(), + Self::StreamClosed => true, + Self::SchemaError { retryable, .. } => *retryable, + Self::ConfigError { .. } | Self::EncodingError { .. } | Self::MissingAckOffset => false, + } + } + + /// Event status to apply to a batch's finalizers when this error occurs. + pub fn event_status(&self) -> EventStatus { + if self.is_retryable() { + EventStatus::Errored + } else { + EventStatus::Rejected + } + } } impl From for ZerobusSinkError { @@ -46,24 +75,9 @@ impl From for ZerobusSinkError { } } -/// Convert Zerobus errors to Vector event status. impl From for EventStatus { fn from(error: ZerobusSinkError) -> Self { - match error { - ZerobusSinkError::ConfigError { .. } - | ZerobusSinkError::EncodingError { .. } - | ZerobusSinkError::MissingAckOffset => EventStatus::Rejected, - ZerobusSinkError::StreamClosed => EventStatus::Errored, - ZerobusSinkError::ZerobusError { source } - | ZerobusSinkError::StreamInitError { source } - | ZerobusSinkError::IngestionError { source } => { - if source.is_retryable() { - EventStatus::Errored - } else { - EventStatus::Rejected - } - } - } + error.event_status() } } @@ -170,4 +184,24 @@ mod tests { }; assert!(!logic.is_retriable_error(&error)); } + + #[test] + fn retryable_schema_error_maps_to_errored() { + let error = ZerobusSinkError::SchemaError { + message: "UC 503".to_string(), + retryable: true, + }; + assert!(ZerobusRetryLogic.is_retriable_error(&error)); + assert_eq!(EventStatus::from(error), EventStatus::Errored); + } + + #[test] + fn non_retryable_schema_error_maps_to_rejected() { + let error = ZerobusSinkError::SchemaError { + message: "UC 404".to_string(), + retryable: false, + }; + assert!(!ZerobusRetryLogic.is_retriable_error(&error)); + assert_eq!(EventStatus::from(error), EventStatus::Rejected); + } } diff --git a/src/sinks/databricks_zerobus/service.rs b/src/sinks/databricks_zerobus/service.rs index 0da433699d9c2..bffe4d785a45b 100644 --- a/src/sinks/databricks_zerobus/service.rs +++ b/src/sinks/databricks_zerobus/service.rs @@ -1,13 +1,19 @@ //! Zerobus service wrapper for Vector sink integration. use crate::config::ProxyConfig; +use crate::event::Event; +use crate::http::HttpClient; use crate::sinks::util::retries::RetryLogic; +use crate::tls::TlsSettings; use databricks_zerobus_ingest_sdk::{ConnectorFactory, ProxyConnector, ZerobusSdk, ZerobusStream}; use futures::future::BoxFuture; use std::sync::Arc; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::{Mutex, OnceCell, RwLock}; use tower::Service; use tracing::warn; +use vector_lib::codecs::encoding::{ + BatchEncoder, BatchOutput, BatchSerializerConfig, ProtoBatchSerializerConfig, +}; use vector_lib::finalization::{EventFinalizers, Finalizable}; use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; use vector_lib::stream::DriverResponse; @@ -50,19 +56,16 @@ fn build_connector_factory(proxy: &ProxyConfig) -> Result>), -} - /// Request type for the Zerobus service. -#[derive(Clone, Debug)] +/// +/// Carries the *unencoded* batch — encoding happens inside `Service::call` so +/// that schema-fetch failures flow through the Tower retry layer. Events live +/// behind an `Arc` because Tower's retry policy clones the request before +/// every call (not just on retry), and a deep clone of `Vec` per call +/// would be wasteful. +#[derive(Clone)] pub struct ZerobusRequest { - pub payload: ZerobusPayload, + pub events: Arc>, pub metadata: RequestMetadata, pub finalizers: EventFinalizers, } @@ -99,18 +102,6 @@ impl MetaDescriptive for ZerobusRequest { } } -/// Determines what kind of stream the service creates and how payloads are ingested. -/// -/// The sink only supports proto streams today; this is kept as an enum to -/// leave room for future stream modes without reshaping all the call sites. -#[derive(Clone)] -pub enum StreamMode { - /// Proto stream using `ZerobusStream::ingest_records_offset`. - Proto { - descriptor_proto: Arc, - }, -} - /// The active stream. /// /// The SDK's `ZerobusStream::close()` requires `&mut self`, but ingests need @@ -257,18 +248,24 @@ impl MockStream { } } +/// Schema and encoding state derived from the Unity Catalog table. +pub(super) struct ResolvedSchema { + encoder: BatchEncoder, + descriptor_proto: Arc, +} + /// Service for handling Zerobus requests. pub struct ZerobusService { sdk: Arc, config: Arc, + http_client: HttpClient, stream: Arc>>>, - stream_mode: StreamMode, + schema: Arc>, } impl ZerobusService { pub async fn new( config: ZerobusSinkConfig, - stream_mode: StreamMode, proxy: &ProxyConfig, ) -> Result { let mut builder = ZerobusSdk::builder() @@ -279,18 +276,25 @@ impl ZerobusService { message: format!("Failed to create Zerobus SDK: {}", e), })?; + let http_client = HttpClient::new(TlsSettings::default(), proxy).map_err(|e| { + ZerobusSinkError::ConfigError { + message: format!("Failed to create HTTP client: {}", e), + } + })?; + Ok(Self { sdk: Arc::new(sdk), config: Arc::new(config), + http_client, stream: Arc::new(Mutex::new(None)), - stream_mode, + schema: Arc::new(OnceCell::new()), }) } - /// Resolve the protobuf message descriptor from the schema configuration. - pub async fn resolve_descriptor( + /// Resolve the protobuf message descriptor from Unity Catalog. + async fn resolve_descriptor( config: &ZerobusSinkConfig, - proxy: &crate::config::ProxyConfig, + http_client: &HttpClient, ) -> Result { let (client_id, client_secret) = config.auth.credentials(); @@ -299,51 +303,93 @@ impl ZerobusService { &config.table_name, client_id, client_secret, - proxy, + http_client, ) .await?; unity_catalog_schema::generate_descriptor_from_schema(&table_schema) } + /// Resolve the schema on first use; cache the result. + pub(super) async fn ensure_schema(&self) -> Result<&ResolvedSchema, ZerobusSinkError> { + self.schema + .get_or_try_init(|| async { + let descriptor = Self::resolve_descriptor(&self.config, &self.http_client).await?; + let descriptor_proto = Arc::new(descriptor.descriptor_proto().clone()); + + let batch_serializer = + BatchSerializerConfig::ProtoBatch(ProtoBatchSerializerConfig { + descriptor: Some(descriptor), + }) + .build_batch_serializer() + .map_err(|e| ZerobusSinkError::ConfigError { + message: format!("Failed to build batch serializer: {}", e), + })?; + + Ok(ResolvedSchema { + encoder: BatchEncoder::new(batch_serializer), + descriptor_proto, + }) + }) + .await + } + + pub(super) fn encode_records( + schema: &ResolvedSchema, + events: &[Event], + ) -> Result>, ZerobusSinkError> { + match schema + .encoder + .encode_batch(events) + .map_err(|e| ZerobusSinkError::EncodingError { + message: format!("Failed to encode batch: {}", e), + })? { + BatchOutput::Records(records) => Ok(records), + #[cfg(feature = "codecs-arrow")] + BatchOutput::Arrow(_) => Err(ZerobusSinkError::EncodingError { + message: "The Databricks Zerobus sink only supports proto-batch output.".into(), + }), + } + } + /// Ensure we have an active stream, creating one if necessary. /// - /// Also used as the healthcheck: eagerly creating a stream verifies - /// OAuth credentials, endpoint connectivity, and table validity. + /// Also used as the healthcheck: resolving the schema verifies the table + /// and credentials against Unity Catalog, and creating the stream verifies + /// connectivity to the Zerobus endpoint. pub async fn ensure_stream(&self) -> Result<(), ZerobusSinkError> { - self.get_or_create_stream().await.map(|_| ()) + let schema = self.ensure_schema().await?; + self.get_or_create_stream(schema).await.map(|_| ()) } /// Return an `Arc` handle to the active stream, creating one if needed. /// /// The lock is held only while checking/creating the stream; callers can /// then use the returned `Arc` without holding the lock. - async fn get_or_create_stream(&self) -> Result, ZerobusSinkError> { + async fn get_or_create_stream( + &self, + schema: &ResolvedSchema, + ) -> Result, ZerobusSinkError> { let mut stream_guard = self.stream.lock().await; if stream_guard.is_none() { let (client_id, client_secret) = self.config.auth.credentials(); let (client_id, client_secret) = (client_id.to_string(), client_secret.to_string()); - let active_stream = match &self.stream_mode { - StreamMode::Proto { descriptor_proto } => { - let stream_options = &self.config.stream_options; - let stream = self - .sdk - .stream_builder() - .table(self.config.table_name.clone()) - .oauth(client_id, client_secret) - .compiled_proto((**descriptor_proto).clone()) - .server_lack_of_ack_timeout_ms(stream_options.server_lack_of_ack_timeout_ms) - .flush_timeout_ms(stream_options.flush_timeout_ms) - .build() - .await - .map_err(|e| ZerobusSinkError::StreamInitError { source: e })?; - ActiveStream::proto(stream) - } - }; + let stream_options = &self.config.stream_options; + let stream = self + .sdk + .stream_builder() + .table(self.config.table_name.clone()) + .oauth(client_id, client_secret) + .compiled_proto((*schema.descriptor_proto).clone()) + .server_lack_of_ack_timeout_ms(stream_options.server_lack_of_ack_timeout_ms) + .flush_timeout_ms(stream_options.flush_timeout_ms) + .build() + .await + .map_err(|e| ZerobusSinkError::StreamInitError { source: e })?; - *stream_guard = Some(Arc::new(active_stream)); + *stream_guard = Some(Arc::new(ActiveStream::proto(stream))); } Ok(Arc::clone(stream_guard.as_ref().unwrap())) @@ -364,25 +410,20 @@ impl ZerobusService { } } - /// Ingest a payload (proto records or Arrow batch). - /// - /// Obtains an `Arc` handle to the stream (creating one if needed) and - /// then releases the lock before calling into the SDK so that concurrent - /// ingests are not serialized. + /// Send encoded records to an already-resolved stream. /// /// On retryable errors the active stream is removed from the slot so that /// the next attempt (driven by Tower retry) creates a fresh one. - pub async fn ingest( + async fn ingest( &self, - payload: ZerobusPayload, + stream: Arc, + records: Vec>, events_byte_size: GroupedCountByteSize, ) -> Result { - let stream = self.get_or_create_stream().await?; - // Slot lock is not held here — concurrent ingests acquire read guards // on the inner `RwLock` and run truly in parallel. - let result = match (payload, stream.as_ref()) { - (ZerobusPayload::Records(records), ActiveStream::Proto(lock)) => { + let result = match stream.as_ref() { + ActiveStream::Proto(lock) => { let guard = lock.read().await; let Some(s) = guard.as_ref() else { return Err(ZerobusSinkError::StreamClosed); @@ -396,7 +437,7 @@ impl ZerobusService { } } #[cfg(test)] - (ZerobusPayload::Records(_), ActiveStream::Mock(mock)) => mock.try_ingest().await, + ActiveStream::Mock(mock) => mock.try_ingest().await, }; match result { @@ -441,7 +482,12 @@ impl Service for ZerobusService { let events_byte_size = std::mem::take(request.metadata_mut()).into_events_estimated_json_encoded_byte_size(); - Box::pin(async move { service.ingest(request.payload, events_byte_size).await }) + Box::pin(async move { + let schema = service.ensure_schema().await?; + let records = Self::encode_records(schema, &request.events)?; + let stream = service.get_or_create_stream(schema).await?; + service.ingest(stream, records, events_byte_size).await + }) } } @@ -450,8 +496,9 @@ impl Clone for ZerobusService { Self { sdk: Arc::clone(&self.sdk), config: Arc::clone(&self.config), + http_client: self.http_client.clone(), stream: Arc::clone(&self.stream), - stream_mode: self.stream_mode.clone(), + schema: Arc::clone(&self.schema), } } } @@ -482,13 +529,17 @@ impl ZerobusService { message: format!("Failed to create Zerobus SDK: {}", e), })?; + let http_client = HttpClient::new(TlsSettings::default(), &ProxyConfig::default()) + .map_err(|e| ZerobusSinkError::ConfigError { + message: format!("Failed to create HTTP client: {}", e), + })?; + Ok(Self { sdk: Arc::new(sdk), config: Arc::new(config), + http_client, stream: Arc::new(Mutex::new(Some(Arc::new(ActiveStream::Mock(mock))))), - stream_mode: StreamMode::Proto { - descriptor_proto: Arc::new(Default::default()), - }, + schema: Arc::new(OnceCell::new()), }) } @@ -504,15 +555,7 @@ impl RetryLogic for ZerobusRetryLogic { type Response = ZerobusResponse; fn is_retriable_error(&self, error: &Self::Error) -> bool { - match error { - ZerobusSinkError::ZerobusError { source } - | ZerobusSinkError::StreamInitError { source } - | ZerobusSinkError::IngestionError { source } => source.is_retryable(), - ZerobusSinkError::StreamClosed => true, - ZerobusSinkError::ConfigError { .. } - | ZerobusSinkError::EncodingError { .. } - | ZerobusSinkError::MissingAckOffset => false, - } + error.is_retryable() } } @@ -541,8 +584,12 @@ mod tests { } } - fn dummy_payload() -> ZerobusPayload { - ZerobusPayload::Records(vec![vec![1, 2, 3]]) + fn dummy_records() -> Vec> { + vec![vec![1, 2, 3]] + } + + async fn current_stream(service: &ZerobusService) -> Arc { + Arc::clone(service.stream.lock().await.as_ref().unwrap()) } #[tokio::test] @@ -551,8 +598,13 @@ mod tests { .await .unwrap(); + let stream = current_stream(&service).await; let result = service - .ingest(dummy_payload(), GroupedCountByteSize::new_untagged()) + .ingest( + stream, + dummy_records(), + GroupedCountByteSize::new_untagged(), + ) .await; assert!(result.is_ok()); @@ -570,8 +622,13 @@ mod tests { assert!(service.has_active_stream().await); + let stream = current_stream(&service).await; let err = service - .ingest(dummy_payload(), GroupedCountByteSize::new_untagged()) + .ingest( + stream, + dummy_records(), + GroupedCountByteSize::new_untagged(), + ) .await .unwrap_err(); @@ -590,8 +647,13 @@ mod tests { assert!(service.has_active_stream().await); + let stream = current_stream(&service).await; let err = service - .ingest(dummy_payload(), GroupedCountByteSize::new_untagged()) + .ingest( + stream, + dummy_records(), + GroupedCountByteSize::new_untagged(), + ) .await .unwrap_err(); @@ -610,9 +672,14 @@ mod tests { .unwrap(); // First ingest succeeds. + let stream = current_stream(&service).await; assert!( service - .ingest(dummy_payload(), GroupedCountByteSize::new_untagged()) + .ingest( + stream, + dummy_records(), + GroupedCountByteSize::new_untagged() + ) .await .is_ok() ); @@ -629,8 +696,13 @@ mod tests { } // Second ingest fails and clears the stream. + let stream = current_stream(&service).await; let err = service - .ingest(dummy_payload(), GroupedCountByteSize::new_untagged()) + .ingest( + stream, + dummy_records(), + GroupedCountByteSize::new_untagged(), + ) .await .unwrap_err(); assert!(ZerobusRetryLogic.is_retriable_error(&err)); @@ -641,9 +713,14 @@ mod tests { *service.stream.lock().await = Some(Arc::new(ActiveStream::Mock(MockStream::succeeding()))); // Third ingest succeeds on the new stream. + let stream = current_stream(&service).await; assert!( service - .ingest(dummy_payload(), GroupedCountByteSize::new_untagged()) + .ingest( + stream, + dummy_records(), + GroupedCountByteSize::new_untagged() + ) .await .is_ok() ); @@ -686,17 +763,27 @@ mod tests { .await .unwrap(); - // Spawn two concurrent ingests. Each will obtain its own `Arc` clone - // from `get_or_create_stream`, then block in the gate. + // Spawn two concurrent ingests. Each clones the same stream `Arc`, + // then blocks in the gate. let s1 = service.clone(); let t1 = tokio::spawn(async move { - s1.ingest(dummy_payload(), GroupedCountByteSize::new_untagged()) - .await + let stream = current_stream(&s1).await; + s1.ingest( + stream, + dummy_records(), + GroupedCountByteSize::new_untagged(), + ) + .await }); let s2 = service.clone(); let t2 = tokio::spawn(async move { - s2.ingest(dummy_payload(), GroupedCountByteSize::new_untagged()) - .await + let stream = current_stream(&s2).await; + s2.ingest( + stream, + dummy_records(), + GroupedCountByteSize::new_untagged(), + ) + .await }); // Wait until both ingests are inside the gate (both `Arc`s alive). diff --git a/src/sinks/databricks_zerobus/sink.rs b/src/sinks/databricks_zerobus/sink.rs index 30b473c2c041b..752f1f9c62c34 100644 --- a/src/sinks/databricks_zerobus/sink.rs +++ b/src/sinks/databricks_zerobus/sink.rs @@ -3,25 +3,22 @@ use std::num::NonZeroUsize; use std::sync::Arc; +use futures::StreamExt; use futures::stream::BoxStream; -use vector_lib::codecs::encoding::{BatchEncoder, BatchOutput}; -use vector_lib::event::EventStatus; use vector_lib::finalization::Finalizable; use crate::sinks::prelude::*; use crate::sinks::util::metadata::RequestMetadataBuilder; -use crate::sinks::util::request_builder::default_request_builder_concurrency_limit; use crate::sinks::util::{RealtimeSizeBasedDefaultBatchSettings, TowerRequestSettings}; -use super::service::{ZerobusPayload, ZerobusRequest, ZerobusRetryLogic, ZerobusService}; +use super::service::{ZerobusRequest, ZerobusRetryLogic, ZerobusService}; /// The main Zerobus sink. pub struct ZerobusSink { service: ZerobusService, request_limits: TowerRequestSettings, batch_settings: BatcherSettings, - encoder: BatchEncoder, } impl ZerobusSink { @@ -29,7 +26,6 @@ impl ZerobusSink { service: ZerobusService, request_limits: TowerRequestSettings, batch_config: BatchConfig, - encoder: BatchEncoder, ) -> Result { let batch_settings = batch_config.into_batcher_settings()?; @@ -37,53 +33,10 @@ impl ZerobusSink { service, request_limits, batch_settings, - encoder, - }) - } - - fn encode_batch( - encoder: &BatchEncoder, - mut events: Vec, - ) -> Result { - let finalizers = events.take_finalizers(); - let metadata_builder = RequestMetadataBuilder::from_events(&events); - - let batch_output = match encoder.encode_batch(&events) { - Ok(output) => output, - Err(e) => { - finalizers.update_status(EventStatus::Rejected); - return Err(format!("Failed to encode batch: {}", e)); - } - }; - - let (payload, byte_size) = match batch_output { - BatchOutput::Records(records) => { - let size = records.iter().map(|r| r.len()).sum::(); - (ZerobusPayload::Records(records), size) - } - // The sink only builds a proto-batch encoder, so this arm is - // only reachable if the shared codec adds a new `BatchOutput` - // variant without a corresponding zerobus payload type. - #[cfg(feature = "codecs-arrow")] - BatchOutput::Arrow(_) => { - finalizers.update_status(EventStatus::Rejected); - return Err("The Databricks Zerobus sink only supports proto-batch output.".into()); - } - }; - - let request_size = NonZeroUsize::new(byte_size).unwrap_or(NonZeroUsize::MIN); - let metadata = metadata_builder.with_request_size(request_size); - - Ok(ZerobusRequest { - payload, - metadata, - finalizers, }) } async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - let encoder = Arc::new(self.encoder.clone()); - let result = { let tower_service = ServiceBuilder::new() .settings(self.request_limits, ZerobusRetryLogic) @@ -91,17 +44,18 @@ impl ZerobusSink { input .batched(self.batch_settings.as_byte_size_config()) - .concurrent_map(default_request_builder_concurrency_limit(), move |events| { - let encoder = Arc::clone(&encoder); - Box::pin(async move { Self::encode_batch(&encoder, events) }) - }) - .filter_map(|result| async move { - match result { - Err(error) => { - emit!(SinkRequestBuildError { error }); - None - } - Ok(req) => Some(req), + .map(|mut events| { + let finalizers = events.take_finalizers(); + // The encoded request size isn't known until `Service::call` + // encodes the batch, and nothing in the zerobus path reads + // `RequestMetadata::request_encoded_size`, so a placeholder + // is fine here. + let metadata = RequestMetadataBuilder::from_events(&events) + .with_request_size(NonZeroUsize::MIN); + ZerobusRequest { + events: Arc::new(events), + metadata, + finalizers, } }) .into_driver(tower_service) diff --git a/src/sinks/databricks_zerobus/unity_catalog_schema.rs b/src/sinks/databricks_zerobus/unity_catalog_schema.rs index 0343848f1cefe..fc9b01fbaf9ac 100644 --- a/src/sinks/databricks_zerobus/unity_catalog_schema.rs +++ b/src/sinks/databricks_zerobus/unity_catalog_schema.rs @@ -6,7 +6,7 @@ use bytes::Buf; use databricks_zerobus_ingest_sdk::schema::descriptor_from_uc_schema; -use http::{Request, Uri}; +use http::{Request, StatusCode, Uri}; use http_body::Body as HttpBody; use hyper::Body; use percent_encoding::{NON_ALPHANUMERIC, percent_encode}; @@ -14,9 +14,20 @@ use prost_reflect::prost_types; use serde::Deserialize; use super::error::ZerobusSinkError; -use crate::config::ProxyConfig; use crate::http::HttpClient; -use crate::tls::TlsSettings; + +/// Whether a Unity Catalog HTTP response status should be retried. +/// +/// 5xx, 408 (Request Timeout), and 429 (Too Many Requests) are transient; +/// other 4xx statuses (404, 401, 403, ...) indicate permanent configuration +/// problems that won't fix themselves on retry. The canonical list of +/// retryable statuses for HTTP-based sinks lives at +/// [`crate::sinks::util::http::RetryStrategy::Default`]. +fn status_is_retryable(status: StatusCode) -> bool { + status.is_server_error() + || status == StatusCode::REQUEST_TIMEOUT + || status == StatusCode::TOO_MANY_REQUESTS +} // Alias the SDK types under the names the rest of the sink already uses. #[cfg(test)] @@ -35,17 +46,10 @@ pub async fn fetch_table_schema( table_name: &str, client_id: &str, client_secret: &str, - proxy: &ProxyConfig, + http_client: &HttpClient, ) -> Result { - let http_client = HttpClient::new(TlsSettings::default(), proxy).map_err(|e| { - ZerobusSinkError::ConfigError { - message: format!("Failed to create HTTP client: {}", e), - } - })?; - - // First, get OAuth token let token = get_oauth_token( - &http_client, + http_client, unity_catalog_endpoint, client_id, client_secret, @@ -82,8 +86,9 @@ pub async fn fetch_table_schema( let response = http_client .send(request) .await - .map_err(|e| ZerobusSinkError::ConfigError { + .map_err(|e| ZerobusSinkError::SchemaError { message: format!("Failed to fetch table schema: {}", e), + retryable: true, })?; let status = response.status(); @@ -95,11 +100,12 @@ pub async fn fetch_table_schema( .map(|c| c.to_bytes()) .unwrap_or_default(); let error_text = String::from_utf8_lossy(&body_bytes); - return Err(ZerobusSinkError::ConfigError { + return Err(ZerobusSinkError::SchemaError { message: format!( "Unity Catalog API returned error {}: {}", status, error_text ), + retryable: status_is_retryable(status), }); } @@ -108,8 +114,9 @@ pub async fn fetch_table_schema( .collect() .await .map(|c| c.to_bytes()) - .map_err(|e| ZerobusSinkError::ConfigError { + .map_err(|e| ZerobusSinkError::SchemaError { message: format!("Failed to read response body: {}", e), + retryable: true, })?; let schema: UnityCatalogTableSchema = @@ -157,8 +164,9 @@ async fn get_oauth_token( let response = http_client .send(request) .await - .map_err(|e| ZerobusSinkError::ConfigError { + .map_err(|e| ZerobusSinkError::SchemaError { message: format!("Failed to get OAuth token: {}", e), + retryable: true, })?; let status = response.status(); @@ -170,8 +178,9 @@ async fn get_oauth_token( .map(|c| c.to_bytes()) .unwrap_or_default(); let error_text = String::from_utf8_lossy(&body_bytes); - return Err(ZerobusSinkError::ConfigError { + return Err(ZerobusSinkError::SchemaError { message: format!("OAuth token request failed {}: {}", status, error_text), + retryable: status_is_retryable(status), }); } @@ -180,8 +189,9 @@ async fn get_oauth_token( .collect() .await .map(|c| c.to_bytes()) - .map_err(|e| ZerobusSinkError::ConfigError { + .map_err(|e| ZerobusSinkError::SchemaError { message: format!("Failed to read OAuth response body: {}", e), + retryable: true, })?; let token_response: OAuthTokenResponse =