diff --git a/src/sinks/azure_blob/config.rs b/src/sinks/azure_blob/config.rs index 1311e09edd4f7..78848fb5ce7db 100644 --- a/src/sinks/azure_blob/config.rs +++ b/src/sinks/azure_blob/config.rs @@ -1,11 +1,25 @@ +use std::fs::File; +use std::io::Read; use std::sync::Arc; -use azure_storage_blob::BlobContainerClient; +use azure_core::{ + Error, + credentials::TokenCredential, + error::ErrorKind, + http::{StatusCode, Url}, +}; +use azure_storage_blob::{BlobContainerClient, BlobContainerClientOptions}; + +use bytes::Bytes; +use futures::FutureExt; +use snafu::Snafu; use tower::ServiceBuilder; use vector_lib::{ codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer}, configurable::configurable_component, + request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}, sensitive_string::SensitiveString, + stream::DriverResponse, }; use super::request_builder::AzureBlobRequestOptions; @@ -13,15 +27,20 @@ use crate::{ Result, codecs::{Encoder, EncodingConfigWithFraming, SinkType}, config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext}, + event::{EventFinalizers, EventStatus, Finalizable}, sinks::{ Healthcheck, VectorSink, + azure_blob::{service::AzureBlobService, sink::AzureBlobSink}, azure_common::{ - self, config::AzureAuthentication, config::AzureBlobRetryLogic, - config::AzureBlobTlsConfig, service::AzureBlobService, sink::AzureBlobSink, + config::AzureAuthentication, + config::AzureBlobTlsConfig, + connection_string::{Auth, ParsedConnectionString}, + shared_key_policy::SharedKeyAuthorizationPolicy, }, util::{ BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt, - TowerRequestConfig, partitioner::KeyPartitioner, service::TowerRequestConfigDefaults, + TowerRequestConfig, partitioner::KeyPartitioner, retries::RetryLogic, + service::TowerRequestConfigDefaults, }, }, template::Template, @@ -241,7 +260,7 @@ impl SinkConfig for AzureBlobSinkConfig { } }; - let client = azure_common::config::build_client( + let client = build_client( self.auth.clone(), connection_string.clone(), self.container_name.clone(), @@ -250,10 +269,7 @@ impl SinkConfig for AzureBlobSinkConfig { ) .await?; - let healthcheck = azure_common::config::build_healthcheck( - self.container_name.clone(), - Arc::clone(&client), - )?; + let healthcheck = build_healthcheck(self.container_name.clone(), Arc::clone(&client))?; let sink = self.build_processor(client)?; Ok((sink, healthcheck)) } @@ -316,3 +332,240 @@ impl AzureBlobSinkConfig { Ok(KeyPartitioner::new(self.blob_prefix.clone(), None)) } } + +#[derive(Debug, Clone)] +pub struct AzureBlobRequest { + pub blob_data: Bytes, + pub content_encoding: Option<&'static str>, + pub content_type: &'static str, + pub metadata: AzureBlobMetadata, + pub request_metadata: RequestMetadata, +} + +impl Finalizable for AzureBlobRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + std::mem::take(&mut self.metadata.finalizers) + } +} + +impl MetaDescriptive for AzureBlobRequest { + fn get_metadata(&self) -> &RequestMetadata { + &self.request_metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.request_metadata + } +} + +#[derive(Clone, Debug)] +pub struct AzureBlobMetadata { + pub partition_key: String, + pub count: usize, + pub finalizers: EventFinalizers, +} + +#[derive(Debug, Clone)] +pub struct AzureBlobRetryLogic; + +impl RetryLogic for AzureBlobRetryLogic { + type Error = Error; + type Request = AzureBlobRequest; + type Response = AzureBlobResponse; + + fn is_retriable_error(&self, error: &Self::Error) -> bool { + match error.http_status() { + Some(code) => code.is_server_error() || code == StatusCode::TooManyRequests, + None => false, + } + } +} + +#[derive(Debug)] +pub struct AzureBlobResponse { + pub events_byte_size: GroupedCountByteSize, + pub byte_size: usize, +} + +impl DriverResponse for AzureBlobResponse { + fn event_status(&self) -> EventStatus { + EventStatus::Delivered + } + + fn events_sent(&self) -> &GroupedCountByteSize { + &self.events_byte_size + } + + fn bytes_sent(&self) -> Option { + Some(self.byte_size) + } +} + +#[derive(Debug, Snafu)] +pub enum HealthcheckError { + #[snafu(display("Invalid connection string specified"))] + InvalidCredentials, + #[snafu(display("Container: {:?} not found", container))] + UnknownContainer { container: String }, + #[snafu(display("Unknown status code: {}", status))] + Unknown { status: StatusCode }, +} + +pub fn build_healthcheck( + container_name: String, + client: Arc, +) -> crate::Result { + let healthcheck = async move { + let resp: crate::Result<()> = match client.get_properties(None).await { + Ok(_) => Ok(()), + Err(error) => { + let code = error.http_status(); + Err(match code { + Some(StatusCode::Forbidden) => Box::new(HealthcheckError::InvalidCredentials), + Some(StatusCode::NotFound) => Box::new(HealthcheckError::UnknownContainer { + container: container_name, + }), + Some(status) => Box::new(HealthcheckError::Unknown { status }), + None => "unknown status code".into(), + }) + } + }; + resp + }; + + Ok(healthcheck.boxed()) +} + +pub async fn build_client( + auth: Option, + connection_string: String, + container_name: String, + proxy: &crate::config::ProxyConfig, + tls: Option, +) -> crate::Result> { + // Parse connection string without legacy SDK + let parsed = ParsedConnectionString::parse(&connection_string) + .map_err(|e| format!("Invalid connection string: {e}"))?; + // Compose container URL (SAS appended if present) + let container_url = parsed + .container_url(&container_name) + .map_err(|e| format!("Failed to build container URL: {e}"))?; + let url = Url::parse(&container_url).map_err(|e| format!("Invalid container URL: {e}"))?; + + let mut credential: Option> = None; + + // Prepare options; attach Shared Key policy if needed + let mut options = BlobContainerClientOptions::default(); + match (parsed.auth(), &auth) { + (Auth::None, None) => { + warn!("No authentication method provided, requests will be anonymous."); + } + (Auth::Sas { .. }, None) => { + info!("Using SAS token authentication."); + } + ( + Auth::SharedKey { + account_name, + account_key, + }, + None, + ) => { + info!("Using Shared Key authentication."); + + let policy = SharedKeyAuthorizationPolicy::new( + account_name, + account_key, + // Use an Azurite-supported storage service version + String::from("2025-11-05"), + ) + .map_err(|e| format!("Failed to create SharedKey policy: {e}"))?; + options + .client_options + .per_call_policies + .push(Arc::new(policy)); + } + (Auth::None, Some(AzureAuthentication::Specific(..))) => { + info!("Using Azure Authentication method."); + let credential_result: Arc = + auth.unwrap().credential().await.map_err(|e| { + Error::with_message( + ErrorKind::Credential, + format!("Failed to configure Azure Authentication: {e}"), + ) + })?; + credential = Some(credential_result); + } + (Auth::Sas { .. }, Some(AzureAuthentication::Specific(..))) => { + return Err(Box::new(Error::with_message( + ErrorKind::Credential, + "Cannot use both SAS token and another Azure Authentication method at the same time", + ))); + } + (Auth::SharedKey { .. }, Some(AzureAuthentication::Specific(..))) => { + return Err(Box::new(Error::with_message( + ErrorKind::Credential, + "Cannot use both Shared Key and another Azure Authentication method at the same time", + ))); + } + #[cfg(test)] + (Auth::None, Some(AzureAuthentication::MockCredential)) => { + warn!("Using mock token credential authentication."); + credential = Some(auth.unwrap().credential().await.unwrap()); + } + #[cfg(test)] + (_, Some(AzureAuthentication::MockCredential)) => { + return Err(Box::new(Error::with_message( + ErrorKind::Credential, + "Cannot use both connection string auth and mock credential at the same time", + ))); + } + } + + // Use reqwest v0.13 since Azure SDK only implements HttpClient for reqwest::Client v0.13 + let mut reqwest_builder = reqwest_13::ClientBuilder::new(); + let bypass_proxy = { + let host = url.host_str().unwrap_or(""); + let port = url.port(); + proxy.no_proxy.matches(host) + || port + .map(|p| proxy.no_proxy.matches(&format!("{}:{}", host, p))) + .unwrap_or(false) + }; + if bypass_proxy || !proxy.enabled { + // Ensure no proxy (and disable any potential system proxy auto-detection) + reqwest_builder = reqwest_builder.no_proxy(); + } else { + if let Some(http) = &proxy.http { + let p = reqwest_13::Proxy::http(http) + .map_err(|e| format!("Invalid HTTP proxy URL: {e}"))?; + // If credentials are embedded in the proxy URL, reqwest will handle them. + reqwest_builder = reqwest_builder.proxy(p); + } + if let Some(https) = &proxy.https { + let p = reqwest_13::Proxy::https(https) + .map_err(|e| format!("Invalid HTTPS proxy URL: {e}"))?; + // If credentials are embedded in the proxy URL, reqwest will handle them. + reqwest_builder = reqwest_builder.proxy(p); + } + } + + if let Some(AzureBlobTlsConfig { ca_file }) = &tls + && let Some(ca_file) = ca_file + { + let mut buf = Vec::new(); + File::open(ca_file)?.read_to_end(&mut buf)?; + let cert = reqwest_13::Certificate::from_pem(&buf)?; + + warn!("Adding TLS root certificate from {}", ca_file.display()); + reqwest_builder = reqwest_builder.add_root_certificate(cert); + } + + options.client_options.transport = Some(azure_core::http::Transport::new(std::sync::Arc::new( + reqwest_builder + .build() + .map_err(|e| format!("Failed to build reqwest client: {e}"))?, + ))); + let client = + BlobContainerClient::new(url, credential, Some(options)).map_err(|e| format!("{e}"))?; + Ok(Arc::new(client)) +} diff --git a/src/sinks/azure_blob/integration_tests.rs b/src/sinks/azure_blob/integration_tests.rs index 9703db841a062..cf35cdfc89206 100644 --- a/src/sinks/azure_blob/integration_tests.rs +++ b/src/sinks/azure_blob/integration_tests.rs @@ -19,7 +19,7 @@ use super::config::AzureBlobSinkConfig; use crate::{ event::{Event, EventArray, LogEvent}, sinks::{ - VectorSink, azure_common, + VectorSink, azure_blob, azure_common, util::{Compression, TowerRequestConfig}, }, test_util::{ @@ -34,7 +34,7 @@ async fn azure_blob_healthcheck_passed() { let config = AzureBlobSinkConfig::new_emulator().await; let client = config.build_test_client().await; - azure_common::config::build_healthcheck(config.container_name, client) + azure_blob::config::build_healthcheck(config.container_name, client) .expect("Failed to build healthcheck") .await .expect("Failed to pass healthcheck"); @@ -45,7 +45,7 @@ async fn azure_blob_healthcheck_passed_with_oauth() { let config = AzureBlobSinkConfig::new_emulator_with_oauth().await; let client = config.build_test_client().await; - azure_common::config::build_healthcheck(config.container_name, client) + azure_blob::config::build_healthcheck(config.container_name, client) .expect("Failed to build healthcheck") .await .expect("Failed to pass healthcheck"); @@ -61,7 +61,7 @@ async fn azure_blob_healthcheck_unknown_container() { let client = config.build_test_client().await; assert_eq!( - azure_common::config::build_healthcheck(config.container_name, client) + azure_blob::config::build_healthcheck(config.container_name, client) .unwrap() .await .unwrap_err() @@ -295,7 +295,7 @@ impl AzureBlobSinkConfig { } async fn build_test_client(&self) -> Arc { - azure_common::config::build_client( + azure_blob::config::build_client( self.auth.clone(), self.connection_string .clone() diff --git a/src/sinks/azure_blob/mod.rs b/src/sinks/azure_blob/mod.rs index 7bb1c91a1335f..fef502c75a3a0 100644 --- a/src/sinks/azure_blob/mod.rs +++ b/src/sinks/azure_blob/mod.rs @@ -1,5 +1,7 @@ mod config; mod request_builder; +mod service; +mod sink; #[cfg(all(test, feature = "azure-blob-integration-tests"))] mod integration_tests; diff --git a/src/sinks/azure_blob/request_builder.rs b/src/sinks/azure_blob/request_builder.rs index 079aea91d3525..02bd4a8ebaa07 100644 --- a/src/sinks/azure_blob/request_builder.rs +++ b/src/sinks/azure_blob/request_builder.rs @@ -1,15 +1,13 @@ use bytes::Bytes; use chrono::Utc; use uuid::Uuid; -use vector_lib::{ - EstimatedJsonEncodedSizeOf, codecs::encoding::Framer, request_metadata::RequestMetadata, -}; +use vector_lib::{codecs::encoding::Framer, request_metadata::RequestMetadata}; use crate::{ codecs::{Encoder, Transformer}, event::{Event, Finalizable}, sinks::{ - azure_common::config::{AzureBlobMetadata, AzureBlobRequest}, + azure_blob::config::{AzureBlobMetadata, AzureBlobRequest}, util::{ Compression, RequestBuilder, metadata::RequestMetadataBuilder, request_builder::EncodeResult, @@ -51,7 +49,6 @@ impl RequestBuilder<(String, Vec)> for AzureBlobRequestOptions { let azure_metadata = AzureBlobMetadata { partition_key, count: events.len(), - byte_size: events.estimated_json_encoded_size_of(), finalizers, }; diff --git a/src/sinks/azure_common/service.rs b/src/sinks/azure_blob/service.rs similarity index 96% rename from src/sinks/azure_common/service.rs rename to src/sinks/azure_blob/service.rs index 31668ab85ba70..a57fdda1204e1 100644 --- a/src/sinks/azure_common/service.rs +++ b/src/sinks/azure_blob/service.rs @@ -10,7 +10,7 @@ use futures::future::BoxFuture; use tower::Service; use tracing::Instrument; -use crate::sinks::azure_common::config::{AzureBlobRequest, AzureBlobResponse}; +use crate::sinks::azure_blob::config::{AzureBlobRequest, AzureBlobResponse}; #[derive(Clone)] pub struct AzureBlobService { diff --git a/src/sinks/azure_common/sink.rs b/src/sinks/azure_blob/sink.rs similarity index 100% rename from src/sinks/azure_common/sink.rs rename to src/sinks/azure_blob/sink.rs diff --git a/src/sinks/azure_common/config.rs b/src/sinks/azure_common/config.rs index 245517d89170b..ed438b78d33b7 100644 --- a/src/sinks/azure_common/config.rs +++ b/src/sinks/azure_common/config.rs @@ -1,16 +1,10 @@ -use std::fs::File; -use std::io::Read; use std::path::PathBuf; use std::sync::Arc; #[cfg(test)] use base64::prelude::*; -use azure_core::error::Error as AzureCoreError; - -use crate::sinks::azure_common::connection_string::{Auth, ParsedConnectionString}; -use crate::sinks::azure_common::shared_key_policy::SharedKeyAuthorizationPolicy; -use azure_core::http::{ClientMethodOptions, StatusCode, Url}; +use azure_core::http::ClientMethodOptions; use azure_core::credentials::{TokenCredential, TokenRequestOptions}; use azure_core::{Error, error::ErrorKind}; @@ -22,23 +16,7 @@ use azure_identity::{ WorkloadIdentityCredentialOptions, }; -use azure_storage_blob::{BlobContainerClient, BlobContainerClientOptions}; - -use bytes::Bytes; -use futures::FutureExt; -use snafu::Snafu; -use vector_lib::{ - configurable::configurable_component, - json_size::JsonSize, - request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}, - sensitive_string::SensitiveString, - stream::DriverResponse, -}; - -use crate::{ - event::{EventFinalizers, EventStatus, Finalizable}, - sinks::{Healthcheck, util::retries::RetryLogic}, -}; +use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString}; /// TLS configuration. #[configurable_component] @@ -404,244 +382,6 @@ impl SpecificAzureCredential { } } -#[derive(Debug, Clone)] -pub struct AzureBlobRequest { - pub blob_data: Bytes, - pub content_encoding: Option<&'static str>, - pub content_type: &'static str, - pub metadata: AzureBlobMetadata, - pub request_metadata: RequestMetadata, -} - -impl Finalizable for AzureBlobRequest { - fn take_finalizers(&mut self) -> EventFinalizers { - std::mem::take(&mut self.metadata.finalizers) - } -} - -impl MetaDescriptive for AzureBlobRequest { - fn get_metadata(&self) -> &RequestMetadata { - &self.request_metadata - } - - fn metadata_mut(&mut self) -> &mut RequestMetadata { - &mut self.request_metadata - } -} - -#[derive(Clone, Debug)] -pub struct AzureBlobMetadata { - pub partition_key: String, - pub count: usize, - pub byte_size: JsonSize, - pub finalizers: EventFinalizers, -} - -#[derive(Debug, Clone)] -pub struct AzureBlobRetryLogic; - -impl RetryLogic for AzureBlobRetryLogic { - type Error = AzureCoreError; - type Request = AzureBlobRequest; - type Response = AzureBlobResponse; - - fn is_retriable_error(&self, error: &Self::Error) -> bool { - match error.http_status() { - Some(code) => code.is_server_error() || code == StatusCode::TooManyRequests, - None => false, - } - } -} - -#[derive(Debug)] -pub struct AzureBlobResponse { - pub events_byte_size: GroupedCountByteSize, - pub byte_size: usize, -} - -impl DriverResponse for AzureBlobResponse { - fn event_status(&self) -> EventStatus { - EventStatus::Delivered - } - - fn events_sent(&self) -> &GroupedCountByteSize { - &self.events_byte_size - } - - fn bytes_sent(&self) -> Option { - Some(self.byte_size) - } -} - -#[derive(Debug, Snafu)] -pub enum HealthcheckError { - #[snafu(display("Invalid connection string specified"))] - InvalidCredentials, - #[snafu(display("Container: {:?} not found", container))] - UnknownContainer { container: String }, - #[snafu(display("Unknown status code: {}", status))] - Unknown { status: StatusCode }, -} - -pub fn build_healthcheck( - container_name: String, - client: Arc, -) -> crate::Result { - let healthcheck = async move { - let resp: crate::Result<()> = match client.get_properties(None).await { - Ok(_) => Ok(()), - Err(error) => { - let code = error.http_status(); - Err(match code { - Some(StatusCode::Forbidden) => Box::new(HealthcheckError::InvalidCredentials), - Some(StatusCode::NotFound) => Box::new(HealthcheckError::UnknownContainer { - container: container_name, - }), - Some(status) => Box::new(HealthcheckError::Unknown { status }), - None => "unknown status code".into(), - }) - } - }; - resp - }; - - Ok(healthcheck.boxed()) -} - -pub async fn build_client( - auth: Option, - connection_string: String, - container_name: String, - proxy: &crate::config::ProxyConfig, - tls: Option, -) -> crate::Result> { - // Parse connection string without legacy SDK - let parsed = ParsedConnectionString::parse(&connection_string) - .map_err(|e| format!("Invalid connection string: {e}"))?; - // Compose container URL (SAS appended if present) - let container_url = parsed - .container_url(&container_name) - .map_err(|e| format!("Failed to build container URL: {e}"))?; - let url = Url::parse(&container_url).map_err(|e| format!("Invalid container URL: {e}"))?; - - let mut credential: Option> = None; - - // Prepare options; attach Shared Key policy if needed - let mut options = BlobContainerClientOptions::default(); - match (parsed.auth(), &auth) { - (Auth::None, None) => { - warn!("No authentication method provided, requests will be anonymous."); - } - (Auth::Sas { .. }, None) => { - info!("Using SAS token authentication."); - } - ( - Auth::SharedKey { - account_name, - account_key, - }, - None, - ) => { - info!("Using Shared Key authentication."); - - let policy = SharedKeyAuthorizationPolicy::new( - account_name, - account_key, - // Use an Azurite-supported storage service version - String::from("2025-11-05"), - ) - .map_err(|e| format!("Failed to create SharedKey policy: {e}"))?; - options - .client_options - .per_call_policies - .push(Arc::new(policy)); - } - (Auth::None, Some(AzureAuthentication::Specific(..))) => { - info!("Using Azure Authentication method."); - let credential_result: Arc = - auth.unwrap().credential().await.map_err(|e| { - Error::with_message( - ErrorKind::Credential, - format!("Failed to configure Azure Authentication: {e}"), - ) - })?; - credential = Some(credential_result); - } - (Auth::Sas { .. }, Some(AzureAuthentication::Specific(..))) => { - return Err(Box::new(Error::with_message( - ErrorKind::Credential, - "Cannot use both SAS token and another Azure Authentication method at the same time", - ))); - } - (Auth::SharedKey { .. }, Some(AzureAuthentication::Specific(..))) => { - return Err(Box::new(Error::with_message( - ErrorKind::Credential, - "Cannot use both Shared Key and another Azure Authentication method at the same time", - ))); - } - #[cfg(test)] - (Auth::None, Some(AzureAuthentication::MockCredential)) => { - warn!("Using mock token credential authentication."); - credential = Some(auth.unwrap().credential().await.unwrap()); - } - #[cfg(test)] - (_, Some(AzureAuthentication::MockCredential)) => { - return Err(Box::new(Error::with_message( - ErrorKind::Credential, - "Cannot use both connection string auth and mock credential at the same time", - ))); - } - } - - // Use reqwest v0.13 since Azure SDK only implements HttpClient for reqwest::Client v0.13 - let mut reqwest_builder = reqwest_13::ClientBuilder::new(); - let bypass_proxy = { - let host = url.host_str().unwrap_or(""); - let port = url.port(); - proxy.no_proxy.matches(host) - || port - .map(|p| proxy.no_proxy.matches(&format!("{}:{}", host, p))) - .unwrap_or(false) - }; - if bypass_proxy || !proxy.enabled { - // Ensure no proxy (and disable any potential system proxy auto-detection) - reqwest_builder = reqwest_builder.no_proxy(); - } else { - if let Some(http) = &proxy.http { - let p = reqwest_13::Proxy::http(http) - .map_err(|e| format!("Invalid HTTP proxy URL: {e}"))?; - // If credentials are embedded in the proxy URL, reqwest will handle them. - reqwest_builder = reqwest_builder.proxy(p); - } - if let Some(https) = &proxy.https { - let p = reqwest_13::Proxy::https(https) - .map_err(|e| format!("Invalid HTTPS proxy URL: {e}"))?; - // If credentials are embedded in the proxy URL, reqwest will handle them. - reqwest_builder = reqwest_builder.proxy(p); - } - } - - if let Some(AzureBlobTlsConfig { ca_file }) = &tls - && let Some(ca_file) = ca_file - { - let mut buf = Vec::new(); - File::open(ca_file)?.read_to_end(&mut buf)?; - let cert = reqwest_13::Certificate::from_pem(&buf)?; - - warn!("Adding TLS root certificate from {}", ca_file.display()); - reqwest_builder = reqwest_builder.add_root_certificate(cert); - } - - options.client_options.transport = Some(azure_core::http::Transport::new(std::sync::Arc::new( - reqwest_builder - .build() - .map_err(|e| format!("Failed to build reqwest client: {e}"))?, - ))); - let client = - BlobContainerClient::new(url, credential, Some(options)).map_err(|e| format!("{e}"))?; - Ok(Arc::new(client)) -} - #[cfg(test)] #[derive(Debug)] struct MockTokenCredential; diff --git a/src/sinks/azure_common/mod.rs b/src/sinks/azure_common/mod.rs index 368c88164fd8d..c4dc5952c6218 100644 --- a/src/sinks/azure_common/mod.rs +++ b/src/sinks/azure_common/mod.rs @@ -1,5 +1,3 @@ pub mod config; pub mod connection_string; -pub mod service; pub mod shared_key_policy; -pub mod sink;