Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ public void maybeSubscribeConfigPolling() {
} else {
subscribeConfigurationPoller();
}
} else {
} else if (!tracerConfig.isAwsServerless()) {
log.info("Remote config is disabled; AppSec will not be able to use it");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
import datadog.trace.config.inversion.ConfigHelper;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
Expand Down Expand Up @@ -89,13 +90,14 @@ static AgentScope enter(
return null;
}
String lambdaRequestId = awsContext.getAwsRequestId();
AgentSpanContext lambdaContext = AgentTracer.get().notifyExtensionStart(in, lambdaRequestId);
AgentSpanContext lambdaContext = AgentTracer.get().notifyLambdaStart(in, lambdaRequestId);
final AgentSpan span;
if (null == lambdaContext) {
span = startSpan(INVOCATION_SPAN_NAME);
} else {
span = startSpan(INVOCATION_SPAN_NAME, lambdaContext);
}
span.setSpanType(InternalSpanTypes.SERVERLESS);
span.setTag("request_id", lambdaRequestId);

final AgentScope scope = activateSpan(span);
Expand Down Expand Up @@ -123,6 +125,7 @@ static void exit(
}
String lambdaRequestId = awsContext.getAwsRequestId();

AgentTracer.get().notifyAppSecEnd(span);
span.finish();
AgentTracer.get().notifyExtensionEnd(span, result, null != throwable, lambdaRequestId);
} finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
import static datadog.trace.api.gateway.Events.EVENTS

import datadog.trace.agent.test.naming.VersionedNamingTestBase
import java.nio.charset.StandardCharsets
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.function.TriConsumer
import datadog.trace.api.function.TriFunction
import datadog.trace.api.gateway.Flow
import datadog.trace.api.gateway.RequestContext
import datadog.trace.api.gateway.RequestContextSlot
import datadog.trace.bootstrap.ActiveSubsystems
import datadog.trace.bootstrap.instrumentation.api.AgentTracer
import datadog.trace.bootstrap.instrumentation.api.URIDataAdapter
import com.amazonaws.services.lambda.runtime.Context
import java.nio.charset.StandardCharsets
import java.util.function.BiFunction
import java.util.function.Function
import java.util.function.Supplier

abstract class LambdaHandlerInstrumentationTest extends VersionedNamingTestBase {
def requestId = "test-request-id"
Expand All @@ -16,6 +30,53 @@ abstract class LambdaHandlerInstrumentationTest extends VersionedNamingTestBase
null
}

def ig
def appSecStarted = false
def capturedMethod = null
def capturedPath = null
def capturedHeaders = [:]
def capturedBody = null
def appSecEnded = false

def setup() {
ig = AgentTracer.get().getCallbackProvider(RequestContextSlot.APPSEC)
ActiveSubsystems.APPSEC_ACTIVE = true
appSecStarted = false
capturedMethod = null
capturedPath = null
capturedHeaders = [:]
capturedBody = null
appSecEnded = false
ig.registerCallback(EVENTS.requestStarted(), {
appSecStarted = true
new Flow.ResultFlow(new Object())
} as Supplier)
ig.registerCallback(EVENTS.requestMethodUriRaw(), { RequestContext ctx, String method, URIDataAdapter uri ->
capturedMethod = method
capturedPath = uri.path()
Flow.ResultFlow.empty()
} as TriFunction)
ig.registerCallback(EVENTS.requestHeader(), { RequestContext ctx, String name, String value ->
capturedHeaders[name] = value
} as TriConsumer)
ig.registerCallback(EVENTS.requestHeaderDone(), { RequestContext ctx ->
Flow.ResultFlow.empty()
} as Function)
ig.registerCallback(EVENTS.requestBodyProcessed(), { RequestContext ctx, Object body ->
capturedBody = body
Flow.ResultFlow.empty()
} as BiFunction)
ig.registerCallback(EVENTS.requestEnded(), { RequestContext ctx, Object spanInfo ->
appSecEnded = true
Flow.ResultFlow.empty()
} as BiFunction)
}

def cleanup() {
ig.reset()
ActiveSubsystems.APPSEC_ACTIVE = false
}

def "test lambda streaming handler"() {
when:
def input = new ByteArrayInputStream(StandardCharsets.UTF_8.encode("Hello").array())
Expand All @@ -30,6 +91,7 @@ abstract class LambdaHandlerInstrumentationTest extends VersionedNamingTestBase
trace(1) {
span {
operationName operation()
spanType DDSpanTypes.SERVERLESS
errored false
}
}
Expand All @@ -51,6 +113,7 @@ abstract class LambdaHandlerInstrumentationTest extends VersionedNamingTestBase
trace(1) {
span {
operationName operation()
spanType DDSpanTypes.SERVERLESS
errored true
tags {
tag "request_id", requestId
Expand All @@ -73,6 +136,114 @@ abstract class LambdaHandlerInstrumentationTest extends VersionedNamingTestBase
}
}
}

def "appsec callbacks are invoked for API Gateway v1 event"() {
given:
def eventJson = """{
"path": "/api/users/123",
"headers": {"content-type": "application/json", "x-forwarded-for": "203.0.113.1"},
"body": "{\\"key\\": \\"value\\"}",
"requestContext": {
"httpMethod": "GET",
"requestId": "req-abc",
"identity": {"sourceIp": "203.0.113.1"}
}
}"""

when:
def input = new ByteArrayInputStream(eventJson.getBytes(StandardCharsets.UTF_8))
def output = new ByteArrayOutputStream()
def ctx = Stub(Context) { getAwsRequestId() >> requestId }
new HandlerStreaming().handleRequest(input, output, ctx)

then:
appSecStarted
capturedMethod == "GET"
capturedPath == "/api/users/123"
capturedHeaders["content-type"] == "application/json"
capturedBody instanceof Map
appSecEnded
assertTraces(1) {
trace(1) {
span {
operationName operation()
spanType DDSpanTypes.SERVERLESS
errored false
}
}
}
}

def "appsec callbacks are invoked for API Gateway v2 HTTP event"() {
given:
def eventJson = """{
"version": "2.0",
"headers": {"content-type": "application/json", "accept": "application/json"},
"cookies": ["session=abc123"],
"body": "{\\"key\\": \\"value\\"}",
"requestContext": {
"http": {
"method": "POST",
"path": "/api/items",
"sourceIp": "198.51.100.1"
},
"domainName": "api.example.com"
}
}"""

when:
def input = new ByteArrayInputStream(eventJson.getBytes(StandardCharsets.UTF_8))
def output = new ByteArrayOutputStream()
def ctx = Stub(Context) { getAwsRequestId() >> requestId }
new HandlerStreaming().handleRequest(input, output, ctx)

then:
appSecStarted
capturedMethod == "POST"
capturedPath == "/api/items"
capturedHeaders["content-type"] == "application/json"
capturedHeaders["cookie"] == "session=abc123"
capturedBody instanceof Map
appSecEnded
assertTraces(1) {
trace(1) {
span {
operationName operation()
spanType DDSpanTypes.SERVERLESS
errored false
}
}
}
}

def "appsec callbacks are not invoked when appsec is disabled"() {
given:
ActiveSubsystems.APPSEC_ACTIVE = false

when:
def eventJson = """{
"path": "/api/test",
"requestContext": {"httpMethod": "GET", "requestId": "req-xyz"}
}"""
def input = new ByteArrayInputStream(eventJson.getBytes(StandardCharsets.UTF_8))
def output = new ByteArrayOutputStream()
def ctx = Stub(Context) { getAwsRequestId() >> requestId }
new HandlerStreaming().handleRequest(input, output, ctx)

then:
!appSecStarted
capturedMethod == null
!appSecEnded
assertTraces(1) {
trace(1) {
span {
operationName operation()
spanType DDSpanTypes.SERVERLESS
errored false
}
}
}
}
}


Expand Down
17 changes: 15 additions & 2 deletions dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import datadog.trace.core.taginterceptor.RuleFlags;
import datadog.trace.core.taginterceptor.TagInterceptor;
import datadog.trace.core.traceinterceptor.LatencyTraceInterceptor;
import datadog.trace.lambda.LambdaAppSecHandler;
import datadog.trace.lambda.LambdaHandler;
import datadog.trace.util.AgentTaskScheduler;
import java.io.IOException;
Expand Down Expand Up @@ -1195,8 +1196,15 @@ public void closeActive() {
}

@Override
public AgentSpanContext notifyExtensionStart(Object event, String lambdaRequestId) {
return LambdaHandler.notifyStartInvocation(event, lambdaRequestId);
public AgentSpanContext notifyLambdaStart(Object event, String lambdaRequestId) {
// Get context from AppSec
AgentSpanContext appSecContext = LambdaAppSecHandler.processRequestStart(event);

// Get context from extension
AgentSpanContext extensionContext = LambdaHandler.notifyStartInvocation(event, lambdaRequestId);

// Merge contexts
return LambdaAppSecHandler.mergeContexts(extensionContext, appSecContext);
}

@Override
Expand All @@ -1205,6 +1213,11 @@ public void notifyExtensionEnd(
LambdaHandler.notifyEndInvocation(span, result, isError, lambdaRequestId);
}

@Override
public void notifyAppSecEnd(AgentSpan span) {
LambdaAppSecHandler.processRequestEnd(span);
}

@Override
public AgentDataStreamsMonitoring getDataStreamsMonitoring() {
return dataStreamsMonitoring;
Expand Down
Loading
Loading