Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 126 additions & 4 deletions src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::client::get::GetClient;
use crate::client::header::{HeaderConfig, get_put_result};
use crate::client::list::ListClient;
use crate::client::retry::{RetryContext, RetryExt};
use crate::client::token::{TemporaryToken, TokenCache};
use crate::client::{
CryptoProvider, DigestAlgorithm, GetOptionsExt, HttpClient, HttpError, HttpRequest,
HttpResponse, crypto_provider,
Expand All @@ -47,7 +48,7 @@ use rand::RngExt;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use url::Url;

const VERSION_HEADER: &str = "x-ms-version-id";
Expand Down Expand Up @@ -662,16 +663,47 @@ async fn parse_blob_batch_delete_body(
Ok(results)
}

/// How long a freshly fetched user delegation key is requested to remain valid.
///
/// The SAS tokens we sign with it stay short-lived; this only bounds how often
/// we call `GetUserDelegationKey`. Azure caps the key lifetime at 7 days.
const DELEGATION_KEY_VALIDITY: Duration = Duration::from_secs(12 * 60 * 60);

/// Minimum remaining validity for a cached key to be reused.
///
/// The cache only hands back a key with at least this much life left, so it is
/// also the longest SAS lifetime the cache can safely serve (a SAS must not
/// outlive the key it is signed with). Longer-lived SAS fetch a dedicated key.
const DELEGATION_KEY_MIN_TTL: Duration = Duration::from_secs(2 * 60 * 60);

/// Parse the validity Azure actually granted a user delegation key, falling back
/// to the window we requested if the response can't be parsed.
fn delegation_key_expiry(key: &UserDelegationKey, requested: DateTime<Utc>) -> DateTime<Utc> {
DateTime::parse_from_rfc3339(&key.signed_expiry)
.map(|t| t.with_timezone(&Utc))
.unwrap_or(requested)
}

#[derive(Debug)]
pub(crate) struct AzureClient {
config: AzureConfig,
client: HttpClient,
/// Caches the user delegation key used to sign SAS URLs.
///
/// Fetching a key is a network round-trip (`GetUserDelegationKey`) that Azure
/// throttles under load, so we fetch a long-lived key once and reuse it to
/// mint many short-lived SAS tokens.
delegation_key_cache: TokenCache<UserDelegationKey>,
Comment thread
emilk marked this conversation as resolved.
}

impl AzureClient {
/// create a new instance of [AzureClient]
pub(crate) fn new(config: AzureConfig, client: HttpClient) -> Self {
Self { config, client }
Self {
config,
client,
delegation_key_cache: TokenCache::default().with_min_ttl(DELEGATION_KEY_MIN_TTL),
}
}

/// Returns the config
Expand Down Expand Up @@ -1020,7 +1052,7 @@ impl AzureClient {
match credential.as_deref() {
Some(AzureCredential::BearerToken(_)) => {
let key = self
.get_user_delegation_key(&signed_start, &signed_expiry)
.user_delegation_key(signed_start, signed_expiry, expires_in)
.await?;
let signing_key = AzureAccessKey::try_new(&key.value)?;
Ok(AzureSigner::new(
Expand All @@ -1043,6 +1075,50 @@ impl AzureClient {
}
}

/// Return a user delegation key valid for a SAS over `[sas_start, sas_expiry]`.
///
/// `GetUserDelegationKey` is a network round-trip that Azure throttles (HTTP
/// 503) under load, so a long-lived key is cached and reused to sign many
/// short-lived SAS URLs.
///
/// The cache only returns a key with more than [`DELEGATION_KEY_MIN_TTL`]
/// remaining, so any SAS no longer than that is guaranteed to expire before
/// its key. The (rare) longer-lived SAS get a dedicated key instead.
async fn user_delegation_key(
&self,
sas_start: DateTime<Utc>,
sas_expiry: DateTime<Utc>,
expires_in: Duration,
) -> Result<UserDelegationKey> {
if expires_in <= DELEGATION_KEY_MIN_TTL {
self.delegation_key_cache
.get_or_insert_with(|| self.fetch_delegation_key(DELEGATION_KEY_VALIDITY))
.await
} else {
self.get_user_delegation_key(&sas_start, &sas_expiry).await
}
}

/// Fetch a user delegation key valid for `validity` and wrap it as a
/// [`TemporaryToken`] so [`TokenCache`] can expire it.
async fn fetch_delegation_key(
&self,
validity: Duration,
) -> Result<TemporaryToken<UserDelegationKey>> {
let start = chrono::Utc::now();
let requested_expiry = start + validity;
let key = self
.get_user_delegation_key(&start, &requested_expiry)
.await?;
// Expire the cache entry when the key Azure granted does (it may clamp it).
let expiry = delegation_key_expiry(&key, requested_expiry);
let ttl = (expiry - chrono::Utc::now()).to_std().unwrap_or(validity);
Ok(TemporaryToken {
token: key,
expiry: Some(Instant::now() + ttl),
})
}

#[cfg(test)]
pub(crate) async fn get_blob_tagging(&self, path: &Path) -> Result<HttpResponse> {
let credential = self.get_credential().await?;
Expand Down Expand Up @@ -1385,7 +1461,7 @@ impl BlockList {
}
}

#[derive(Debug, Clone, PartialEq, Deserialize)]
#[derive(Clone, Default, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) struct UserDelegationKey {
pub signed_oid: String,
Expand All @@ -1397,6 +1473,29 @@ pub(crate) struct UserDelegationKey {
pub value: String,
}

impl std::fmt::Debug for UserDelegationKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
signed_oid,
signed_tid,
signed_start,
signed_expiry,
signed_service,
signed_version,
value: _, // secret => redacted
} = self;
f.debug_struct("UserDelegationKey")
.field("signed_oid", signed_oid)
.field("signed_tid", signed_tid)
.field("signed_start", signed_start)
.field("signed_expiry", signed_expiry)
.field("signed_service", signed_service)
.field("signed_version", signed_version)
.field("value", &"******")
.finish()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -1594,6 +1693,29 @@ mod tests {
quick_xml::de::from_str(S).unwrap();
}

#[test]
fn test_delegation_key_expiry() {
let at = |s: &str| DateTime::parse_from_rfc3339(s).unwrap().with_timezone(&Utc);
let requested = at("2026-06-25T06:00:00Z");

// A well-formed granted expiry is honored (e.g. Azure clamped it shorter).
let key = UserDelegationKey {
signed_expiry: "2026-06-25T05:00:00Z".to_string(),
..Default::default()
};
assert_eq!(
delegation_key_expiry(&key, requested),
at("2026-06-25T05:00:00Z")
);

// An unparsable expiry falls back to the requested window.
let key = UserDelegationKey {
signed_expiry: "not a timestamp".to_string(),
..Default::default()
};
assert_eq!(delegation_key_expiry(&key, requested), requested);
}

#[cfg(feature = "reqwest")]
#[tokio::test]
async fn test_build_bulk_delete_body() {
Expand Down
2 changes: 1 addition & 1 deletion src/client/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl<T> Default for TokenCache<T> {

impl<T: Clone + Send + Sync> TokenCache<T> {
/// Override the minimum remaining TTL for a cached token to be used
#[cfg(any(feature = "aws-base", feature = "gcp-base"))]
#[cfg(any(feature = "aws-base", feature = "azure-base", feature = "gcp-base"))]
pub(crate) fn with_min_ttl(self, min_ttl: Duration) -> Self {
Self { min_ttl, ..self }
}
Expand Down