Skip to content

Commit 0f80faf

Browse files
jlaundryCopilot
andauthored
chore(azure sinks): refactor Blob code out of azure_common (#25689)
Co-authored-by: Copilot <copilot@github.com>
1 parent 0141894 commit 0f80faf

8 files changed

Lines changed: 274 additions & 284 deletions

File tree

src/sinks/azure_blob/config.rs

Lines changed: 262 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,46 @@
1+
use std::fs::File;
2+
use std::io::Read;
13
use std::sync::Arc;
24

3-
use azure_storage_blob::BlobContainerClient;
5+
use azure_core::{
6+
Error,
7+
credentials::TokenCredential,
8+
error::ErrorKind,
9+
http::{StatusCode, Url},
10+
};
11+
use azure_storage_blob::{BlobContainerClient, BlobContainerClientOptions};
12+
13+
use bytes::Bytes;
14+
use futures::FutureExt;
15+
use snafu::Snafu;
416
use tower::ServiceBuilder;
517
use vector_lib::{
618
codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer},
719
configurable::configurable_component,
20+
request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
821
sensitive_string::SensitiveString,
22+
stream::DriverResponse,
923
};
1024

1125
use super::request_builder::AzureBlobRequestOptions;
1226
use crate::{
1327
Result,
1428
codecs::{Encoder, EncodingConfigWithFraming, SinkType},
1529
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
30+
event::{EventFinalizers, EventStatus, Finalizable},
1631
sinks::{
1732
Healthcheck, VectorSink,
33+
azure_blob::{service::AzureBlobService, sink::AzureBlobSink},
1834
azure_common::{
19-
self, config::AzureAuthentication, config::AzureBlobRetryLogic,
20-
config::AzureBlobTlsConfig, service::AzureBlobService, sink::AzureBlobSink,
35+
config::AzureAuthentication,
36+
config::AzureBlobTlsConfig,
37+
connection_string::{Auth, ParsedConnectionString},
38+
shared_key_policy::SharedKeyAuthorizationPolicy,
2139
},
2240
util::{
2341
BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
24-
TowerRequestConfig, partitioner::KeyPartitioner, service::TowerRequestConfigDefaults,
42+
TowerRequestConfig, partitioner::KeyPartitioner, retries::RetryLogic,
43+
service::TowerRequestConfigDefaults,
2544
},
2645
},
2746
template::Template,
@@ -241,7 +260,7 @@ impl SinkConfig for AzureBlobSinkConfig {
241260
}
242261
};
243262

244-
let client = azure_common::config::build_client(
263+
let client = build_client(
245264
self.auth.clone(),
246265
connection_string.clone(),
247266
self.container_name.clone(),
@@ -250,10 +269,7 @@ impl SinkConfig for AzureBlobSinkConfig {
250269
)
251270
.await?;
252271

253-
let healthcheck = azure_common::config::build_healthcheck(
254-
self.container_name.clone(),
255-
Arc::clone(&client),
256-
)?;
272+
let healthcheck = build_healthcheck(self.container_name.clone(), Arc::clone(&client))?;
257273
let sink = self.build_processor(client)?;
258274
Ok((sink, healthcheck))
259275
}
@@ -316,3 +332,240 @@ impl AzureBlobSinkConfig {
316332
Ok(KeyPartitioner::new(self.blob_prefix.clone(), None))
317333
}
318334
}
335+
336+
#[derive(Debug, Clone)]
337+
pub struct AzureBlobRequest {
338+
pub blob_data: Bytes,
339+
pub content_encoding: Option<&'static str>,
340+
pub content_type: &'static str,
341+
pub metadata: AzureBlobMetadata,
342+
pub request_metadata: RequestMetadata,
343+
}
344+
345+
impl Finalizable for AzureBlobRequest {
346+
fn take_finalizers(&mut self) -> EventFinalizers {
347+
std::mem::take(&mut self.metadata.finalizers)
348+
}
349+
}
350+
351+
impl MetaDescriptive for AzureBlobRequest {
352+
fn get_metadata(&self) -> &RequestMetadata {
353+
&self.request_metadata
354+
}
355+
356+
fn metadata_mut(&mut self) -> &mut RequestMetadata {
357+
&mut self.request_metadata
358+
}
359+
}
360+
361+
#[derive(Clone, Debug)]
362+
pub struct AzureBlobMetadata {
363+
pub partition_key: String,
364+
pub count: usize,
365+
pub finalizers: EventFinalizers,
366+
}
367+
368+
#[derive(Debug, Clone)]
369+
pub struct AzureBlobRetryLogic;
370+
371+
impl RetryLogic for AzureBlobRetryLogic {
372+
type Error = Error;
373+
type Request = AzureBlobRequest;
374+
type Response = AzureBlobResponse;
375+
376+
fn is_retriable_error(&self, error: &Self::Error) -> bool {
377+
match error.http_status() {
378+
Some(code) => code.is_server_error() || code == StatusCode::TooManyRequests,
379+
None => false,
380+
}
381+
}
382+
}
383+
384+
#[derive(Debug)]
385+
pub struct AzureBlobResponse {
386+
pub events_byte_size: GroupedCountByteSize,
387+
pub byte_size: usize,
388+
}
389+
390+
impl DriverResponse for AzureBlobResponse {
391+
fn event_status(&self) -> EventStatus {
392+
EventStatus::Delivered
393+
}
394+
395+
fn events_sent(&self) -> &GroupedCountByteSize {
396+
&self.events_byte_size
397+
}
398+
399+
fn bytes_sent(&self) -> Option<usize> {
400+
Some(self.byte_size)
401+
}
402+
}
403+
404+
#[derive(Debug, Snafu)]
405+
pub enum HealthcheckError {
406+
#[snafu(display("Invalid connection string specified"))]
407+
InvalidCredentials,
408+
#[snafu(display("Container: {:?} not found", container))]
409+
UnknownContainer { container: String },
410+
#[snafu(display("Unknown status code: {}", status))]
411+
Unknown { status: StatusCode },
412+
}
413+
414+
pub fn build_healthcheck(
415+
container_name: String,
416+
client: Arc<BlobContainerClient>,
417+
) -> crate::Result<Healthcheck> {
418+
let healthcheck = async move {
419+
let resp: crate::Result<()> = match client.get_properties(None).await {
420+
Ok(_) => Ok(()),
421+
Err(error) => {
422+
let code = error.http_status();
423+
Err(match code {
424+
Some(StatusCode::Forbidden) => Box::new(HealthcheckError::InvalidCredentials),
425+
Some(StatusCode::NotFound) => Box::new(HealthcheckError::UnknownContainer {
426+
container: container_name,
427+
}),
428+
Some(status) => Box::new(HealthcheckError::Unknown { status }),
429+
None => "unknown status code".into(),
430+
})
431+
}
432+
};
433+
resp
434+
};
435+
436+
Ok(healthcheck.boxed())
437+
}
438+
439+
pub async fn build_client(
440+
auth: Option<AzureAuthentication>,
441+
connection_string: String,
442+
container_name: String,
443+
proxy: &crate::config::ProxyConfig,
444+
tls: Option<AzureBlobTlsConfig>,
445+
) -> crate::Result<Arc<BlobContainerClient>> {
446+
// Parse connection string without legacy SDK
447+
let parsed = ParsedConnectionString::parse(&connection_string)
448+
.map_err(|e| format!("Invalid connection string: {e}"))?;
449+
// Compose container URL (SAS appended if present)
450+
let container_url = parsed
451+
.container_url(&container_name)
452+
.map_err(|e| format!("Failed to build container URL: {e}"))?;
453+
let url = Url::parse(&container_url).map_err(|e| format!("Invalid container URL: {e}"))?;
454+
455+
let mut credential: Option<Arc<dyn TokenCredential>> = None;
456+
457+
// Prepare options; attach Shared Key policy if needed
458+
let mut options = BlobContainerClientOptions::default();
459+
match (parsed.auth(), &auth) {
460+
(Auth::None, None) => {
461+
warn!("No authentication method provided, requests will be anonymous.");
462+
}
463+
(Auth::Sas { .. }, None) => {
464+
info!("Using SAS token authentication.");
465+
}
466+
(
467+
Auth::SharedKey {
468+
account_name,
469+
account_key,
470+
},
471+
None,
472+
) => {
473+
info!("Using Shared Key authentication.");
474+
475+
let policy = SharedKeyAuthorizationPolicy::new(
476+
account_name,
477+
account_key,
478+
// Use an Azurite-supported storage service version
479+
String::from("2025-11-05"),
480+
)
481+
.map_err(|e| format!("Failed to create SharedKey policy: {e}"))?;
482+
options
483+
.client_options
484+
.per_call_policies
485+
.push(Arc::new(policy));
486+
}
487+
(Auth::None, Some(AzureAuthentication::Specific(..))) => {
488+
info!("Using Azure Authentication method.");
489+
let credential_result: Arc<dyn TokenCredential> =
490+
auth.unwrap().credential().await.map_err(|e| {
491+
Error::with_message(
492+
ErrorKind::Credential,
493+
format!("Failed to configure Azure Authentication: {e}"),
494+
)
495+
})?;
496+
credential = Some(credential_result);
497+
}
498+
(Auth::Sas { .. }, Some(AzureAuthentication::Specific(..))) => {
499+
return Err(Box::new(Error::with_message(
500+
ErrorKind::Credential,
501+
"Cannot use both SAS token and another Azure Authentication method at the same time",
502+
)));
503+
}
504+
(Auth::SharedKey { .. }, Some(AzureAuthentication::Specific(..))) => {
505+
return Err(Box::new(Error::with_message(
506+
ErrorKind::Credential,
507+
"Cannot use both Shared Key and another Azure Authentication method at the same time",
508+
)));
509+
}
510+
#[cfg(test)]
511+
(Auth::None, Some(AzureAuthentication::MockCredential)) => {
512+
warn!("Using mock token credential authentication.");
513+
credential = Some(auth.unwrap().credential().await.unwrap());
514+
}
515+
#[cfg(test)]
516+
(_, Some(AzureAuthentication::MockCredential)) => {
517+
return Err(Box::new(Error::with_message(
518+
ErrorKind::Credential,
519+
"Cannot use both connection string auth and mock credential at the same time",
520+
)));
521+
}
522+
}
523+
524+
// Use reqwest v0.13 since Azure SDK only implements HttpClient for reqwest::Client v0.13
525+
let mut reqwest_builder = reqwest_13::ClientBuilder::new();
526+
let bypass_proxy = {
527+
let host = url.host_str().unwrap_or("");
528+
let port = url.port();
529+
proxy.no_proxy.matches(host)
530+
|| port
531+
.map(|p| proxy.no_proxy.matches(&format!("{}:{}", host, p)))
532+
.unwrap_or(false)
533+
};
534+
if bypass_proxy || !proxy.enabled {
535+
// Ensure no proxy (and disable any potential system proxy auto-detection)
536+
reqwest_builder = reqwest_builder.no_proxy();
537+
} else {
538+
if let Some(http) = &proxy.http {
539+
let p = reqwest_13::Proxy::http(http)
540+
.map_err(|e| format!("Invalid HTTP proxy URL: {e}"))?;
541+
// If credentials are embedded in the proxy URL, reqwest will handle them.
542+
reqwest_builder = reqwest_builder.proxy(p);
543+
}
544+
if let Some(https) = &proxy.https {
545+
let p = reqwest_13::Proxy::https(https)
546+
.map_err(|e| format!("Invalid HTTPS proxy URL: {e}"))?;
547+
// If credentials are embedded in the proxy URL, reqwest will handle them.
548+
reqwest_builder = reqwest_builder.proxy(p);
549+
}
550+
}
551+
552+
if let Some(AzureBlobTlsConfig { ca_file }) = &tls
553+
&& let Some(ca_file) = ca_file
554+
{
555+
let mut buf = Vec::new();
556+
File::open(ca_file)?.read_to_end(&mut buf)?;
557+
let cert = reqwest_13::Certificate::from_pem(&buf)?;
558+
559+
warn!("Adding TLS root certificate from {}", ca_file.display());
560+
reqwest_builder = reqwest_builder.add_root_certificate(cert);
561+
}
562+
563+
options.client_options.transport = Some(azure_core::http::Transport::new(std::sync::Arc::new(
564+
reqwest_builder
565+
.build()
566+
.map_err(|e| format!("Failed to build reqwest client: {e}"))?,
567+
)));
568+
let client =
569+
BlobContainerClient::new(url, credential, Some(options)).map_err(|e| format!("{e}"))?;
570+
Ok(Arc::new(client))
571+
}

src/sinks/azure_blob/integration_tests.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use super::config::AzureBlobSinkConfig;
1919
use crate::{
2020
event::{Event, EventArray, LogEvent},
2121
sinks::{
22-
VectorSink, azure_common,
22+
VectorSink, azure_blob, azure_common,
2323
util::{Compression, TowerRequestConfig},
2424
},
2525
test_util::{
@@ -34,7 +34,7 @@ async fn azure_blob_healthcheck_passed() {
3434
let config = AzureBlobSinkConfig::new_emulator().await;
3535
let client = config.build_test_client().await;
3636

37-
azure_common::config::build_healthcheck(config.container_name, client)
37+
azure_blob::config::build_healthcheck(config.container_name, client)
3838
.expect("Failed to build healthcheck")
3939
.await
4040
.expect("Failed to pass healthcheck");
@@ -45,7 +45,7 @@ async fn azure_blob_healthcheck_passed_with_oauth() {
4545
let config = AzureBlobSinkConfig::new_emulator_with_oauth().await;
4646
let client = config.build_test_client().await;
4747

48-
azure_common::config::build_healthcheck(config.container_name, client)
48+
azure_blob::config::build_healthcheck(config.container_name, client)
4949
.expect("Failed to build healthcheck")
5050
.await
5151
.expect("Failed to pass healthcheck");
@@ -61,7 +61,7 @@ async fn azure_blob_healthcheck_unknown_container() {
6161
let client = config.build_test_client().await;
6262

6363
assert_eq!(
64-
azure_common::config::build_healthcheck(config.container_name, client)
64+
azure_blob::config::build_healthcheck(config.container_name, client)
6565
.unwrap()
6666
.await
6767
.unwrap_err()
@@ -295,7 +295,7 @@ impl AzureBlobSinkConfig {
295295
}
296296

297297
async fn build_test_client(&self) -> Arc<BlobContainerClient> {
298-
azure_common::config::build_client(
298+
azure_blob::config::build_client(
299299
self.auth.clone(),
300300
self.connection_string
301301
.clone()

src/sinks/azure_blob/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
mod config;
22
mod request_builder;
3+
mod service;
4+
mod sink;
35

46
#[cfg(all(test, feature = "azure-blob-integration-tests"))]
57
mod integration_tests;

src/sinks/azure_blob/request_builder.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
use bytes::Bytes;
22
use chrono::Utc;
33
use uuid::Uuid;
4-
use vector_lib::{
5-
EstimatedJsonEncodedSizeOf, codecs::encoding::Framer, request_metadata::RequestMetadata,
6-
};
4+
use vector_lib::{codecs::encoding::Framer, request_metadata::RequestMetadata};
75

86
use crate::{
97
codecs::{Encoder, Transformer},
108
event::{Event, Finalizable},
119
sinks::{
12-
azure_common::config::{AzureBlobMetadata, AzureBlobRequest},
10+
azure_blob::config::{AzureBlobMetadata, AzureBlobRequest},
1311
util::{
1412
Compression, RequestBuilder, metadata::RequestMetadataBuilder,
1513
request_builder::EncodeResult,
@@ -51,7 +49,6 @@ impl RequestBuilder<(String, Vec<Event>)> for AzureBlobRequestOptions {
5149
let azure_metadata = AzureBlobMetadata {
5250
partition_key,
5351
count: events.len(),
54-
byte_size: events.estimated_json_encoded_size_of(),
5552
finalizers,
5653
};
5754

0 commit comments

Comments
 (0)