Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.communication.http.HttpRetryPolicy;
import datadog.communication.http.OkHttpUtils;
import datadog.trace.api.Config;
import datadog.trace.api.intake.Intake;
import datadog.trace.util.throwable.FatalAgentMisconfigurationError;
import javax.annotation.Nullable;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,10 +34,13 @@ public BackendApiFactory(Config config, SharedCommunicationObjects sharedCommuni
"Agentless mode is enabled and api key is not set. Please set application key");
}
String traceId = config.getIdGenerationStrategy().generateTraceId().toString();
OkHttpClient httpClient =
OkHttpUtils.buildHttpClient(
agentlessUrl, config.getCiVisibilityBackendApiTimeoutMillis());
return new IntakeApi(agentlessUrl, apiKey, traceId, retryPolicyFactory, httpClient, true);
return new IntakeApi(
agentlessUrl,
apiKey,
traceId,
retryPolicyFactory,
sharedCommunicationObjects.getIntakeHttpClient(),
true);
}

DDAgentFeaturesDiscovery featuresDiscovery =
Expand All @@ -55,7 +56,7 @@ public BackendApiFactory(Config config, SharedCommunicationObjects sharedCommuni
evpProxyUrl,
subdomain,
retryPolicyFactory,
sharedCommunicationObjects.okHttpClient,
sharedCommunicationObjects.agentHttpClient,
true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,20 @@ public class SharedCommunicationObjects {
private final List<Runnable> pausedComponents = new ArrayList<>();
private volatile boolean paused;

public OkHttpClient okHttpClient;
/**
* HTTP client for making requests to Datadog agent. Depending on configuration, this client may
* use regular HTTP, UDS or named pipe.
*/
public OkHttpClient agentHttpClient;

/**
* HTTP client for making requests directly to Datadog backend. Unlike {@link #agentHttpClient},
* this client is not configured to use UDS or named pipe.
*/
private volatile OkHttpClient intakeHttpClient;

public long httpClientTimeout;
public boolean forceClearTextHttpForIntakeClient;
public HttpUrl agentUrl;
public Monitoring monitoring;
private volatile DDAgentFeaturesDiscovery featuresDiscovery;
Expand All @@ -45,18 +58,27 @@ public void createRemaining(Config config) {
if (monitoring == null) {
monitoring = Monitoring.DISABLED;
}

httpClientTimeout =
config.isCiVisibilityEnabled()
? config.getCiVisibilityBackendApiTimeoutMillis()
: TimeUnit.SECONDS.toMillis(config.getAgentTimeout());

forceClearTextHttpForIntakeClient = config.isForceClearTextHttpForIntakeClient();

if (agentUrl == null) {
agentUrl = parseAgentUrl(config);
if (agentUrl == null) {
throw new IllegalArgumentException("Bad agent URL: " + config.getAgentUrl());
}
}
if (okHttpClient == null) {

if (agentHttpClient == null) {
String unixDomainSocket = SocketUtils.discoverApmSocket(config);
String namedPipe = config.getAgentNamedPipe();
okHttpClient =
agentHttpClient =
OkHttpUtils.buildHttpClient(
agentUrl, unixDomainSocket, namedPipe, getHttpClientTimeout(config));
OkHttpUtils.isPlainHttp(agentUrl), unixDomainSocket, namedPipe, httpClientTimeout);
}
}

Expand Down Expand Up @@ -103,14 +125,6 @@ private static HttpUrl parseAgentUrl(Config config) {
return HttpUrl.parse(agentUrl);
}

private static long getHttpClientTimeout(Config config) {
if (!config.isCiVisibilityEnabled()) {
return TimeUnit.SECONDS.toMillis(config.getAgentTimeout());
} else {
return config.getCiVisibilityBackendApiTimeoutMillis();
}
}

public ConfigurationPoller configurationPoller(Config config) {
if (configurationPoller == null && config.isRemoteConfigEnabled()) {
configurationPoller = createPoller(config);
Expand All @@ -130,7 +144,7 @@ private ConfigurationPoller createPoller(Config config) {
configUrlSupplier = new RetryConfigUrlSupplier(this, config);
}
return new DefaultConfigurationPoller(
config, TRACER_VERSION, containerId, entityId, configUrlSupplier, okHttpClient);
config, TRACER_VERSION, containerId, entityId, configUrlSupplier, agentHttpClient);
}

// for testing
Expand All @@ -146,7 +160,7 @@ public DDAgentFeaturesDiscovery featuresDiscovery(Config config) {
createRemaining(config);
ret =
new DDAgentFeaturesDiscovery(
okHttpClient,
agentHttpClient,
monitoring,
agentUrl,
config.isTraceAgentV05Enabled(),
Expand Down Expand Up @@ -209,4 +223,20 @@ public String get() {
return this.configUrl;
}
}

public OkHttpClient getIntakeHttpClient() {
OkHttpClient client = this.intakeHttpClient;
if (client != null) {
return client;
}

synchronized (this) {
if (this.intakeHttpClient == null) {
this.intakeHttpClient =
OkHttpUtils.buildHttpClient(
forceClearTextHttpForIntakeClient, null, null, httpClientTimeout);
}
return this.intakeHttpClient;
}
}
Comment thread
AlexeyKuznetsov-DD marked this conversation as resolved.
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,19 @@ public final class OkHttpUtils {
SystemProperties.getOrDefault("java.vm.vendor", "unknown");

public static OkHttpClient buildHttpClient(final HttpUrl url, final long timeoutMillis) {
return buildHttpClient(url, null, null, timeoutMillis);
return buildHttpClient(isPlainHttp(url), null, null, timeoutMillis);
}

public static OkHttpClient buildHttpClient(
final HttpUrl url,
final boolean isHttp,
final String unixDomainSocketPath,
final String namedPipe,
final long timeoutMillis) {
return buildHttpClient(
unixDomainSocketPath,
namedPipe,
null,
url,
isHttp,
null,
null,
null,
Expand All @@ -95,7 +95,7 @@ public static OkHttpClient buildHttpClient(
discoverApmSocket(config),
config.getAgentNamedPipe(),
dispatcher,
url,
isPlainHttp(url),
retryOnConnectionFailure,
maxRunningRequests,
proxyHost,
Expand All @@ -111,7 +111,7 @@ private static OkHttpClient buildHttpClient(
final String unixDomainSocketPath,
final String namedPipe,
final Dispatcher dispatcher,
final HttpUrl url,
final boolean isHttp,
final Boolean retryOnConnectionFailure,
final Integer maxRunningRequests,
final String proxyHost,
Expand Down Expand Up @@ -151,7 +151,6 @@ private static OkHttpClient buildHttpClient(
log.debug("Using NamedPipe as http transport");
}

boolean isHttp = url != null && "http".equals(url.scheme());
if (isHttp) {
// force clear text when using http to avoid failures for JVMs without TLS
builder.connectionSpecs(Collections.singletonList(ConnectionSpec.CLEARTEXT));
Expand Down Expand Up @@ -393,4 +392,8 @@ private static void closeQuietly(Response response) {
// ignore
}
}

public static boolean isPlainHttp(final HttpUrl url) {
return url != null && "http".equalsIgnoreCase(url.scheme());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class SharedCommunicationsObjectsSpecification extends DDSpecification {
1 * config.agentTimeout >> 1
1 * config.agentUnixDomainSocket >> null
sco.agentUrl as String == 'http://example.com/'
sco.okHttpClient != null
sco.agentHttpClient != null
sco.monitoring.is(Monitoring.DISABLED)

when:
Expand Down Expand Up @@ -78,17 +78,20 @@ class SharedCommunicationsObjectsSpecification extends DDSpecification {
DDAgentFeaturesDiscovery agentFeaturesDiscovery = Mock()

sco.agentUrl = url
sco.okHttpClient = okHttpClient
sco.agentHttpClient = okHttpClient
sco.monitoring = monitoring
sco.featuresDiscovery = agentFeaturesDiscovery

when:
sco.createRemaining(config)

then:
1 * config.isCiVisibilityEnabled()
1 * config.getAgentTimeout()
1 * config.isForceClearTextHttpForIntakeClient()
0 * _
sco.agentUrl.is(url)
sco.okHttpClient.is(okHttpClient)
sco.agentHttpClient.is(okHttpClient)
sco.monitoring.is(monitoring)
sco.featuresDiscovery.is(agentFeaturesDiscovery)
}
Expand Down Expand Up @@ -124,4 +127,12 @@ class SharedCommunicationsObjectsSpecification extends DDSpecification {
1 * config.agentUnixDomainSocket >> null
sco.agentUrl as String == 'http://[2600:1f18:19c0:bd07:d55b::17]:8126/'
}

void 'creates intake http client'() {
when:
def client = sco.getIntakeHttpClient()

then:
client != null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ private static CiEnvironment buildCiEnvironment(Config config, SharedCommunicati
CiEnvironment remoteEnvironment =
new CiEnvironmentImpl(
getRemoteEnvironment(
remoteEnvVarsProviderUrl, remoteEnvVarsProviderKey, sco.okHttpClient));
remoteEnvVarsProviderUrl, remoteEnvVarsProviderKey, sco.agentHttpClient));
return new CompositeCiEnvironment(remoteEnvironment, localEnvironment);
} else {
return localEnvironment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ public void setUp() throws URISyntaxException {
ss = gw.getSubscriptionService(RequestContextSlot.APPSEC);
SharedCommunicationObjects sharedCommunicationObjects = new SharedCommunicationObjects();
sharedCommunicationObjects.monitoring = Monitoring.DISABLED;
sharedCommunicationObjects.okHttpClient = new StubOkHttpClient();
sharedCommunicationObjects.agentHttpClient = new StubOkHttpClient();
sharedCommunicationObjects.setFeaturesDiscovery(
new StubDDAgentFeaturesDiscovery(sharedCommunicationObjects.okHttpClient));
new StubDDAgentFeaturesDiscovery(sharedCommunicationObjects.agentHttpClient));

AppSecSystem.start(ss, sharedCommunicationObjects);
uri = new URIDefaultDataAdapter(new URI("http://localhost:8080/test"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class AppSecSystemSpecification extends DDSpecification {
poller
}
}
sco.okHttpClient = Stub(OkHttpClient)
sco.agentHttpClient = Stub(OkHttpClient)
sco.monitoring = Mock(Monitoring)
sco.featuresDiscovery = Stub(DDAgentFeaturesDiscovery)
sco
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ abstract class InstrumentationSpecification extends DDSpecification implements A

// emit traces to the APM Test-Agent for Cross-Tracer Testing Trace Checks
HttpUrl agentUrl = HttpUrl.get("http://" + agentHost + ":" + DEFAULT_TRACE_AGENT_PORT)
OkHttpClient client = buildHttpClient(agentUrl, null, null, TimeUnit.SECONDS.toMillis(DEFAULT_AGENT_TIMEOUT))
OkHttpClient client = buildHttpClient(true, null, null, TimeUnit.SECONDS.toMillis(DEFAULT_AGENT_TIMEOUT))
DDAgentFeaturesDiscovery featureDiscovery = new DDAgentFeaturesDiscovery(client, Monitoring.DISABLED, agentUrl, Config.get().isTraceAgentV05Enabled(), Config.get().isTracerMetricsEnabled())
TEST_AGENT_API = new DDAgentApi(client, agentUrl, featureDiscovery, Monitoring.DISABLED, Config.get().isTracerMetricsEnabled())
TEST_AGENT_WRITER = DDAgentWriter.builder().agentApi(TEST_AGENT_API).build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public final class TracerConfig {
public static final String AGENT_UNIX_DOMAIN_SOCKET = "trace.agent.unix.domain.socket";
public static final String AGENT_NAMED_PIPE = "trace.pipe.name";
public static final String AGENT_TIMEOUT = "trace.agent.timeout";
public static final String FORCE_CLEAR_TEXT_HTTP_FOR_INTAKE_CLIENT =
"force.clear.text.http.for.intake.client";
public static final String PROXY_NO_PROXY = "proxy.no_proxy";
public static final String TRACE_AGENT_PATH = "trace.agent.path";
public static final String TRACE_AGENT_ARGS = "trace.agent.args";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public ConflatingMetricsAggregator(
sharedCommunicationObjects.featuresDiscovery(config),
healthMetrics,
new OkHttpSink(
sharedCommunicationObjects.okHttpClient,
sharedCommunicationObjects.agentHttpClient,
sharedCommunicationObjects.agentUrl.toString(),
V6_METRICS_ENDPOINT,
config.isTracerMetricsBufferingEnabled(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public DDAgentWriter build() {
final HttpUrl agentUrl = HttpUrl.get("http://" + agentHost + ":" + traceAgentPort);
final OkHttpClient client =
null == featureDiscovery || null == agentApi
? buildHttpClient(agentUrl, unixDomainSocket, namedPipe, timeoutMillis)
? buildHttpClient(true, unixDomainSocket, namedPipe, timeoutMillis)
: null;
if (null == featureDiscovery) {
featureDiscovery =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public static Writer createWriter(

DDAgentApi ddAgentApi =
new DDAgentApi(
commObjects.okHttpClient,
commObjects.agentHttpClient,
commObjects.agentUrl,
featuresDiscovery,
commObjects.monitoring,
Expand Down Expand Up @@ -202,7 +202,7 @@ private static RemoteApi createDDIntakeRemoteApi(

if (useProxyApi) {
return DDEvpProxyApi.builder()
.httpClient(commObjects.okHttpClient)
.httpClient(commObjects.agentHttpClient)
.agentUrl(commObjects.agentUrl)
.evpProxyEndpoint(featuresDiscovery.getEvpProxyEndpoint())
.trackType(trackType)
Expand All @@ -224,6 +224,7 @@ private static RemoteApi createDDIntakeRemoteApi(
}
return DDIntakeApi.builder()
.hostUrl(hostUrl)
.httpClient(commObjects.getIntakeHttpClient())
.apiKey(config.getApiKey())
.trackType(trackType)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public DefaultDataStreamsMonitoring(
Supplier<TraceConfig> traceConfigSupplier) {
this(
new OkHttpSink(
sharedCommunicationObjects.okHttpClient,
sharedCommunicationObjects.agentHttpClient,
sharedCommunicationObjects.agentUrl.toString(),
V01_DATASTREAMS_ENDPOINT,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class TracerConnectionReliabilityTest extends DDSpecification {
properties.put("trace.agent.port", Integer.toString(agentContainerPort))
def sharedCommunicationObjects = new SharedCommunicationObjects()
sharedCommunicationObjects.agentUrl = HttpUrl.get("http://localhost:" + agentContainerPort)
sharedCommunicationObjects.okHttpClient = client
sharedCommunicationObjects.agentHttpClient = client
def fixedFeaturesDiscovery = new FixedTraceEndpointFeaturesDiscovery(sharedCommunicationObjects)
sharedCommunicationObjects.setFeaturesDiscovery(fixedFeaturesDiscovery)

Expand Down Expand Up @@ -147,7 +147,7 @@ class TracerConnectionReliabilityTest extends DDSpecification {

class FixedTraceEndpointFeaturesDiscovery extends DDAgentFeaturesDiscovery {
FixedTraceEndpointFeaturesDiscovery(SharedCommunicationObjects objects) {
super(objects.okHttpClient, Monitoring.DISABLED, objects.agentUrl, false, false)
super(objects.agentHttpClient, Monitoring.DISABLED, objects.agentUrl, false, false)
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class WriterFactoryTest extends DDSpecification {

// Create SharedCommunicationObjects with mocked HTTP client
def sharedComm = new SharedCommunicationObjects()
sharedComm.okHttpClient = mockHttpClient
sharedComm.agentHttpClient = mockHttpClient
sharedComm.agentUrl = HttpUrl.parse(config.agentUrl)
sharedComm.createRemaining(config)

Expand Down Expand Up @@ -127,7 +127,7 @@ class WriterFactoryTest extends DDSpecification {

// Create SharedCommunicationObjects with mocked HTTP client
def sharedComm = new SharedCommunicationObjects()
sharedComm.okHttpClient = mockHttpClient
sharedComm.agentHttpClient = mockHttpClient
sharedComm.agentUrl = HttpUrl.parse(config.agentUrl)
sharedComm.createRemaining(config)

Expand Down
Loading