Skip to content

Commit 8522961

Browse files
authored
Merge branch 'main' into joey/handle_msk
2 parents 509a079 + 837d598 commit 8522961

10 files changed

Lines changed: 233 additions & 99 deletions

File tree

.github/workflows/rs_ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ jobs:
135135
- uses: actions-rust-lang/setup-rust-toolchain@150fca883cd4034361b621bd4e6a9d34e5143606 # v1.15.4
136136
with:
137137
cache: false
138-
- uses: taiki-e/install-action@94a7388bec5d4c8dd93e3ebf09e0ff448f3f6f4d # v2.68.35
138+
- uses: taiki-e/install-action@c12d62a803cbdfe2e7263af15f5a9548065cb4f2 # v2.69.3
139139
with:
140140
tool: nextest@0.9
141141
- uses: mozilla-actions/sccache-action@7d986dd989559c6ecdb630a3fd2557667be217ad # v0.0.9

bottlecap/src/tags/lambda/tags.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ const FUNCTION_TAGS_KEY: &str = "_dd.tags.function";
4545
// TODO(astuyve) decide what to do with the version
4646
const EXTENSION_VERSION_KEY: &str = "dd_extension_version";
4747
// TODO(duncanista) figure out a better way to not hardcode this
48-
pub const EXTENSION_VERSION: &str = "93-next";
48+
pub const EXTENSION_VERSION: &str = "94-next";
4949

5050
const REGION_KEY: &str = "region";
5151
const ACCOUNT_ID_KEY: &str = "account_id";

bottlecap/src/traces/stats_flusher.rs

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::sync::Arc;
66
use tokio::sync::Mutex;
77
use tokio::sync::OnceCell;
88

9+
use crate::FLUSH_RETRY_COUNT;
910
use crate::config;
1011
use crate::lifecycle::invocation::processor::S_TO_MS;
1112
use crate::traces::http_client::HttpClient;
@@ -93,32 +94,36 @@ impl StatsFlusher {
9394

9495
let stats_url = trace_stats_url(&self.config.site);
9596

96-
let start = std::time::Instant::now();
97-
98-
let resp = stats_utils::send_stats_payload_with_client(
99-
serialized_stats_payload,
100-
endpoint,
101-
api_key.as_str(),
102-
Some(&self.http_client),
103-
)
104-
.await;
105-
let elapsed = start.elapsed();
106-
debug!(
107-
"STATS | Stats request to {} took {} ms",
108-
stats_url,
109-
elapsed.as_millis()
110-
);
111-
match resp {
112-
Ok(()) => {
113-
debug!("STATS | Successfully flushed stats");
114-
None
115-
}
116-
Err(e) => {
117-
// Network/server errors are temporary - return stats for retry
118-
error!("STATS | Error sending stats: {e:?}");
119-
Some(stats)
97+
for attempt in 1..=FLUSH_RETRY_COUNT {
98+
let start = std::time::Instant::now();
99+
let resp = stats_utils::send_stats_payload_with_client(
100+
serialized_stats_payload.clone(),
101+
endpoint,
102+
api_key.as_str(),
103+
Some(&self.http_client),
104+
)
105+
.await;
106+
let elapsed = start.elapsed();
107+
108+
match resp {
109+
Ok(()) => {
110+
debug!(
111+
"STATS | Successfully flushed stats to {stats_url} in {} ms (attempt {attempt}/{FLUSH_RETRY_COUNT})",
112+
elapsed.as_millis()
113+
);
114+
return None;
115+
}
116+
Err(e) => {
117+
debug!(
118+
"STATS | Failed to send stats to {stats_url} in {} ms (attempt {attempt}/{FLUSH_RETRY_COUNT}): {e:?}",
119+
elapsed.as_millis()
120+
);
121+
}
120122
}
121123
}
124+
125+
error!("STATS | Exhausted all {FLUSH_RETRY_COUNT} attempts, returning stats for redrive");
126+
Some(stats)
122127
}
123128

124129
/// Flushes stats from the aggregator.

bottlecap/src/traces/trace_flusher.rs

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use libdd_common::Endpoint;
66
use libdd_trace_utils::{
77
config_utils::trace_intake_url_prefixed,
88
send_data::SendData,
9+
send_with_retry::{RetryBackoffType, RetryStrategy},
910
trace_utils::{self},
1011
tracer_payload::TracerPayloadCollection,
1112
};
@@ -14,11 +15,25 @@ use std::sync::Arc;
1415
use tokio::task::JoinSet;
1516
use tracing::{debug, error};
1617

18+
use crate::FLUSH_RETRY_COUNT;
1719
use crate::config::Config;
1820
use crate::lifecycle::invocation::processor::S_TO_MS;
1921
use crate::traces::http_client::HttpClient;
2022
use crate::traces::trace_aggregator_service::AggregatorHandle;
2123

24+
/// Retry strategy for trace flushing using the shared `FLUSH_RETRY_COUNT`
25+
/// with no delay between attempts. In Lambda, every millisecond of wall-clock
26+
/// time matters, and the per-attempt request timeout already bounds how long
27+
/// each retry can take.
28+
fn trace_retry_strategy() -> RetryStrategy {
29+
RetryStrategy::new(
30+
u32::try_from(FLUSH_RETRY_COUNT).unwrap_or(3),
31+
0,
32+
RetryBackoffType::Constant,
33+
None,
34+
)
35+
}
36+
2237
pub struct TraceFlusher {
2338
pub aggregator_handle: AggregatorHandle,
2439
pub config: Arc<Config>,
@@ -113,7 +128,11 @@ impl TraceFlusher {
113128
let traces_with_tags: Vec<_> = trace_builders
114129
.into_iter()
115130
.map(|info| {
116-
let trace = info.builder.with_api_key(api_key.as_str()).build();
131+
let trace = info
132+
.builder
133+
.with_api_key(api_key.as_str())
134+
.with_retry_strategy(trace_retry_strategy())
135+
.build();
117136
(trace, info.header_tags)
118137
})
119138
.collect();
@@ -125,12 +144,16 @@ impl TraceFlusher {
125144
let additional_traces: Vec<_> = traces_with_tags
126145
.iter()
127146
.filter_map(|(trace, tags)| match trace.get_payloads() {
128-
TracerPayloadCollection::V07(payloads) => Some(SendData::new(
129-
trace.len(),
130-
TracerPayloadCollection::V07(payloads.clone()),
131-
tags.to_tracer_header_tags(),
132-
&endpoint,
133-
)),
147+
TracerPayloadCollection::V07(payloads) => {
148+
let mut send_data = SendData::new(
149+
trace.len(),
150+
TracerPayloadCollection::V07(payloads.clone()),
151+
tags.to_tracer_header_tags(),
152+
&endpoint,
153+
);
154+
send_data.set_retry_strategy(trace_retry_strategy());
155+
Some(send_data)
156+
}
134157
// All payloads in the extension are V07 (produced by
135158
// collect_pb_trace_chunks), so this branch is unreachable.
136159
_ => None,
@@ -174,12 +197,23 @@ impl TraceFlusher {
174197
debug!("TRACES | Flushing {} traces", coalesced_traces.len());
175198

176199
for trace in &coalesced_traces {
177-
let send_result = trace.send(&http_client).await.last_result;
178-
179-
if let Err(e) = send_result {
180-
error!("TRACES | Request failed: {e:?}");
200+
let result = trace.send(&http_client).await;
201+
202+
if let Err(e) = &result.last_result {
203+
error!(
204+
"TRACES | Request failed after {} attempts ({} timeouts, {} network errors, {} status code errors): {e:?}",
205+
result.requests_count,
206+
result.errors_timeout,
207+
result.errors_network,
208+
result.errors_status_code,
209+
);
181210
return Some(coalesced_traces);
182211
}
212+
213+
debug!(
214+
"TRACES | Successfully sent trace ({} attempts, {} bytes)",
215+
result.requests_count, result.bytes_sent,
216+
);
183217
}
184218

185219
debug!("TRACES | Flushing took {} ms", start.elapsed().as_millis());

integration-tests/tests/lmi.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ const identifier = getIdentifier();
99
const stackName = `integ-${identifier}-lmi`;
1010

1111
describe('LMI Integration Tests', () => {
12-
let results: Record<string, DatadogTelemetry[][]>;
12+
let telemetry: Record<string, DatadogTelemetry>;
1313

1414
beforeAll(async () => {
1515
const functions: FunctionConfig[] = runtimes.map(runtime => ({
@@ -20,13 +20,13 @@ describe('LMI Integration Tests', () => {
2020
console.log('Invoking LMI functions...');
2121

2222
// Invoke all LMI functions and collect telemetry
23-
results = await invokeAndCollectTelemetry(functions, 1);
23+
telemetry = await invokeAndCollectTelemetry(functions, 1);
2424

2525
console.log('LMI invocation and data fetching completed');
2626
}, 600000);
2727

2828
describe.each(runtimes)('%s Runtime with LMI', (runtime) => {
29-
const getResult = () => results[runtime]?.[0]?.[0];
29+
const getResult = () => telemetry[runtime]?.threads[0]?.[0];
3030

3131
it('should invoke Lambda successfully', () => {
3232
const result = getResult();

integration-tests/tests/on-demand.test.ts

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { invokeAndCollectTelemetry, FunctionConfig } from './utils/default';
2-
import { DatadogTelemetry } from './utils/datadog';
2+
import { DatadogTelemetry, DURATION_METRICS } from './utils/datadog';
33
import { forceColdStart } from './utils/lambda';
44
import { getIdentifier } from '../config';
55

@@ -10,27 +10,25 @@ const identifier = getIdentifier();
1010
const stackName = `integ-${identifier}-on-demand`;
1111

1212
describe('On-Demand Integration Tests', () => {
13-
let results: Record<string, DatadogTelemetry[][]>;
13+
let telemetry: Record<string, DatadogTelemetry>;
1414

1515
beforeAll(async () => {
1616
const functions: FunctionConfig[] = runtimes.map(runtime => ({
1717
functionName: `${stackName}-${runtime}-lambda`,
1818
runtime,
1919
}));
2020

21-
// Force cold starts
2221
await Promise.all(functions.map(fn => forceColdStart(fn.functionName)));
2322

24-
// Add 5s delay between invocations to ensure warm container is reused
25-
// Required because there is post-runtime processing with 'end' flush strategy
26-
results = await invokeAndCollectTelemetry(functions, 2, 1, 5000);
23+
telemetry = await invokeAndCollectTelemetry(functions, 2, 1, 5000);
2724

2825
console.log('All invocations and data fetching completed');
2926
}, 600000);
3027

3128
describe.each(runtimes)('%s runtime', (runtime) => {
32-
const getFirstInvocation = () => results[runtime]?.[0]?.[0];
33-
const getSecondInvocation = () => results[runtime]?.[0]?.[1];
29+
const getTelemetry = () => telemetry[runtime];
30+
const getFirstInvocation = () => getTelemetry()?.threads[0]?.[0];
31+
const getSecondInvocation = () => getTelemetry()?.threads[0]?.[1];
3432

3533
describe('first invocation (cold start)', () => {
3634
it('should invoke Lambda successfully', () => {
@@ -74,7 +72,6 @@ describe('On-Demand Integration Tests', () => {
7472
});
7573
});
7674

77-
// Python has known issues with cold_start spans - mark as failing
7875
if (runtime === 'python') {
7976
it.failing('[failing] should have aws.lambda.cold_start span', () => {
8077
const result = getFirstInvocation();
@@ -151,5 +148,13 @@ describe('On-Demand Integration Tests', () => {
151148
expect(coldStartSpan).toBeUndefined();
152149
});
153150
});
151+
152+
describe.skip.each(DURATION_METRICS)('%s', (metric) => {
153+
it('should have points with positive values', () => {
154+
const points = getTelemetry().metrics[metric];
155+
expect(points.length).toBeGreaterThan(0);
156+
expect(points.every(p => p.value >= 0)).toBe(true);
157+
});
158+
});
154159
});
155160
});

integration-tests/tests/otlp.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ const identifier = getIdentifier();
99
const stackName = `integ-${identifier}-otlp`;
1010

1111
describe('OTLP Integration Tests', () => {
12-
let results: Record<string, DatadogTelemetry[][]>;
12+
let telemetry: Record<string, DatadogTelemetry>;
1313

1414
beforeAll(async () => {
1515
// Build function configs for all runtimes plus response validation
@@ -27,13 +27,13 @@ describe('OTLP Integration Tests', () => {
2727
console.log('Invoking all OTLP Lambda functions...');
2828

2929
// Invoke all OTLP functions and collect telemetry
30-
results = await invokeAndCollectTelemetry(functions, 1, 1, 0, {}, DATADOG_INDEXING_WAIT_5_MIN_MS);
30+
telemetry = await invokeAndCollectTelemetry(functions, 1, 1, 0, {}, DATADOG_INDEXING_WAIT_5_MIN_MS);
3131

3232
console.log('All OTLP Lambda invocations and data fetching completed');
3333
}, 700000);
3434

3535
describe.each(runtimes)('%s Runtime', (runtime) => {
36-
const getResult = () => results[runtime]?.[0]?.[0];
36+
const getResult = () => telemetry[runtime]?.threads[0]?.[0];
3737

3838
it('should invoke Lambda successfully', () => {
3939
const result = getResult();
@@ -56,7 +56,7 @@ describe('OTLP Integration Tests', () => {
5656
});
5757

5858
describe('OTLP Response Validation', () => {
59-
const getResult = () => results['responseValidation']?.[0]?.[0];
59+
const getResult = () => telemetry['responseValidation']?.threads[0]?.[0];
6060

6161
it('should invoke response validation Lambda successfully', () => {
6262
const result = getResult();

integration-tests/tests/snapstart.test.ts

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ const identifier = getIdentifier();
1010
const stackName = `integ-${identifier}-snapstart`;
1111

1212
describe('Snapstart Integration Tests', () => {
13-
let results: Record<string, DatadogTelemetry[][]>;
13+
let telemetry: Record<string, DatadogTelemetry>;
1414

1515
beforeAll(async () => {
1616
// Publish new versions and wait for SnapStart optimization
@@ -43,20 +43,20 @@ describe('Snapstart Integration Tests', () => {
4343
// - Second invocation: warm (no snapstart_restore span)
4444
// - 5s delay ensures warm container reuse
4545
// - 2 threads for trace isolation testing
46-
results = await invokeAndCollectTelemetry(functions, 2, 2, 5000);
46+
telemetry = await invokeAndCollectTelemetry(functions, 2, 2, 5000);
4747

4848
console.log('All Snapstart Lambda invocations and data fetching completed');
4949
}, 900000);
5050

5151
describe.each(runtimes)('%s Runtime with SnapStart', (runtime) => {
5252
// With concurrency=2, invocations=2:
53-
// - results[runtime][0][0] = thread 0, first invocation (restore)
54-
// - results[runtime][0][1] = thread 0, second invocation (warm)
55-
// - results[runtime][1][0] = thread 1, first invocation (restore)
56-
// - results[runtime][1][1] = thread 1, second invocation (warm)
57-
const getRestoreInvocation = () => results[runtime]?.[0]?.[0];
58-
const getWarmInvocation = () => results[runtime]?.[0]?.[1];
59-
const getOtherThreadInvocation = () => results[runtime]?.[1]?.[0];
53+
// - telemetry[runtime].threads[0][0] = thread 0, first invocation (restore)
54+
// - telemetry[runtime].threads[0][1] = thread 0, second invocation (warm)
55+
// - telemetry[runtime].threads[1][0] = thread 1, first invocation (restore)
56+
// - telemetry[runtime].threads[1][1] = thread 1, second invocation (warm)
57+
const getRestoreInvocation = () => telemetry[runtime]?.threads[0]?.[0];
58+
const getWarmInvocation = () => telemetry[runtime]?.threads[0]?.[1];
59+
const getOtherThreadInvocation = () => telemetry[runtime]?.threads[1]?.[0];
6060

6161
describe('first invocation (restore from snapshot)', () => {
6262
it('should invoke successfully', () => {
@@ -150,10 +150,10 @@ describe('Snapstart Integration Tests', () => {
150150

151151
describe('trace isolation', () => {
152152
it('should have different trace IDs for all 4 invocations', () => {
153-
const thread0Restore = results[runtime]?.[0]?.[0];
154-
const thread0Warm = results[runtime]?.[0]?.[1];
155-
const thread1Restore = results[runtime]?.[1]?.[0];
156-
const thread1Warm = results[runtime]?.[1]?.[1];
153+
const thread0Restore = telemetry[runtime]?.threads[0]?.[0];
154+
const thread0Warm = telemetry[runtime]?.threads[0]?.[1];
155+
const thread1Restore = telemetry[runtime]?.threads[1]?.[0];
156+
const thread1Warm = telemetry[runtime]?.threads[1]?.[1];
157157

158158
expect(thread0Restore).toBeDefined();
159159
expect(thread0Warm).toBeDefined();

0 commit comments

Comments
 (0)