Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import datadog.environment.JavaVirtualMachine
import datadog.trace.agent.test.server.http.TestHttpServer.HandlerApi.RequestApi
import datadog.trace.api.config.GeneralConfig
import datadog.trace.test.util.Flaky
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.RandomAccessFile
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPInputStream
import spock.lang.AutoCleanup
Expand Down Expand Up @@ -53,6 +56,10 @@ abstract class LogInjectionSmokeTest extends AbstractSmokeTest {
@AutoCleanup
MockBackend mockBackend = new MockBackend()

// Captured for inclusion in failure diagnostics so we don't have to dig through Gradle reports.
@Shared
String launchCommand

// Fixture method named per the Spock convention for per-feature setup; its only action is to
// call reset() on the mock backend declared above.
// NOTE(review): presumably reset() discards whatever the backend recorded for the previous
// feature method — confirm against MockBackend.
def setup() {
mockBackend.reset()
}
Expand Down Expand Up @@ -114,7 +121,8 @@ abstract class LogInjectionSmokeTest extends AbstractSmokeTest {
command.addAll(additionalArguments())
command.addAll((String[]) ["-jar", loggingJar])

println "COMMANDS: " + command.join(" ")
launchCommand = command.join(" ")
println "COMMANDS: " + launchCommand
ProcessBuilder processBuilder = new ProcessBuilder(command)
processBuilder.directory(new File(buildDirectory))

Expand Down Expand Up @@ -349,8 +357,12 @@ abstract class LogInjectionSmokeTest extends AbstractSmokeTest {

/**
* Like {@link AbstractSmokeTest#waitForTraceCount} but checks process liveness on every poll
* iteration and dumps diagnostic state on failure, so CI failures produce actionable output
* instead of a bare "Condition not satisfied" after a 30s timeout.
* iteration and dumps comprehensive diagnostic state on failure. The previous iteration of
* this method narrowed the flake to "process alive, RC polling, captured stdout empty" but
* could not pinpoint where the JVM was wedged — see {@link #captureFullDiagnostic} for the
* extended state captured here, designed to discriminate between a class-load deadlock,
* stuck @Trace advice, broken trace writer, no-op tracer, dead OutputThreads writer, and a
* stalled stdout pipe in a single failure.
*/
int waitForTraceCountAlive(int count) {
try {
Expand All @@ -364,41 +376,247 @@ abstract class LogInjectionSmokeTest extends AbstractSmokeTest {
return
}
if (testedProcess != null && !testedProcess.isAlive()) {
def lastLines = tailProcessLog(20)
// RuntimeException (not AssertionError) so PollingConditions propagates
// immediately instead of retrying for the full timeout.
throw new RuntimeException(
"Process exited with code ${testedProcess.exitValue()} while waiting for ${count} traces " +
"(received ${traceCount.get()}, RC polls: ${rcClientMessages.size()}).\n" +
"Last process output:\n${lastLines}")
"Process exited while waiting for ${count} traces.\n" +
captureFullDiagnostic(count))
}
assert traceCount.get() >= count
}
} catch (AssertionError e) {
// The default error ("Condition not satisfied after 30s") is useless — enrich with diagnostic state
def alive = testedProcess?.isAlive()
def lastLines = tailProcessLog(30)
// The default error ("Condition not satisfied after 30s") is useless — enrich with diagnostic state.
throw new AssertionError(
"Timed out waiting for ${count} traces after ${defaultPoll.timeout}s. " +
"traceCount=${traceCount.get()}, process.alive=${alive}, " +
"RC polls received: ${rcClientMessages.size()}.\n" +
"Last process output:\n${lastLines}", e)
"Timed out waiting for ${count} traces after ${defaultPoll.timeout}s.\n" +
captureFullDiagnostic(count), e)
}
traceCount.get()
}

private String tailProcessLog(int lines) {
/**
 * Builds the multi-line diagnostic report that is appended to the failure messages raised while
 * waiting for traces.
 *
 * <p>When the tested process is alive and its pid is known, the method first sends it SIGQUIT
 * ({@code kill -3}) and sleeps 1.5s so a thread dump has a chance to reach the captured stdout
 * file before that file is tailed. The report then contains, in order: pid and liveness (plus
 * the exit value once the process has exited), RC poll and trace counters, any recorded
 * decoding failures, the launch command, the state of the {@code smoke-output} threads, the
 * tail of the captured stdout file, the tail of the application log file, and — only while the
 * process is still alive — a {@code jcmd Thread.print} dump.
 *
 * <p>The pid lookup, the SIGQUIT and the exit-value lookup are best-effort: any failure there is
 * swallowed so that it cannot mask the original test failure.
 *
 * @param targetCount number of traces the caller was waiting for; reported as {@code target}
 * @return the assembled report
 */
private String captureFullDiagnostic(int targetCount) {
  boolean alive = testedProcess != null && testedProcess.isAlive()
  long pid = -1L
  if (testedProcess != null) {
    // Process.pid() is Java 9+; the test runner JVM may be Java 8 (zulu8). Fall back gracefully.
    try {
      pid = testedProcess.pid()
    } catch (Throwable ignored) {
      // Java 8 — SIGQUIT/jcmd paths get skipped; we still capture file state + thread groups.
    }
  }

  // Trigger SIGQUIT for an in-process thread dump. The JVM writes the dump to its stderr,
  // which redirectErrorStream(true) merges into the captured stdout pipe. Sleep briefly to
  // let the writer thread drain it before we re-read the file. No-op (and harmless) on
  // Windows; in CI this runs on Linux containers.
  if (alive && pid > 0) {
    try {
      ["kill", "-3", "${pid}".toString()].execute().waitFor(2, SECONDS)
    } catch (Throwable ignored) {
      // best-effort
    }
    try {
      Thread.sleep(1500)
    } catch (InterruptedException ignored) {
      // Preserve the interrupt for the caller; the rest of the report is still produced.
      Thread.currentThread().interrupt()
    }
  }

  def sb = new StringBuilder()
  sb << "pid=${pid} alive=${alive}"
  if (testedProcess != null && !alive) {
    try {
      sb << " exitValue=${testedProcess.exitValue()}"
    } catch (Throwable ignored) {
      // exitValue may not be available
    }
  }
  sb << "\n"
  sb << "rcPolls=${rcClientMessages.size()} traceCount=${traceCount.get()} target=${targetCount}\n"
  if (rcClientDecodingFailure != null) {
    sb << "rcDecodingFailure=${rcClientDecodingFailure}\n"
  }
  if (traceDecodingFailure != null) {
    sb << "traceDecodingFailure=${traceDecodingFailure}\n"
  }
  sb << "launchCommand=${launchCommand}\n"
  sb << "outputThreads=${describeOutputThreadGroup()}\n"

  // Captured stdout of the tested process; read after the SIGQUIT above so the dump is included.
  def stdoutFile = new File(logFilePath)
  if (stdoutFile.exists()) {
    sb << "capturedStdout=${stdoutFile.absolutePath} size=${stdoutFile.length()} mtime=${new Date(stdoutFile.lastModified())}\n"
    sb << "--- captured stdout (last 60 lines, post-SIGQUIT) ---\n"
    sb << tailFile(stdoutFile, 60)
    sb << "\n"
  } else {
    sb << "capturedStdout=${logFilePath} (does not exist)\n"
  }

  if (outputLogFile != null && outputLogFile.exists()) {
    sb << "appLogFile=${outputLogFile.absolutePath} size=${outputLogFile.length()}\n"
    sb << "--- app log (last 30 lines) ---\n"
    sb << tailFile(outputLogFile, 30)
    sb << "\n"
  } else if (outputLogFile != null) {
    sb << "appLogFile=${outputLogFile.absolutePath} (does not exist)\n"
  }

  // Only attach to a live process: once it has exited the pid is stale, so jcmd would at best
  // report an error and at worst dump an unrelated JVM that happened to reuse the pid.
  if (alive && pid > 0) {
    sb << "--- jcmd Thread.print (fallback, works even if OutputThreads writer is dead) ---\n"
    sb << jcmdThreadPrint(pid)
    sb << "\n"
  }
  return sb.toString()
}

/**
 * Summarises every thread in this JVM whose thread group is named {@code smoke-output}
 * (the group OutputThreads is said to use — see OutputThreads.java).
 *
 * @return {@code "smoke-output threads=N [name/state/alive=..., ...]"}
 */
private String describeOutputThreadGroup() {
  // Climb to the top-most thread group so the enumeration below can see every thread.
  ThreadGroup topGroup = Thread.currentThread().threadGroup
  while (topGroup.parent != null) {
    topGroup = topGroup.parent
  }

  // Leave headroom over activeCount() for threads started between the two calls.
  Thread[] snapshot = new Thread[topGroup.activeCount() + 16]
  int filled = topGroup.enumerate(snapshot, true)

  // Single pass: keep only members of the "smoke-output" group and render each one.
  List<String> summaries = (0..<filled).findResults { int slot ->
    Thread candidate = snapshot[slot]
    if (candidate == null) {
      return null
    }
    ThreadGroup owner = candidate.threadGroup
    boolean isOutputThread = owner != null && owner.name == "smoke-output"
    isOutputThread ? "${candidate.name}/${candidate.state}/alive=${candidate.alive}".toString() : null
  }
  return "smoke-output threads=${summaries.size()} [${summaries.join(", ")}]"
}

// Cap on diagnostic chunks so a wedged JVM with a giant thread dump can't OOM the Gradle
// worker or produce an unreadable test report. ~32KB is enough for typical thread dumps.
private static final int DIAG_TAIL_BYTES = 32 * 1024

/**
 * Returns the last {@code lines} lines of {@code f}, reading at most {@link #DIAG_TAIL_BYTES}
 * bytes from the end of the file instead of loading the whole file into memory.
 *
 * <p>When the file is larger than the cap, the (probably partial) first line of the window is
 * dropped and the result is prefixed with a {@code ...(truncated to last N bytes)...} marker.
 * A line terminator at the very end of the file does not count as an extra, empty line, so a
 * newline-terminated file still yields the full {@code lines} lines.
 *
 * @param f file to read
 * @param lines maximum number of trailing lines to return; values {@code <= 0} yield no lines
 * @return the selected lines joined with {@code \n} (no trailing newline), or a
 *     {@code (failed to read ...)} placeholder if the file cannot be read
 */
private String tailFile(File f, int lines) {
  try (RandomAccessFile raf = new RandomAccessFile(f, "r")) {
    long len = raf.length()
    long start = Math.max(0L, len - DIAG_TAIL_BYTES)
    raf.seek(start)
    byte[] buf = new byte[(int) (len - start)]
    raf.readFully(buf)
    String chunk = new String(buf, StandardCharsets.UTF_8)
    // If we started mid-line, drop the partial first line so the tail reads cleanly.
    if (start > 0) {
      int nl = chunk.indexOf('\n')
      if (nl >= 0) {
        chunk = chunk.substring(nl + 1)
      }
    }
    String[] all = chunk.split("\\R", -1)
    // split(..., -1) keeps the empty string that follows a final line terminator; it is not a
    // real line, so exclude it rather than let it use up one of the requested lines.
    int end = all.length
    if (end > 0 && all[end - 1].isEmpty()) {
      end--
    }
    int from = Math.max(0, end - lines)
    def out = new StringBuilder()
    if (start > 0) {
      out << "...(truncated to last ${len - start} bytes)...\n"
    }
    for (int i = from; i < end; i++) {
      if (i > from) {
        out << "\n"
      }
      out << all[i]
    }
    return out.toString()
  } catch (Throwable e) {
    // Diagnostics must never mask the original test failure, so report instead of rethrowing.
    return "(failed to read ${f.name}: ${e.message})"
  }
}

/**
 * Runs {@code jcmd <pid> Thread.print} and returns its combined stdout/stderr.
 *
 * <p>The output is drained while jcmd runs, because waiting for exit before reading could
 * deadlock once the dump exceeds the OS pipe buffer. At most about
 * {@code 2 * DIAG_TAIL_BYTES} bytes are buffered and the returned text is cut to its first
 * {@code DIAG_TAIL_BYTES} characters. jcmd is killed if it is still running without pending
 * output after 5 seconds.
 *
 * @param pid process id of the JVM to dump
 * @return the (possibly truncated) dump, or a parenthesised placeholder when jcmd cannot be
 *     located, produces nothing before the timeout, or fails
 */
private String jcmdThreadPrint(long pid) {
  String jcmdPath = resolveJcmdPath()
  if (jcmdPath == null) {
    return "(jcmd not found on java.home; skipping)"
  }
  Process proc = null
  try {
    // Merge stderr into stdout and drain incrementally — for a wedged JVM the dump can be
    // larger than the OS pipe buffer (~64KB on Linux), and waiting for exit before reading
    // would deadlock both jcmd and us.
    ProcessBuilder pb = new ProcessBuilder(jcmdPath, Long.toString(pid), "Thread.print")
    pb.redirectErrorStream(true)
    proc = pb.start()
    ByteArrayOutputStream baos = new ByteArrayOutputStream()
    byte[] buf = new byte[8192]
    // Counts every byte jcmd produced, including those dropped once the buffer cap is hit, so
    // the truncation notice reports the real dump size rather than the capped buffer size.
    long totalBytes = 0L
    long deadline = System.nanoTime() + SECONDS.toNanos(5)
    InputStream stream = proc.getInputStream()
    while (true) {
      if (stream.available() > 0) {
        int n = stream.read(buf)
        if (n < 0) {
          break
        }
        totalBytes += n
        // Bound the in-memory buffer so a runaway dump can't OOM us.
        if (baos.size() < DIAG_TAIL_BYTES * 2) {
          baos.write(buf, 0, n)
        }
      } else if (!proc.isAlive()) {
        // Drain any remaining bytes after exit.
        int n
        while ((n = stream.read(buf)) >= 0) {
          totalBytes += n
          if (baos.size() < DIAG_TAIL_BYTES * 2) {
            baos.write(buf, 0, n)
          }
        }
        break
      } else if (System.nanoTime() - deadline > 0) {
        // Subtraction keeps the comparison correct even if nanoTime wraps around.
        proc.destroyForcibly()
        proc.waitFor(1, SECONDS)
        if (baos.size() == 0) {
          return "(jcmd timed out with no output)"
        }
        break
      } else {
        Thread.sleep(50)
      }
    }
    String out = new String(baos.toByteArray(), StandardCharsets.UTF_8)
    if (out.length() > DIAG_TAIL_BYTES) {
      // Keep the head — application/agent threads tend to be earlier in the dump than
      // generic VM/GC/JIT threads, and the head is what reveals where main is wedged.
      return out.substring(0, DIAG_TAIL_BYTES) + "\n...(truncated; full dump was ${totalBytes} bytes)..."
    }
    return out
  } catch (InterruptedException e) {
    // Restore the flag so the caller still observes the interruption.
    Thread.currentThread().interrupt()
    return "(jcmd unavailable: ${e.message})"
  } catch (Throwable e) {
    return "(jcmd unavailable: ${e.message})"
  } finally {
    // Never leave a jcmd child behind, whichever path exits the method.
    if (proc != null && proc.isAlive()) {
      proc.destroyForcibly()
    }
  }
}

/**
 * Locates the {@code jcmd} executable belonging to the JVM running this test.
 *
 * <p>Checks {@code <java.home>/bin/jcmd} first, then {@code <java.home>/../bin/jcmd}, and
 * finally falls back to the bare command name so that it is looked up on {@code PATH}.
 *
 * @return absolute path of an executable jcmd, or {@code "jcmd"} when neither candidate exists
 */
private String resolveJcmdPath() {
  // On Java 8, java.home points to the JRE subdirectory, so jcmd is at ../bin/jcmd; on
  // Java 9+ it's at bin/jcmd. Try both, fall back to PATH.
  String javaHome = System.getProperty("java.home")
  if (javaHome != null) {
    File home = new File(javaHome)
    File direct = new File(home, "bin/jcmd")
    if (direct.canExecute()) {
      return direct.absolutePath
    }
    // parentFile is null when java.home has no parent; skip the sibling probe then instead of
    // testing a relative "bin/jcmd" that would resolve against the working directory.
    File jdkRoot = home.parentFile
    if (jdkRoot != null) {
      File jdkSibling = new File(jdkRoot, "bin/jcmd")
      if (jdkSibling.canExecute()) {
        return jdkSibling.absolutePath
      }
    }
  }
  return "jcmd"
}

def parseTraceFromStdOut( String line ) {
Expand Down
Loading