Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 262 additions & 9 deletions src/sinks/azure_blob/config.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,46 @@
use std::fs::File;
use std::io::Read;
use std::sync::Arc;

use azure_storage_blob::BlobContainerClient;
use azure_core::{
Error,
credentials::TokenCredential,
error::ErrorKind,
http::{StatusCode, Url},
};
use azure_storage_blob::{BlobContainerClient, BlobContainerClientOptions};

use bytes::Bytes;
use futures::FutureExt;
use snafu::Snafu;
use tower::ServiceBuilder;
use vector_lib::{
codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer},
configurable::configurable_component,
request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
sensitive_string::SensitiveString,
stream::DriverResponse,
};

use super::request_builder::AzureBlobRequestOptions;
use crate::{
Result,
codecs::{Encoder, EncodingConfigWithFraming, SinkType},
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
event::{EventFinalizers, EventStatus, Finalizable},
sinks::{
Healthcheck, VectorSink,
azure_blob::{service::AzureBlobService, sink::AzureBlobSink},
azure_common::{
self, config::AzureAuthentication, config::AzureBlobRetryLogic,
config::AzureBlobTlsConfig, service::AzureBlobService, sink::AzureBlobSink,
config::AzureAuthentication,
config::AzureBlobTlsConfig,
connection_string::{Auth, ParsedConnectionString},
shared_key_policy::SharedKeyAuthorizationPolicy,
},
util::{
BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
TowerRequestConfig, partitioner::KeyPartitioner, service::TowerRequestConfigDefaults,
TowerRequestConfig, partitioner::KeyPartitioner, retries::RetryLogic,
service::TowerRequestConfigDefaults,
},
},
template::Template,
Expand Down Expand Up @@ -241,7 +260,7 @@ impl SinkConfig for AzureBlobSinkConfig {
}
};

let client = azure_common::config::build_client(
let client = build_client(
self.auth.clone(),
connection_string.clone(),
self.container_name.clone(),
Expand All @@ -250,10 +269,7 @@ impl SinkConfig for AzureBlobSinkConfig {
)
.await?;

let healthcheck = azure_common::config::build_healthcheck(
self.container_name.clone(),
Arc::clone(&client),
)?;
let healthcheck = build_healthcheck(self.container_name.clone(), Arc::clone(&client))?;
let sink = self.build_processor(client)?;
Ok((sink, healthcheck))
}
Expand Down Expand Up @@ -316,3 +332,240 @@ impl AzureBlobSinkConfig {
Ok(KeyPartitioner::new(self.blob_prefix.clone(), None))
}
}

#[derive(Debug, Clone)]
pub struct AzureBlobRequest {
pub blob_data: Bytes,
pub content_encoding: Option<&'static str>,
pub content_type: &'static str,
pub metadata: AzureBlobMetadata,
pub request_metadata: RequestMetadata,
}

impl Finalizable for AzureBlobRequest {
fn take_finalizers(&mut self) -> EventFinalizers {
std::mem::take(&mut self.metadata.finalizers)
}
}

impl MetaDescriptive for AzureBlobRequest {
fn get_metadata(&self) -> &RequestMetadata {
&self.request_metadata
}

fn metadata_mut(&mut self) -> &mut RequestMetadata {
&mut self.request_metadata
}
}

#[derive(Clone, Debug)]
pub struct AzureBlobMetadata {
pub partition_key: String,
pub count: usize,
pub finalizers: EventFinalizers,
}

#[derive(Debug, Clone)]
pub struct AzureBlobRetryLogic;

impl RetryLogic for AzureBlobRetryLogic {
type Error = Error;
type Request = AzureBlobRequest;
type Response = AzureBlobResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
match error.http_status() {
Some(code) => code.is_server_error() || code == StatusCode::TooManyRequests,
None => false,
}
}
}

#[derive(Debug)]
pub struct AzureBlobResponse {
pub events_byte_size: GroupedCountByteSize,
pub byte_size: usize,
}

impl DriverResponse for AzureBlobResponse {
fn event_status(&self) -> EventStatus {
EventStatus::Delivered
}

fn events_sent(&self) -> &GroupedCountByteSize {
&self.events_byte_size
}

fn bytes_sent(&self) -> Option<usize> {
Some(self.byte_size)
}
}

#[derive(Debug, Snafu)]
pub enum HealthcheckError {
#[snafu(display("Invalid connection string specified"))]
InvalidCredentials,
#[snafu(display("Container: {:?} not found", container))]
UnknownContainer { container: String },
#[snafu(display("Unknown status code: {}", status))]
Unknown { status: StatusCode },
}

pub fn build_healthcheck(
container_name: String,
client: Arc<BlobContainerClient>,
) -> crate::Result<Healthcheck> {
let healthcheck = async move {
let resp: crate::Result<()> = match client.get_properties(None).await {
Ok(_) => Ok(()),
Err(error) => {
let code = error.http_status();
Err(match code {
Some(StatusCode::Forbidden) => Box::new(HealthcheckError::InvalidCredentials),
Some(StatusCode::NotFound) => Box::new(HealthcheckError::UnknownContainer {
container: container_name,
}),
Some(status) => Box::new(HealthcheckError::Unknown { status }),
None => "unknown status code".into(),
})
}
};
resp
};

Ok(healthcheck.boxed())
}

pub async fn build_client(
auth: Option<AzureAuthentication>,
connection_string: String,
container_name: String,
proxy: &crate::config::ProxyConfig,
tls: Option<AzureBlobTlsConfig>,
) -> crate::Result<Arc<BlobContainerClient>> {
// Parse connection string without legacy SDK
let parsed = ParsedConnectionString::parse(&connection_string)
.map_err(|e| format!("Invalid connection string: {e}"))?;
// Compose container URL (SAS appended if present)
let container_url = parsed
.container_url(&container_name)
.map_err(|e| format!("Failed to build container URL: {e}"))?;
let url = Url::parse(&container_url).map_err(|e| format!("Invalid container URL: {e}"))?;

let mut credential: Option<Arc<dyn TokenCredential>> = None;

// Prepare options; attach Shared Key policy if needed
let mut options = BlobContainerClientOptions::default();
match (parsed.auth(), &auth) {
(Auth::None, None) => {
warn!("No authentication method provided, requests will be anonymous.");
}
(Auth::Sas { .. }, None) => {
info!("Using SAS token authentication.");
}
(
Auth::SharedKey {
account_name,
account_key,
},
None,
) => {
info!("Using Shared Key authentication.");

let policy = SharedKeyAuthorizationPolicy::new(
account_name,
account_key,
// Use an Azurite-supported storage service version
String::from("2025-11-05"),
)
.map_err(|e| format!("Failed to create SharedKey policy: {e}"))?;
options
.client_options
.per_call_policies
.push(Arc::new(policy));
}
(Auth::None, Some(AzureAuthentication::Specific(..))) => {
info!("Using Azure Authentication method.");
let credential_result: Arc<dyn TokenCredential> =
auth.unwrap().credential().await.map_err(|e| {
Error::with_message(
ErrorKind::Credential,
format!("Failed to configure Azure Authentication: {e}"),
)
})?;
credential = Some(credential_result);
}
(Auth::Sas { .. }, Some(AzureAuthentication::Specific(..))) => {
return Err(Box::new(Error::with_message(
ErrorKind::Credential,
"Cannot use both SAS token and another Azure Authentication method at the same time",
)));
}
(Auth::SharedKey { .. }, Some(AzureAuthentication::Specific(..))) => {
return Err(Box::new(Error::with_message(
ErrorKind::Credential,
"Cannot use both Shared Key and another Azure Authentication method at the same time",
)));
}
#[cfg(test)]
(Auth::None, Some(AzureAuthentication::MockCredential)) => {
warn!("Using mock token credential authentication.");
credential = Some(auth.unwrap().credential().await.unwrap());
}
#[cfg(test)]
(_, Some(AzureAuthentication::MockCredential)) => {
return Err(Box::new(Error::with_message(
ErrorKind::Credential,
"Cannot use both connection string auth and mock credential at the same time",
)));
}
}

// Use reqwest v0.13 since Azure SDK only implements HttpClient for reqwest::Client v0.13
let mut reqwest_builder = reqwest_13::ClientBuilder::new();
let bypass_proxy = {
let host = url.host_str().unwrap_or("");
let port = url.port();
proxy.no_proxy.matches(host)
|| port
.map(|p| proxy.no_proxy.matches(&format!("{}:{}", host, p)))
.unwrap_or(false)
};
if bypass_proxy || !proxy.enabled {
// Ensure no proxy (and disable any potential system proxy auto-detection)
reqwest_builder = reqwest_builder.no_proxy();
} else {
if let Some(http) = &proxy.http {
let p = reqwest_13::Proxy::http(http)
.map_err(|e| format!("Invalid HTTP proxy URL: {e}"))?;
// If credentials are embedded in the proxy URL, reqwest will handle them.
reqwest_builder = reqwest_builder.proxy(p);
}
if let Some(https) = &proxy.https {
let p = reqwest_13::Proxy::https(https)
.map_err(|e| format!("Invalid HTTPS proxy URL: {e}"))?;
// If credentials are embedded in the proxy URL, reqwest will handle them.
reqwest_builder = reqwest_builder.proxy(p);
}
}

if let Some(AzureBlobTlsConfig { ca_file }) = &tls
&& let Some(ca_file) = ca_file
{
let mut buf = Vec::new();
File::open(ca_file)?.read_to_end(&mut buf)?;
let cert = reqwest_13::Certificate::from_pem(&buf)?;

warn!("Adding TLS root certificate from {}", ca_file.display());
reqwest_builder = reqwest_builder.add_root_certificate(cert);
}

options.client_options.transport = Some(azure_core::http::Transport::new(std::sync::Arc::new(
reqwest_builder
.build()
.map_err(|e| format!("Failed to build reqwest client: {e}"))?,
)));
let client =
BlobContainerClient::new(url, credential, Some(options)).map_err(|e| format!("{e}"))?;
Ok(Arc::new(client))
}
10 changes: 5 additions & 5 deletions src/sinks/azure_blob/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::config::AzureBlobSinkConfig;
use crate::{
event::{Event, EventArray, LogEvent},
sinks::{
VectorSink, azure_common,
VectorSink, azure_blob, azure_common,
util::{Compression, TowerRequestConfig},
},
test_util::{
Expand All @@ -34,7 +34,7 @@ async fn azure_blob_healthcheck_passed() {
let config = AzureBlobSinkConfig::new_emulator().await;
let client = config.build_test_client().await;

azure_common::config::build_healthcheck(config.container_name, client)
azure_blob::config::build_healthcheck(config.container_name, client)
.expect("Failed to build healthcheck")
.await
.expect("Failed to pass healthcheck");
Expand All @@ -45,7 +45,7 @@ async fn azure_blob_healthcheck_passed_with_oauth() {
let config = AzureBlobSinkConfig::new_emulator_with_oauth().await;
let client = config.build_test_client().await;

azure_common::config::build_healthcheck(config.container_name, client)
azure_blob::config::build_healthcheck(config.container_name, client)
.expect("Failed to build healthcheck")
.await
.expect("Failed to pass healthcheck");
Expand All @@ -61,7 +61,7 @@ async fn azure_blob_healthcheck_unknown_container() {
let client = config.build_test_client().await;

assert_eq!(
azure_common::config::build_healthcheck(config.container_name, client)
azure_blob::config::build_healthcheck(config.container_name, client)
.unwrap()
.await
.unwrap_err()
Expand Down Expand Up @@ -295,7 +295,7 @@ impl AzureBlobSinkConfig {
}

async fn build_test_client(&self) -> Arc<BlobContainerClient> {
azure_common::config::build_client(
azure_blob::config::build_client(
self.auth.clone(),
self.connection_string
.clone()
Expand Down
2 changes: 2 additions & 0 deletions src/sinks/azure_blob/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod config;
mod request_builder;
mod service;
mod sink;

#[cfg(all(test, feature = "azure-blob-integration-tests"))]
mod integration_tests;
Expand Down
7 changes: 2 additions & 5 deletions src/sinks/azure_blob/request_builder.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
use bytes::Bytes;
use chrono::Utc;
use uuid::Uuid;
use vector_lib::{
EstimatedJsonEncodedSizeOf, codecs::encoding::Framer, request_metadata::RequestMetadata,
};
use vector_lib::{codecs::encoding::Framer, request_metadata::RequestMetadata};

use crate::{
codecs::{Encoder, Transformer},
event::{Event, Finalizable},
sinks::{
azure_common::config::{AzureBlobMetadata, AzureBlobRequest},
azure_blob::config::{AzureBlobMetadata, AzureBlobRequest},
util::{
Compression, RequestBuilder, metadata::RequestMetadataBuilder,
request_builder::EncodeResult,
Expand Down Expand Up @@ -51,7 +49,6 @@ impl RequestBuilder<(String, Vec<Event>)> for AzureBlobRequestOptions {
let azure_metadata = AzureBlobMetadata {
partition_key,
count: events.len(),
byte_size: events.estimated_json_encoded_size_of(),
finalizers,
};

Expand Down
Loading
Loading