Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
The `azure_blob` sink now supports a `tags` option, which sets [blob index tags](https://learn.microsoft.com/azure/storage/blobs/storage-blob-index-how-to) (`x-ms-tags`) on every created blob (parity with the `tags` option on the `aws_s3` sink).

The `azure_blob` sink now supports a `metadata` option, which sets [custom blob metadata](https://learn.microsoft.com/rest/api/storageservices/set-blob-metadata) (`x-ms-meta-*`) on every created blob (parity with the `metadata` option on the `gcp_cloud_storage` sink).

authors: danielku15
55 changes: 55 additions & 0 deletions src/sinks/azure_blob/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use azure_storage_blob::BlobContainerClient;
Expand Down Expand Up @@ -60,6 +61,11 @@ pub struct AzureBlobSinkConfig {
/// | Allowed services | Blob |
/// | Allowed resource types | Container & Object |
/// | Allowed permissions | Read & Create |
///
/// If you also configure the `tags` option, the SAS must include the
/// `Tags` permission. Azure applies the *Set Blob Tags* authorization requirement to
/// the `Put Blob` request that carries the `x-ms-tags` header, so without it tagged
/// uploads fail with an authorization error even when the health check still passes.
#[configurable(metadata(
docs::warnings = "Access keys and SAS tokens can be used to gain unauthorized access to Azure Blob Storage \
resources. Numerous security breaches have occurred due to leaked connection strings. It is important to keep \
Expand Down Expand Up @@ -148,6 +154,43 @@ pub struct AzureBlobSinkConfig {
#[serde(default = "Compression::gzip_default")]
pub compression: Compression,

/// The set of [blob index tags][blob_index_tags] to apply to created blobs.
Comment thread
Danielku15 marked this conversation as resolved.
///
/// Each entry becomes a tag in the `x-ms-tags` header. Azure limits blobs to 10 tags,
/// with restricted character sets for keys and values; the service rejects invalid
/// configurations.
///
/// When authenticating with a shared access signature (SAS), the token must include the
/// `Tags` permission in addition to `Read` and `Create`. Azure applies the *Set Blob Tags*
/// authorization requirement to the `Put Blob` request that carries these tags, so without
/// it tagged uploads fail with an authorization error even when the health check still passes.
///
/// When authenticating with an Azure credential (managed identity, workload identity, and so
/// on), the identity needs the
/// `Microsoft.Storage/storageAccounts/blobServices/containers/blobs/tags/write` RBAC action.
/// The least-privileged built-in role that grants it is *Storage Blob Data Owner*; the
/// *Storage Blob Data Contributor* role commonly sufficient for uploads does not include it.
///
/// [blob_index_tags]: https://learn.microsoft.com/azure/storage/blobs/storage-blob-index-how-to
#[configurable(metadata(docs::additional_props_description = "A single tag."))]
#[configurable(metadata(docs::examples = "example_tags()"))]
#[serde(default)]
pub tags: Option<BTreeMap<String, String>>,
Comment thread
Danielku15 marked this conversation as resolved.

/// The set of [custom metadata][blob_metadata] `key:value` pairs to apply to created blobs.
///
/// Each entry becomes an `x-ms-meta-{key}` header. Azure limits the total size of all
/// metadata and restricts key names to ASCII alphanumeric characters and underscores,
/// starting with a letter. Non-ASCII values must be Base64-encoded before being set.
/// The service rejects invalid configurations. See the [Azure documentation][blob_metadata]
/// for current limits.
///
/// [blob_metadata]: https://learn.microsoft.com/rest/api/storageservices/set-blob-metadata
#[configurable(metadata(docs::additional_props_description = "A key/value pair."))]
#[configurable(metadata(docs::advanced))]
#[serde(default)]
pub metadata: Option<HashMap<String, String>>,

#[configurable(derived)]
#[serde(default)]
pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,
Expand Down Expand Up @@ -186,6 +229,8 @@ impl GenerateConfig for AzureBlobSinkConfig {
blob_append_uuid: Some(true),
encoding: (Some(NewlineDelimitedEncoderConfig::new()), JsonSerializerConfig::default()).into(),
compression: Compression::gzip_default(),
tags: None,
metadata: None,
batch: BatchConfig::default(),
request: TowerRequestConfig::default(),
acknowledgements: Default::default(),
Expand All @@ -195,6 +240,14 @@ impl GenerateConfig for AzureBlobSinkConfig {
}
}

/// Builds the sample tag set referenced by the `docs::examples` attribute on the
/// `tags` option.
fn example_tags() -> HashMap<String, String> {
    // Keep the literal pairs in one table and convert them to owned strings in a single pass.
    [
        ("Project", "Blue"),
        ("Classification", "confidential"),
        ("PHI", "True"),
    ]
    .iter()
    .map(|&(key, value)| (key.to_string(), value.to_string()))
    .collect()
}

#[async_trait::async_trait]
#[typetag::serde(name = "azure_blob")]
impl SinkConfig for AzureBlobSinkConfig {
Expand Down Expand Up @@ -300,6 +353,8 @@ impl AzureBlobSinkConfig {
blob_append_uuid,
encoder: (transformer, encoder),
compression: self.compression,
tags: self.tags.clone(),
metadata: self.metadata.clone(),
};

let sink = AzureBlobSink::new(
Expand Down
98 changes: 94 additions & 4 deletions src/sinks/azure_blob/integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::{BTreeMap, HashMap};
use std::io::{BufRead, BufReader};
use std::sync::Arc;

Expand Down Expand Up @@ -243,6 +244,52 @@ async fn azure_blob_rotate_files_after_the_buffer_size_is_reached_with_oauth() {
.await;
}

/// Runs the sink with both `tags` and `metadata` configured under a fresh random blob
/// prefix, then checks that exactly one blob was written and that the metadata and tags
/// read back from it equal the configured values.
async fn assert_insert_lines_with_blob_tags_and_metadata(config: AzureBlobSinkConfig) {
    let blob_prefix = format!("lines-tags-meta/into/blob/{}", random_string(10));

    let tags: BTreeMap<String, String> = [("Project", "Blue"), ("Owner", "vector")]
        .iter()
        .map(|&(key, value)| (key.to_string(), value.to_string()))
        .collect();

    let metadata: HashMap<String, String> = [("source", "vector"), ("environment", "test")]
        .iter()
        .map(|&(key, value)| (key.to_string(), value.to_string()))
        .collect();

    // Override only the fields under test; everything else comes from the supplied config.
    let config = AzureBlobSinkConfig {
        blob_prefix: blob_prefix.clone().try_into().unwrap(),
        tags: Some(tags.clone()),
        metadata: Some(metadata.clone()),
        ..config
    };
    let (_lines, input) = random_lines_with_stream(100, 10, None);

    config.run_assert(input).await;

    let blobs = config.list_blobs(blob_prefix).await;
    assert_eq!(blobs.len(), 1);
    let blob_name = &blobs[0];

    let blob_metadata = config.get_blob_metadata(blob_name.clone()).await;
    assert_eq!(blob_metadata, metadata);

    // The tags helper returns a `HashMap`, so convert the expected `BTreeMap` before comparing.
    let blob_tags = config.get_blob_tags(blob_name.clone()).await;
    let expected = tags.into_iter().collect::<HashMap<String, String>>();
    assert_eq!(blob_tags, expected);
}

/// Exercises the tags-and-metadata round trip using the config from `new_emulator()`.
#[tokio::test]
async fn azure_blob_insert_lines_with_blob_tags_and_metadata() {
    let emulator_config = AzureBlobSinkConfig::new_emulator().await;
    assert_insert_lines_with_blob_tags_and_metadata(emulator_config).await;
}

/// Exercises the tags-and-metadata round trip using the config from
/// `new_emulator_with_oauth()`.
#[tokio::test]
async fn azure_blob_insert_lines_with_blob_tags_and_metadata_with_oauth() {
    let oauth_config = AzureBlobSinkConfig::new_emulator_with_oauth().await;
    assert_insert_lines_with_blob_tags_and_metadata(oauth_config).await;
}

impl AzureBlobSinkConfig {
pub async fn new_emulator() -> AzureBlobSinkConfig {
let address = std::env::var("AZURITE_ADDRESS").unwrap_or_else(|_| "localhost".into());
Expand All @@ -257,6 +304,8 @@ impl AzureBlobSinkConfig {
blob_append_uuid: None,
encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
compression: Compression::None,
tags: None,
metadata: None,
batch: Default::default(),
request: TowerRequestConfig::default(),
acknowledgements: Default::default(),
Expand All @@ -281,6 +330,8 @@ impl AzureBlobSinkConfig {
blob_append_uuid: None,
encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
compression: Compression::None,
tags: None,
metadata: None,
batch: Default::default(),
request: TowerRequestConfig::default(),
acknowledgements: Default::default(),
Expand Down Expand Up @@ -335,10 +386,10 @@ impl AzureBlobSinkConfig {
let mut names = Vec::new();
while let Some(result) = pager.next().await {
let item = result.expect("Failed to fetch blobs");
if let Some(name) = item.name
&& name.starts_with(&prefix)
{
names.push(name);
if let Some(name) = item.name {
if name.starts_with(&prefix) {
names.push(name);
}
}
}

Expand Down Expand Up @@ -388,6 +439,45 @@ impl AzureBlobSinkConfig {
(content_type, content_encoding, self.get_blob_content(data))
}

/// Fetches the properties of `blob` and returns its custom metadata.
///
/// Every response header whose name starts with `x-ms-meta-` (compared ASCII
/// case-insensitively) and has at least one character after the prefix is returned,
/// keyed by the remainder of the header name.
pub async fn get_blob_metadata(&self, blob: String) -> HashMap<String, String> {
    const META_PREFIX: &str = "x-ms-meta-";

    let container = self.build_test_client().await;
    let properties = container
        .blob_client(&blob)
        .get_properties(None)
        .await
        .expect("Failed to get blob properties");

    properties
        .headers()
        .iter()
        .filter_map(|(name, value)| {
            let header = name.as_str();
            // A header that is no longer than the prefix cannot carry a metadata key.
            if header.len() <= META_PREFIX.len() {
                return None;
            }
            let (head, meta_key) = header.split_at(META_PREFIX.len());
            head.eq_ignore_ascii_case(META_PREFIX)
                .then(|| (meta_key.to_string(), value.as_str().to_string()))
        })
        .collect()
}

/// Fetches the tags of `blob` through the test client and returns them as a map.
///
/// Panics if the request fails or the response body cannot be decoded.
pub async fn get_blob_tags(&self, blob: String) -> HashMap<String, String> {
    let container = self.build_test_client().await;
    let tag_set = container
        .blob_client(&blob)
        .get_tags(None)
        .await
        .expect("Failed to get blob tags")
        .into_model()
        .expect("Failed to decode tags body");
    HashMap::from(tag_set)
}

fn get_blob_content(&self, data: Vec<u8>) -> Vec<String> {
let body = BytesMut::from(data.as_slice()).freeze().reader();

Expand Down
28 changes: 28 additions & 0 deletions src/sinks/azure_blob/request_builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::collections::{BTreeMap, HashMap};

use bytes::Bytes;
use chrono::Utc;
use percent_encoding::{NON_ALPHANUMERIC, utf8_percent_encode};
use uuid::Uuid;
use vector_lib::{
EstimatedJsonEncodedSizeOf, codecs::encoding::Framer, request_metadata::RequestMetadata,
Expand All @@ -24,6 +27,8 @@ pub struct AzureBlobRequestOptions {
pub blob_append_uuid: bool,
pub encoder: (Transformer, Encoder<Framer>),
pub compression: Compression,
pub tags: Option<BTreeMap<String, String>>,
pub metadata: Option<HashMap<String, String>>,
}

impl RequestBuilder<(String, Vec<Event>)> for AzureBlobRequestOptions {
Expand Down Expand Up @@ -95,6 +100,29 @@ impl RequestBuilder<(String, Vec<Event>)> for AzureBlobRequestOptions {
content_type: self.encoder.1.content_type(),
metadata: azure_metadata,
request_metadata,
// SDK 0.10.1 has no `with_tags()` helper, so we set the pre-encoded
// `x-ms-tags` header value directly. Match the Azure SDK's own encoding
// (`percent_encoding::NON_ALPHANUMERIC`, which emits `%20` for spaces) rather
// than `url::form_urlencoded` (which emits `+`).
tags: self
.tags
.as_ref()
.filter(|m| !m.is_empty())
.map(encode_tags),
blob_metadata: self.metadata.as_ref().filter(|m| !m.is_empty()).cloned(),
}
}
}

/// Serializes `tags` into the pre-encoded form `key=value&key=value`.
///
/// Keys and values are each percent-encoded with the `NON_ALPHANUMERIC` set. Entries
/// are emitted in the map's iteration order and separated by `&`; an empty map yields
/// an empty string.
fn encode_tags(tags: &BTreeMap<String, String>) -> String {
    tags.iter()
        .map(|(key, value)| {
            format!(
                "{}={}",
                utf8_percent_encode(key, NON_ALPHANUMERIC),
                utf8_percent_encode(value, NON_ALPHANUMERIC)
            )
        })
        .collect::<Vec<_>>()
        .join("&")
}
Loading
Loading