Skip to content

Commit bbf5b04

Browse files
fix: Skip log message processing when logs are disabled (#831)
# What? Refactors telemetry client to be a method which receives parameters and subscribes accordingly Also gets events based on logging configuration # Why? We needed to ensure that we don't subscribe to logs events when customer is not enabling `DD_SERVERLESS_LOGS_ENABLED` --------- Co-authored-by: jordan gonzález <30836115+duncanista@users.noreply.github.com>
1 parent 8513d0c commit bbf5b04

4 files changed

Lines changed: 82 additions & 79 deletions

File tree

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use tikv_jemallocator::Jemalloc;
1717
static GLOBAL: Jemalloc = Jemalloc;
1818

1919
use bottlecap::{
20-
DOGSTATSD_PORT, LAMBDA_RUNTIME_SLUG, TELEMETRY_PORT,
20+
DOGSTATSD_PORT, LAMBDA_RUNTIME_SLUG,
2121
appsec::processor::{
2222
Error::FeatureDisabled as AppSecFeatureDisabled, Processor as AppSecProcessor,
2323
},
@@ -48,7 +48,7 @@ use bottlecap::{
4848
provider::Provider as TagProvider,
4949
},
5050
telemetry::{
51-
client::TelemetryApiClient,
51+
self, TELEMETRY_PORT,
5252
events::{TelemetryEvent, TelemetryRecord},
5353
listener::TelemetryListener,
5454
},
@@ -474,9 +474,14 @@ async fn extension_loop_active(
474474

475475
let dogstatsd_cancel_token = start_dogstatsd(metrics_aggr_handle.clone()).await;
476476

477-
let telemetry_listener_cancel_token =
478-
setup_telemetry_client(&r.extension_id, &aws_config.runtime_api, logs_agent_channel)
479-
.await?;
477+
let telemetry_listener_cancel_token = setup_telemetry_client(
478+
client,
479+
&r.extension_id,
480+
&aws_config.runtime_api,
481+
logs_agent_channel,
482+
config.serverless_logs_enabled,
483+
)
484+
.await?;
480485

481486
let otlp_cancel_token = start_otlp_agent(
482487
config,
@@ -1107,9 +1112,11 @@ async fn start_dogstatsd(metrics_aggr_handle: MetricsAggregatorHandle) -> Cancel
11071112
}
11081113

11091114
async fn setup_telemetry_client(
1115+
client: &Client,
11101116
extension_id: &str,
11111117
runtime_api: &str,
11121118
logs_agent_channel: Sender<TelemetryEvent>,
1119+
logs_enabled: bool,
11131120
) -> anyhow::Result<CancellationToken> {
11141121
let telemetry_listener =
11151122
TelemetryListener::new(EXTENSION_HOST_IP, TELEMETRY_PORT, logs_agent_channel);
@@ -1121,15 +1128,16 @@ async fn setup_telemetry_client(
11211128
}
11221129
});
11231130

1124-
let telemetry_client = TelemetryApiClient::new(
1125-
extension_id.to_string(),
1131+
telemetry::subscribe(
1132+
client,
1133+
runtime_api,
1134+
extension_id,
11261135
TELEMETRY_PORT,
1127-
runtime_api.to_string(),
1128-
);
1129-
telemetry_client
1130-
.subscribe()
1131-
.await
1132-
.map_err(|e| anyhow::anyhow!("Failed to subscribe to telemetry: {e:?}"))?;
1136+
logs_enabled,
1137+
)
1138+
.await
1139+
.map_err(|e| anyhow::anyhow!("Failed to subscribe to telemetry: {e:?}"))?;
1140+
11331141
Ok(cancel_token)
11341142
}
11351143

bottlecap/src/lib.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,3 @@ pub const FLUSH_RETRY_COUNT: usize = 3;
4545

4646
// todo: make sure we can override those with environment variables
4747
pub const DOGSTATSD_PORT: u16 = 8125;
48-
49-
pub const TELEMETRY_SUBSCRIPTION_ROUTE: &str = "2022-07-01/telemetry";
50-
// todo(astuyve) should be 8124 on /lambda/logs but
51-
// telemetry is implemented on a raw socket now and
52-
// does not multiplex routes on the same port.
53-
pub const TELEMETRY_PORT: u16 = 8999;

bottlecap/src/telemetry/client.rs

Lines changed: 0 additions & 59 deletions
This file was deleted.

bottlecap/src/telemetry/mod.rs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,63 @@
1-
pub mod client;
1+
use reqwest::{Client, Response};
2+
use tracing::debug;
3+
4+
use crate::extension::{EXTENSION_ID_HEADER, base_url};
5+
26
pub mod events;
37
pub mod listener;
8+
9+
pub const TELEMETRY_SUBSCRIPTION_ROUTE: &str = "2022-07-01/telemetry";
10+
// todo(astuyve) should be 8124 on /lambda/logs but
11+
// telemetry is implemented on a raw socket now and
12+
// does not multiplex routes on the same port.
13+
pub const TELEMETRY_PORT: u16 = 8999;
14+
15+
const PLATFORM_ONLY_EVENTS: &[&str] = &["platform"];
16+
const ALL_EVENTS: &[&str] = &["platform", "extension", "function"];
17+
18+
/// Error conditions that can arise from extension operations
19+
#[derive(thiserror::Error, Debug)]
20+
pub enum ExtensionSubscriptionError {
21+
#[error("Subscription request failed: {0}")]
22+
HttpError(#[from] reqwest::Error),
23+
}
24+
25+
fn get_subscription_event_types(logs_enabled: bool) -> Vec<&'static str> {
26+
(if logs_enabled {
27+
ALL_EVENTS
28+
} else {
29+
PLATFORM_ONLY_EVENTS
30+
})
31+
.to_vec()
32+
}
33+
34+
pub async fn subscribe(
35+
client: &Client,
36+
runtime_api: &str,
37+
extension_id: &str,
38+
destination_port: u16,
39+
logs_enabled: bool,
40+
) -> Result<Response, ExtensionSubscriptionError> {
41+
let url = base_url(TELEMETRY_SUBSCRIPTION_ROUTE, runtime_api);
42+
let response = client
43+
.put(&url)
44+
.header(EXTENSION_ID_HEADER, extension_id)
45+
.json(&serde_json::json!({
46+
"schemaVersion": "2022-12-13",
47+
"destination": {
48+
"protocol": "HTTP",
49+
"URI": format!("http://sandbox:{}/", destination_port),
50+
},
51+
"types": get_subscription_event_types(logs_enabled),
52+
"buffering": { // TODO: re evaluate using default values
53+
"maxItems": 1000,
54+
"maxBytes": 256 * 1024,
55+
"timeoutMs": 25
56+
}
57+
}))
58+
.send()
59+
.await?;
60+
61+
debug!("Subscribed to Telemetry API: {:?}", response);
62+
Ok(response)
63+
}

0 commit comments

Comments
 (0)