Skip to content

Commit afbc9f2

Browse files
lym953 and claude committed
Add durable_function:true tag to enhanced metrics for Durable Function runtimes
When the PlatformInitStart payload has a runtime_version field whose value contains "DurableFunction" (e.g. "python:3.14.DurableFunction.v6"), set the tag durable_function:true on all enhanced metrics generated by the extension.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent dfcb768 commit afbc9f2

File tree

4 files changed

+159
-6
lines changed

4 files changed

+159
-6
lines changed

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -824,9 +824,12 @@ async fn handle_event_bus_event(
824824
Event::Telemetry(event) => {
825825
debug!("Telemetry event received: {:?}", event);
826826
match event.record {
827-
TelemetryRecord::PlatformInitStart { .. } => {
827+
TelemetryRecord::PlatformInitStart {
828+
runtime_version,
829+
..
830+
} => {
828831
if let Err(e) = invocation_processor_handle
829-
.on_platform_init_start(event.time)
832+
.on_platform_init_start(event.time, runtime_version)
830833
.await
831834
{
832835
error!("Failed to send platform init start to processor: {}", e);

bottlecap/src/lifecycle/invocation/processor.rs

Lines changed: 109 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -284,7 +284,13 @@ impl Processor {
284284
///
285285
/// This is used to create a cold start span, since this telemetry event does not
286286
/// provide a `request_id`, we try to guess which invocation is the cold start.
287-
pub fn on_platform_init_start(&mut self, time: DateTime<Utc>) {
287+
pub fn on_platform_init_start(&mut self, time: DateTime<Utc>, runtime_version: Option<String>) {
288+
if runtime_version
289+
.as_deref()
290+
.is_some_and(|rv| rv.contains("DurableFunction"))
291+
{
292+
self.enhanced_metrics.set_durable_function_tag();
293+
}
288294
let start_time: i64 = SystemTime::from(time)
289295
.duration_since(UNIX_EPOCH)
290296
.expect("time went backwards")
@@ -1714,6 +1720,108 @@ mod tests {
17141720
),
17151721
}
17161722

1723+
#[tokio::test]
1724+
async fn test_on_platform_init_start_sets_durable_function_tag() {
1725+
let mut processor = setup();
1726+
let time = Utc::now();
1727+
1728+
processor.on_platform_init_start(
1729+
time,
1730+
Some("python:3.14.DurableFunction.v6".to_string()),
1731+
);
1732+
1733+
let now: i64 = std::time::UNIX_EPOCH
1734+
.elapsed()
1735+
.expect("unable to poll clock, unrecoverable")
1736+
.as_secs()
1737+
.try_into()
1738+
.unwrap_or_default();
1739+
processor.enhanced_metrics.increment_invocation_metric(now);
1740+
1741+
let ts = (now / 10) * 10;
1742+
let durable_tags = dogstatsd::metric::SortedTags::parse("durable_function:true").ok();
1743+
let entry = processor
1744+
.enhanced_metrics
1745+
.aggr_handle
1746+
.get_entry_by_id(
1747+
crate::metrics::enhanced::constants::INVOCATIONS_METRIC.into(),
1748+
durable_tags,
1749+
ts,
1750+
)
1751+
.await
1752+
.unwrap();
1753+
assert!(
1754+
entry.is_some(),
1755+
"Expected durable_function:true tag on enhanced metric"
1756+
);
1757+
}
1758+
1759+
#[tokio::test]
1760+
async fn test_on_platform_init_start_no_durable_function_tag_for_regular_runtime() {
1761+
let mut processor = setup();
1762+
let time = Utc::now();
1763+
1764+
processor.on_platform_init_start(time, Some("python:3.12.v10".to_string()));
1765+
1766+
let now: i64 = std::time::UNIX_EPOCH
1767+
.elapsed()
1768+
.expect("unable to poll clock, unrecoverable")
1769+
.as_secs()
1770+
.try_into()
1771+
.unwrap_or_default();
1772+
processor.enhanced_metrics.increment_invocation_metric(now);
1773+
1774+
let ts = (now / 10) * 10;
1775+
let durable_tags = dogstatsd::metric::SortedTags::parse("durable_function:true").ok();
1776+
let entry = processor
1777+
.enhanced_metrics
1778+
.aggr_handle
1779+
.get_entry_by_id(
1780+
crate::metrics::enhanced::constants::INVOCATIONS_METRIC.into(),
1781+
durable_tags,
1782+
ts,
1783+
)
1784+
.await
1785+
.unwrap();
1786+
assert!(
1787+
entry.is_none(),
1788+
"Expected no durable_function:true tag for regular runtime"
1789+
);
1790+
}
1791+
1792+
#[tokio::test]
1793+
async fn test_on_platform_init_start_no_durable_function_tag_when_runtime_version_is_none() {
1794+
let mut processor = setup();
1795+
let time = Utc::now();
1796+
1797+
processor.on_platform_init_start(time, None);
1798+
1799+
let now: i64 = std::time::UNIX_EPOCH
1800+
.elapsed()
1801+
.expect("unable to poll clock, unrecoverable")
1802+
.as_secs()
1803+
.try_into()
1804+
.unwrap_or_default();
1805+
processor.enhanced_metrics.increment_invocation_metric(now);
1806+
1807+
let ts = (now / 10) * 10;
1808+
let durable_tags = dogstatsd::metric::SortedTags::parse("durable_function:true").ok();
1809+
let entry = processor
1810+
.enhanced_metrics
1811+
.aggr_handle
1812+
.get_entry_by_id(
1813+
crate::metrics::enhanced::constants::INVOCATIONS_METRIC.into(),
1814+
durable_tags,
1815+
ts,
1816+
)
1817+
.await
1818+
.unwrap();
1819+
assert!(
1820+
entry.is_none(),
1821+
"Expected no durable_function:true tag when runtime_version is None"
1822+
);
1823+
}
1824+
17171825
#[tokio::test]
17181826
async fn test_is_managed_instance_mode_returns_true() {
17191827
use crate::config::aws::LAMBDA_MANAGED_INSTANCES_INIT_TYPE;

bottlecap/src/lifecycle/invocation/processor_service.rs

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,7 @@ pub enum ProcessorCommand {
4040
},
4141
PlatformInitStart {
4242
time: DateTime<Utc>,
43+
runtime_version: Option<String>,
4344
},
4445
PlatformInitReport {
4546
init_type: InitType,
@@ -140,9 +141,13 @@ impl InvocationProcessorHandle {
140141
pub async fn on_platform_init_start(
141142
&self,
142143
time: DateTime<Utc>,
144+
runtime_version: Option<String>,
143145
) -> Result<(), mpsc::error::SendError<ProcessorCommand>> {
144146
self.sender
145-
.send(ProcessorCommand::PlatformInitStart { time })
147+
.send(ProcessorCommand::PlatformInitStart {
148+
time,
149+
runtime_version,
150+
})
146151
.await
147152
}
148153

@@ -454,8 +459,11 @@ impl InvocationProcessorService {
454459
ProcessorCommand::InvokeEvent { request_id } => {
455460
self.processor.on_invoke_event(request_id);
456461
}
457-
ProcessorCommand::PlatformInitStart { time } => {
458-
self.processor.on_platform_init_start(time);
462+
ProcessorCommand::PlatformInitStart {
463+
time,
464+
runtime_version,
465+
} => {
466+
self.processor.on_platform_init_start(time, runtime_version);
459467
}
460468
ProcessorCommand::PlatformInitReport {
461469
init_type,

bottlecap/src/metrics/enhanced/lambda.rs

Lines changed: 34 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -61,6 +61,12 @@ impl Lambda {
6161
.insert(String::from("runtime"), runtime.to_string());
6262
}
6363

64+
/// Sets the `durable_function:true` tag in `dynamic_value_tags`
65+
pub fn set_durable_function_tag(&mut self) {
66+
self.dynamic_value_tags
67+
.insert(String::from("durable_function"), String::from("true"));
68+
}
69+
6470
fn get_dynamic_value_tags(&self) -> Option<SortedTags> {
6571
let vec_tags: Vec<String> = self
6672
.dynamic_value_tags
@@ -838,6 +844,34 @@ mod tests {
838844
}
839845
}
840846

847+
#[tokio::test]
848+
async fn test_set_durable_function_tag() {
849+
let (metrics_aggr, my_config) = setup();
850+
let mut lambda = Lambda::new(metrics_aggr.clone(), my_config);
851+
let now: i64 = std::time::UNIX_EPOCH
852+
.elapsed()
853+
.expect("unable to poll clock, unrecoverable")
854+
.as_secs()
855+
.try_into()
856+
.unwrap_or_default();
857+
858+
lambda.set_durable_function_tag();
859+
lambda.increment_invocation_metric(now);
860+
861+
// Verify the metric was emitted with the durable_function:true tag
862+
let ts = (now / 10) * 10;
863+
let durable_tags = SortedTags::parse("durable_function:true").ok();
864+
let entry = metrics_aggr
865+
.get_entry_by_id(
866+
constants::INVOCATIONS_METRIC.into(),
867+
durable_tags,
868+
ts,
869+
)
870+
.await
871+
.unwrap();
872+
assert!(entry.is_some(), "Expected metric with durable_function:true tag");
873+
}
874+
841875
#[tokio::test]
842876
#[allow(clippy::float_cmp)]
843877
async fn test_increment_invocation_metric() {

0 commit comments

Comments
 (0)