Skip to content

Commit de0e7a0

Browse files
lym953claude
andauthored
[SVLS-8581] feat: Add durable_function:true tag to enhanced metrics (#1048)
## Overview When the Lambda function's runtime version contains "RuntimeVersion", add the tag `durable_function:true` to enhanced metrics generated by the extension, so the facet `durable_function` appears on Serverless View. Sample value of runtime version: - Python: `python:3.14.DurableFunction.v9` - Node.js: `nodejs:24.DurableFunction.v10` ## Testing ### Steps 1. Build a layer and install it on a durable function 2. Invoke it 3. Check Serverless view ### Result 1. When I search by `durable_function:true`, the function appears in the result. <img width="446" height="210" alt="image" src="https://github.com/user-attachments/assets/2fa14641-fd37-4b27-90bb-85cfce2570e3" /> 2. A facet `durable_function` appears on the sidebar. <img width="437" height="151" alt="image" src="https://github.com/user-attachments/assets/2dcc4113-5caa-4376-a92e-ed54b29dd453" /> 3. When I search by `-durable_function:true`, other functions appear in the result. <img width="500" height="240" alt="image" src="https://github.com/user-attachments/assets/5b1308ff-5a7d-4c9f-9593-93dfb26d625d" /> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent c96c15c commit de0e7a0

File tree

4 files changed

+153
-7
lines changed

4 files changed

+153
-7
lines changed

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -824,9 +824,11 @@ async fn handle_event_bus_event(
824824
Event::Telemetry(event) => {
825825
debug!("Telemetry event received: {:?}", event);
826826
match event.record {
827-
TelemetryRecord::PlatformInitStart { .. } => {
827+
TelemetryRecord::PlatformInitStart {
828+
runtime_version, ..
829+
} => {
828830
if let Err(e) = invocation_processor_handle
829-
.on_platform_init_start(event.time)
831+
.on_platform_init_start(event.time, runtime_version)
830832
.await
831833
{
832834
error!("Failed to send platform init start to processor: {}", e);

bottlecap/src/lifecycle/invocation/processor.rs

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,10 @@ impl Processor {
284284
///
285285
/// This is used to create a cold start span, since this telemetry event does not
286286
/// provide a `request_id`, we try to guess which invocation is the cold start.
287-
pub fn on_platform_init_start(&mut self, time: DateTime<Utc>) {
287+
pub fn on_platform_init_start(&mut self, time: DateTime<Utc>, runtime_version: Option<String>) {
288+
if runtime_version.as_deref().map_or(false, |rv| rv.contains("DurableFunction")) {
289+
self.enhanced_metrics.set_durable_function_tag();
290+
}
288291
let start_time: i64 = SystemTime::from(time)
289292
.duration_since(UNIX_EPOCH)
290293
.expect("time went backwards")
@@ -1714,6 +1717,105 @@ mod tests {
17141717
),
17151718
}
17161719

1720+
#[tokio::test]
1721+
async fn test_on_platform_init_start_sets_durable_function_tag() {
1722+
let mut processor = setup();
1723+
let time = Utc::now();
1724+
1725+
processor.on_platform_init_start(time, Some("python:3.14.DurableFunction.v6".to_string()));
1726+
1727+
let now: i64 = std::time::UNIX_EPOCH
1728+
.elapsed()
1729+
.expect("unable to poll clock, unrecoverable")
1730+
.as_secs()
1731+
.try_into()
1732+
.unwrap_or_default();
1733+
processor.enhanced_metrics.increment_invocation_metric(now);
1734+
1735+
let ts = (now / 10) * 10;
1736+
let durable_tags = dogstatsd::metric::SortedTags::parse("durable_function:true").ok();
1737+
let entry = processor
1738+
.enhanced_metrics
1739+
.aggr_handle
1740+
.get_entry_by_id(
1741+
crate::metrics::enhanced::constants::INVOCATIONS_METRIC.into(),
1742+
durable_tags,
1743+
ts,
1744+
)
1745+
.await
1746+
.unwrap();
1747+
assert!(
1748+
entry.is_some(),
1749+
"Expected durable_function:true tag on enhanced metric"
1750+
);
1751+
}
1752+
1753+
#[tokio::test]
1754+
async fn test_on_platform_init_start_no_durable_function_tag_for_regular_runtime() {
1755+
let mut processor = setup();
1756+
let time = Utc::now();
1757+
1758+
processor.on_platform_init_start(time, Some("python:3.12.v10".to_string()));
1759+
1760+
let now: i64 = std::time::UNIX_EPOCH
1761+
.elapsed()
1762+
.expect("unable to poll clock, unrecoverable")
1763+
.as_secs()
1764+
.try_into()
1765+
.unwrap_or_default();
1766+
processor.enhanced_metrics.increment_invocation_metric(now);
1767+
1768+
let ts = (now / 10) * 10;
1769+
let durable_tags = dogstatsd::metric::SortedTags::parse("durable_function:true").ok();
1770+
let entry = processor
1771+
.enhanced_metrics
1772+
.aggr_handle
1773+
.get_entry_by_id(
1774+
crate::metrics::enhanced::constants::INVOCATIONS_METRIC.into(),
1775+
durable_tags,
1776+
ts,
1777+
)
1778+
.await
1779+
.unwrap();
1780+
assert!(
1781+
entry.is_none(),
1782+
"Expected no durable_function:true tag for regular runtime"
1783+
);
1784+
}
1785+
1786+
#[tokio::test]
1787+
async fn test_on_platform_init_start_no_durable_function_tag_when_runtime_version_is_none() {
1788+
let mut processor = setup();
1789+
let time = Utc::now();
1790+
1791+
processor.on_platform_init_start(time, None);
1792+
1793+
let now: i64 = std::time::UNIX_EPOCH
1794+
.elapsed()
1795+
.expect("unable to poll clock, unrecoverable")
1796+
.as_secs()
1797+
.try_into()
1798+
.unwrap_or_default();
1799+
processor.enhanced_metrics.increment_invocation_metric(now);
1800+
1801+
let ts = (now / 10) * 10;
1802+
let durable_tags = dogstatsd::metric::SortedTags::parse("durable_function:true").ok();
1803+
let entry = processor
1804+
.enhanced_metrics
1805+
.aggr_handle
1806+
.get_entry_by_id(
1807+
crate::metrics::enhanced::constants::INVOCATIONS_METRIC.into(),
1808+
durable_tags,
1809+
ts,
1810+
)
1811+
.await
1812+
.unwrap();
1813+
assert!(
1814+
entry.is_none(),
1815+
"Expected no durable_function:true tag when runtime_version is None"
1816+
);
1817+
}
1818+
17171819
#[tokio::test]
17181820
async fn test_is_managed_instance_mode_returns_true() {
17191821
use crate::config::aws::LAMBDA_MANAGED_INSTANCES_INIT_TYPE;

bottlecap/src/lifecycle/invocation/processor_service.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub enum ProcessorCommand {
4040
},
4141
PlatformInitStart {
4242
time: DateTime<Utc>,
43+
runtime_version: Option<String>,
4344
},
4445
PlatformInitReport {
4546
init_type: InitType,
@@ -140,9 +141,13 @@ impl InvocationProcessorHandle {
140141
pub async fn on_platform_init_start(
141142
&self,
142143
time: DateTime<Utc>,
144+
runtime_version: Option<String>,
143145
) -> Result<(), mpsc::error::SendError<ProcessorCommand>> {
144146
self.sender
145-
.send(ProcessorCommand::PlatformInitStart { time })
147+
.send(ProcessorCommand::PlatformInitStart {
148+
time,
149+
runtime_version,
150+
})
146151
.await
147152
}
148153

@@ -454,8 +459,11 @@ impl InvocationProcessorService {
454459
ProcessorCommand::InvokeEvent { request_id } => {
455460
self.processor.on_invoke_event(request_id);
456461
}
457-
ProcessorCommand::PlatformInitStart { time } => {
458-
self.processor.on_platform_init_start(time);
462+
ProcessorCommand::PlatformInitStart {
463+
time,
464+
runtime_version,
465+
} => {
466+
self.processor.on_platform_init_start(time, runtime_version);
459467
}
460468
ProcessorCommand::PlatformInitReport {
461469
init_type,

bottlecap/src/metrics/enhanced/lambda.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ impl Lambda {
6161
.insert(String::from("runtime"), runtime.to_string());
6262
}
6363

64+
/// Sets the `durable_function:true` tag in `dynamic_value_tags`
65+
pub fn set_durable_function_tag(&mut self) {
66+
self.dynamic_value_tags
67+
.insert(String::from("durable_function"), String::from("true"));
68+
}
69+
6470
fn get_dynamic_value_tags(&self) -> Option<SortedTags> {
6571
let vec_tags: Vec<String> = self
6672
.dynamic_value_tags
@@ -146,10 +152,11 @@ impl Lambda {
146152
if !self.config.enhanced_metrics {
147153
return;
148154
}
155+
let tags = self.get_dynamic_value_tags();
149156
let metric = Metric::new(
150157
metric_name.into(),
151158
MetricValue::distribution(1f64),
152-
self.get_dynamic_value_tags(),
159+
tags,
153160
Some(timestamp),
154161
);
155162
if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) {
@@ -838,6 +845,33 @@ mod tests {
838845
}
839846
}
840847

848+
#[tokio::test]
849+
async fn test_set_durable_function_tag() {
850+
let (metrics_aggr, my_config) = setup();
851+
let mut lambda = Lambda::new(metrics_aggr.clone(), my_config);
852+
let now: i64 = std::time::UNIX_EPOCH
853+
.elapsed()
854+
.expect("unable to poll clock, unrecoverable")
855+
.as_secs()
856+
.try_into()
857+
.unwrap_or_default();
858+
859+
lambda.set_durable_function_tag();
860+
lambda.increment_invocation_metric(now);
861+
862+
// Verify the metric was emitted with the durable_function:true tag
863+
let ts = (now / 10) * 10;
864+
let durable_tags = SortedTags::parse("durable_function:true").ok();
865+
let entry = metrics_aggr
866+
.get_entry_by_id(constants::INVOCATIONS_METRIC.into(), durable_tags, ts)
867+
.await
868+
.unwrap();
869+
assert!(
870+
entry.is_some(),
871+
"Expected metric with durable_function:true tag"
872+
);
873+
}
874+
841875
#[tokio::test]
842876
#[allow(clippy::float_cmp)]
843877
async fn test_increment_invocation_metric() {

0 commit comments

Comments
 (0)