diff --git a/integration-tests/tests/lmi.test.ts b/integration-tests/tests/lmi.test.ts index f74178c89..e2574ed30 100644 --- a/integration-tests/tests/lmi.test.ts +++ b/integration-tests/tests/lmi.test.ts @@ -9,7 +9,7 @@ const identifier = getIdentifier(); const stackName = `integ-${identifier}-lmi`; describe('LMI Integration Tests', () => { - let results: Record; + let telemetry: Record; beforeAll(async () => { const functions: FunctionConfig[] = runtimes.map(runtime => ({ @@ -20,13 +20,13 @@ describe('LMI Integration Tests', () => { console.log('Invoking LMI functions...'); // Invoke all LMI functions and collect telemetry - results = await invokeAndCollectTelemetry(functions, 1); + telemetry = await invokeAndCollectTelemetry(functions, 1); console.log('LMI invocation and data fetching completed'); }, 600000); describe.each(runtimes)('%s Runtime with LMI', (runtime) => { - const getResult = () => results[runtime]?.[0]?.[0]; + const getResult = () => telemetry[runtime]?.threads[0]?.[0]; it('should invoke Lambda successfully', () => { const result = getResult(); diff --git a/integration-tests/tests/on-demand.test.ts b/integration-tests/tests/on-demand.test.ts index f4d6a930d..8c26d06ab 100644 --- a/integration-tests/tests/on-demand.test.ts +++ b/integration-tests/tests/on-demand.test.ts @@ -1,5 +1,5 @@ import { invokeAndCollectTelemetry, FunctionConfig } from './utils/default'; -import { DatadogTelemetry } from './utils/datadog'; +import { DatadogTelemetry, DURATION_METRICS } from './utils/datadog'; import { forceColdStart } from './utils/lambda'; import { getIdentifier } from '../config'; @@ -10,7 +10,7 @@ const identifier = getIdentifier(); const stackName = `integ-${identifier}-on-demand`; describe('On-Demand Integration Tests', () => { - let results: Record; + let telemetry: Record; beforeAll(async () => { const functions: FunctionConfig[] = runtimes.map(runtime => ({ @@ -18,19 +18,17 @@ describe('On-Demand Integration Tests', () => { runtime, })); - // Force 
cold starts await Promise.all(functions.map(fn => forceColdStart(fn.functionName))); - // Add 5s delay between invocations to ensure warm container is reused - // Required because there is post-runtime processing with 'end' flush strategy - results = await invokeAndCollectTelemetry(functions, 2, 1, 5000); + telemetry = await invokeAndCollectTelemetry(functions, 2, 1, 5000); console.log('All invocations and data fetching completed'); }, 600000); describe.each(runtimes)('%s runtime', (runtime) => { - const getFirstInvocation = () => results[runtime]?.[0]?.[0]; - const getSecondInvocation = () => results[runtime]?.[0]?.[1]; + const getTelemetry = () => telemetry[runtime]; + const getFirstInvocation = () => getTelemetry()?.threads[0]?.[0]; + const getSecondInvocation = () => getTelemetry()?.threads[0]?.[1]; describe('first invocation (cold start)', () => { it('should invoke Lambda successfully', () => { @@ -74,7 +72,6 @@ describe('On-Demand Integration Tests', () => { }); }); - // Python has known issues with cold_start spans - mark as failing if (runtime === 'python') { it.failing('[failing] should have aws.lambda.cold_start span', () => { const result = getFirstInvocation(); @@ -151,5 +148,13 @@ describe('On-Demand Integration Tests', () => { expect(coldStartSpan).toBeUndefined(); }); }); + + describe.skip.each(DURATION_METRICS)('%s', (metric) => { + it('should have points with positive values', () => { + const points = getTelemetry().metrics[metric]; + expect(points.length).toBeGreaterThan(0); + expect(points.every(p => p.value >= 0)).toBe(true); + }); + }); }); }); diff --git a/integration-tests/tests/otlp.test.ts b/integration-tests/tests/otlp.test.ts index 19dc2eba6..d910bcf6e 100644 --- a/integration-tests/tests/otlp.test.ts +++ b/integration-tests/tests/otlp.test.ts @@ -9,7 +9,7 @@ const identifier = getIdentifier(); const stackName = `integ-${identifier}-otlp`; describe('OTLP Integration Tests', () => { - let results: Record; + let telemetry: Record; 
beforeAll(async () => { // Build function configs for all runtimes plus response validation @@ -27,13 +27,13 @@ describe('OTLP Integration Tests', () => { console.log('Invoking all OTLP Lambda functions...'); // Invoke all OTLP functions and collect telemetry - results = await invokeAndCollectTelemetry(functions, 1, 1, 0, {}, DATADOG_INDEXING_WAIT_5_MIN_MS); + telemetry = await invokeAndCollectTelemetry(functions, 1, 1, 0, {}, DATADOG_INDEXING_WAIT_5_MIN_MS); console.log('All OTLP Lambda invocations and data fetching completed'); }, 700000); describe.each(runtimes)('%s Runtime', (runtime) => { - const getResult = () => results[runtime]?.[0]?.[0]; + const getResult = () => telemetry[runtime]?.threads[0]?.[0]; it('should invoke Lambda successfully', () => { const result = getResult(); @@ -56,7 +56,7 @@ describe('OTLP Integration Tests', () => { }); describe('OTLP Response Validation', () => { - const getResult = () => results['responseValidation']?.[0]?.[0]; + const getResult = () => telemetry['responseValidation']?.threads[0]?.[0]; it('should invoke response validation Lambda successfully', () => { const result = getResult(); diff --git a/integration-tests/tests/snapstart.test.ts b/integration-tests/tests/snapstart.test.ts index 8c6e2f619..ccbd74111 100644 --- a/integration-tests/tests/snapstart.test.ts +++ b/integration-tests/tests/snapstart.test.ts @@ -10,7 +10,7 @@ const identifier = getIdentifier(); const stackName = `integ-${identifier}-snapstart`; describe('Snapstart Integration Tests', () => { - let results: Record; + let telemetry: Record; beforeAll(async () => { // Publish new versions and wait for SnapStart optimization @@ -43,20 +43,20 @@ describe('Snapstart Integration Tests', () => { // - Second invocation: warm (no snapstart_restore span) // - 5s delay ensures warm container reuse // - 2 threads for trace isolation testing - results = await invokeAndCollectTelemetry(functions, 2, 2, 5000); + telemetry = await invokeAndCollectTelemetry(functions, 2, 2, 
5000); console.log('All Snapstart Lambda invocations and data fetching completed'); }, 900000); describe.each(runtimes)('%s Runtime with SnapStart', (runtime) => { // With concurrency=2, invocations=2: - // - results[runtime][0][0] = thread 0, first invocation (restore) - // - results[runtime][0][1] = thread 0, second invocation (warm) - // - results[runtime][1][0] = thread 1, first invocation (restore) - // - results[runtime][1][1] = thread 1, second invocation (warm) - const getRestoreInvocation = () => results[runtime]?.[0]?.[0]; - const getWarmInvocation = () => results[runtime]?.[0]?.[1]; - const getOtherThreadInvocation = () => results[runtime]?.[1]?.[0]; + // - telemetry[runtime].threads[0][0] = thread 0, first invocation (restore) + // - telemetry[runtime].threads[0][1] = thread 0, second invocation (warm) + // - telemetry[runtime].threads[1][0] = thread 1, first invocation (restore) + // - telemetry[runtime].threads[1][1] = thread 1, second invocation (warm) + const getRestoreInvocation = () => telemetry[runtime]?.threads[0]?.[0]; + const getWarmInvocation = () => telemetry[runtime]?.threads[0]?.[1]; + const getOtherThreadInvocation = () => telemetry[runtime]?.threads[1]?.[0]; describe('first invocation (restore from snapshot)', () => { it('should invoke successfully', () => { @@ -150,10 +150,10 @@ describe('Snapstart Integration Tests', () => { describe('trace isolation', () => { it('should have different trace IDs for all 4 invocations', () => { - const thread0Restore = results[runtime]?.[0]?.[0]; - const thread0Warm = results[runtime]?.[0]?.[1]; - const thread1Restore = results[runtime]?.[1]?.[0]; - const thread1Warm = results[runtime]?.[1]?.[1]; + const thread0Restore = telemetry[runtime]?.threads[0]?.[0]; + const thread0Warm = telemetry[runtime]?.threads[0]?.[1]; + const thread1Restore = telemetry[runtime]?.threads[1]?.[0]; + const thread1Warm = telemetry[runtime]?.threads[1]?.[1]; expect(thread0Restore).toBeDefined(); 
expect(thread0Warm).toBeDefined(); diff --git a/integration-tests/tests/utils/datadog.ts b/integration-tests/tests/utils/datadog.ts index fc431ea1d..ed3768ea0 100644 --- a/integration-tests/tests/utils/datadog.ts +++ b/integration-tests/tests/utils/datadog.ts @@ -55,6 +55,11 @@ function formatDatadogError(error: unknown, query: string): string { } export interface DatadogTelemetry { + threads: InvocationTracesLogs[][]; // [thread][invocation] + metrics: EnhancedMetrics; +} + +export interface InvocationTracesLogs { requestId: string; statusCode?: number; traces?: DatadogTrace[]; @@ -78,6 +83,21 @@ export interface DatadogLog { tags: string[]; } +export const DURATION_METRICS = [ + 'aws.lambda.enhanced.runtime_duration', + 'aws.lambda.enhanced.billed_duration', + 'aws.lambda.enhanced.duration', + 'aws.lambda.enhanced.post_runtime_duration', + 'aws.lambda.enhanced.init_duration', +]; + +export type EnhancedMetrics = Record; + +export interface MetricPoint { + timestamp: number; + value: number; +} + /** * Extracts the base service name from a function name by stripping any * version qualifier (:N) or alias qualifier (:alias) @@ -90,7 +110,7 @@ function getServiceName(functionName: string): string { return functionName.substring(0, colonIndex); } -export async function getDatadogTelemetryByRequestId(functionName: string, requestId: string): Promise { +export async function getInvocationTracesLogsByRequestId(functionName: string, requestId: string): Promise { const serviceName = getServiceName(functionName); const traces = await getTraces(serviceName, requestId); const logs = await getLogs(serviceName, requestId); @@ -107,16 +127,14 @@ export async function getTraces( requestId: string, ): Promise { const now = Date.now(); - const fromTime = now - (1 * 60 * 60 * 1000); // 1 hour ago + const fromTime = now - (1 * 60 * 60 * 1000); const toTime = now; - // Convert service name to lowercase as Datadog stores it that way const serviceNameLower = serviceName.toLowerCase(); 
const query = `service:${serviceNameLower} @request_id:${requestId}`; try { console.log(`Searching for traces: ${query}`); - // First, find spans matching the request_id to get trace IDs const initialResponse = await datadogClient.post('/api/v2/spans/events/search', { data: { type: 'search_request', @@ -137,7 +155,6 @@ export async function getTraces( const initialSpans = initialResponse.data.data || []; console.log(`Found ${initialSpans.length} initial span(s)`); - // Extract unique trace IDs const traceIds = new Set(); for (const spanData of initialSpans) { const traceId = spanData.attributes?.trace_id; @@ -148,7 +165,6 @@ export async function getTraces( console.log(`Found ${traceIds.size} unique trace(s)`); - // Now fetch all spans for each trace ID const allSpans: any[] = []; for (const traceId of traceIds) { const traceResponse = await datadogClient.post('/api/v2/spans/events/search', { @@ -171,7 +187,6 @@ export async function getTraces( allSpans.push(...traceSpans); } - // Group spans by trace_id to reconstruct traces const traceMap = new Map(); for (const spanData of allSpans) { @@ -190,7 +205,6 @@ export async function getTraces( } } - // Convert map to array of traces const traces: DatadogTrace[] = []; for (const [traceId, spans] of traceMap.entries()) { traces.push({ @@ -216,7 +230,7 @@ export async function getLogs( requestId: string, ): Promise { const now = Date.now(); - const fromTime = now - (2 * 60 * 60 * 1000); // 2 hours ago + const fromTime = now - (2 * 60 * 60 * 1000); const toTime = now; const query = `service:${serviceName} @lambda.request_id:${requestId}`; @@ -237,7 +251,6 @@ export async function getLogs( const rawLogs = response.data.data || []; console.log(`Found ${rawLogs.length} log(s)`); - // Transform raw logs to DatadogLog format const logs: DatadogLog[] = rawLogs.map((logData: any) => { const attrs = logData.attributes || {}; return { @@ -255,3 +268,55 @@ export async function getLogs( throw error; } } + +export async function 
getEnhancedMetrics( + functionName: string, + fromTime: number, + toTime: number +): Promise { + const promises = DURATION_METRICS.map(async (metricName) => { + const points = await getMetrics(metricName, functionName, fromTime, toTime); + return { metricName, points }; + }); + + const results = await Promise.all(promises); + + const metrics: EnhancedMetrics = {}; + for (const { metricName, points } of results) { + metrics[metricName] = points; + } + + return metrics; +} + +async function getMetrics( + metricName: string, + functionName: string, + fromTime: number, + toTime: number +): Promise { + const baseFunctionName = getServiceName(functionName).toLowerCase(); + const query = `avg:${metricName}{functionname:${baseFunctionName}}`; + + console.log(`Querying metrics: ${query}`); + + const response = await datadogClient.get('/api/v1/query', { + params: { + query, + from: Math.floor(fromTime / 1000), + to: Math.floor(toTime / 1000), + }, + }); + + const series = response.data.series || []; + console.log(`Found ${series.length} series for ${metricName}`); + + if (series.length === 0) { + return []; + } + + return (series[0].pointlist || []).map((p: [number, number]) => ({ + timestamp: p[0], + value: p[1], + })); +} diff --git a/integration-tests/tests/utils/default.ts b/integration-tests/tests/utils/default.ts index ebe4ba76e..dc6ea38e6 100644 --- a/integration-tests/tests/utils/default.ts +++ b/integration-tests/tests/utils/default.ts @@ -1,5 +1,10 @@ import { invokeLambda, InvocationResult } from './lambda'; -import { getDatadogTelemetryByRequestId, DatadogTelemetry } from './datadog'; +import { + getInvocationTracesLogsByRequestId, + InvocationTracesLogs, + DatadogTelemetry, + getEnhancedMetrics, +} from './datadog'; import { DEFAULT_DATADOG_INDEXING_WAIT_MS } from '../../config'; export interface FunctionConfig { @@ -26,7 +31,6 @@ async function invokeThread( const result = await invokeLambda(functionName, payload); results.push(result); - // Delay between 
requests (but not after the last one) if (delayBetweenRequestsMs > 0 && i < numInvocations - 1) { await sleep(delayBetweenRequestsMs); } @@ -36,16 +40,14 @@ async function invokeThread( } /** - * Invokes multiple Lambda functions using concurrent threads. - * Each function gets `concurrency` threads, each doing `invocations` sequential requests. + * Invokes multiple Lambda functions and collects all telemetry (traces, logs, metrics). + * Returns DatadogTelemetry per runtime: per-invocation traces/logs in `threads[thread][invocation]`, plus enhanced duration metrics for the function. * - * Returns results keyed by runtime, where each value is a list of lists - * (one per thread, containing telemetry in request order). - * - * Example: functions=[{node, fn1}, {python, fn2}], invocations=5, concurrency=2 - * node: Thread 0: 5 requests, Thread 1: 5 requests - * python: Thread 0: 5 requests, Thread 1: 5 requests - * Returns: { node: [[t0], [t1]], python: [[t0], [t1]] } + * Example: functions=[{node, fn1}, {python, fn2}], invocations=2, concurrency=1 + * Returns: { + * node: { threads: [[inv1, inv2]], metrics: { 'aws.lambda.enhanced.duration': [...], ... } }, + * python: { threads: [[inv1, inv2]], metrics: { 'aws.lambda.enhanced.duration': [...], ... } } + * } */ export async function invokeAndCollectTelemetry( functions: FunctionConfig[], @@ -54,8 +56,9 @@ export async function invokeAndCollectTelemetry( delayBetweenRequestsMs: number = 0, payload: any = {}, datadogIndexingWaitMs: number = DEFAULT_DATADOG_INDEXING_WAIT_MS, -): Promise> { - // Start all threads for all functions in parallel +): Promise> { + const invocationStartTime = Date.now(); + const allPromises: { runtime: string; functionName: string; promise: Promise }[] = []; for (const fn of functions) { @@ -68,7 +71,6 @@ export async function invokeAndCollectTelemetry( } } - // Wait for all invocations to complete const resolvedResults = await Promise.all( allPromises.map(async (p) => ({ runtime: p.runtime, @@ -77,22 +79,22 @@ export async function invokeAndCollectTelemetry( })) ); - // Wait for Datadog indexing await
sleep(datadogIndexingWaitMs); - // Fetch telemetry and organize by runtime - const telemetry: Record = {}; + const metricsEndTime = Date.now(); + + const telemetryByRuntime: Record = {}; for (const { runtime, functionName, results } of resolvedResults) { - if (!telemetry[runtime]) { - telemetry[runtime] = []; + if (!telemetryByRuntime[runtime]) { + telemetryByRuntime[runtime] = []; } - const threadTelemetry: DatadogTelemetry[] = []; + const threadTelemetry: InvocationTracesLogs[] = []; for (const inv of results) { try { - const data = await getDatadogTelemetryByRequestId(functionName, inv.requestId); + const data = await getInvocationTracesLogsByRequestId(functionName, inv.requestId); data.statusCode = inv.statusCode; threadTelemetry.push(data); } catch (err) { @@ -106,9 +108,32 @@ export async function invokeAndCollectTelemetry( } } - telemetry[runtime].push(threadTelemetry); + telemetryByRuntime[runtime].push(threadTelemetry); + } + + const runtimesWithFunctions = functions.map(fn => ({ + runtime: fn.runtime, + functionName: fn.functionName, + })); + + const metricsPromises = runtimesWithFunctions.map(async ({ runtime, functionName }) => { + const metrics = await getEnhancedMetrics(functionName, invocationStartTime, metricsEndTime); + return { runtime, metrics }; + }); + + const metricsResults = await Promise.all(metricsPromises); + + const result: Record = {}; + + for (const fn of functions) { + const threads = telemetryByRuntime[fn.runtime] || []; + const metricsResult = metricsResults.find(m => m.runtime === fn.runtime)!; + result[fn.runtime] = { + threads, + metrics: metricsResult.metrics, + }; } console.log(`Collected telemetry for ${functions.length} functions`); - return telemetry; + return result; }