Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ We use [semantic versioning](http://semver.org/):
- PATCH version when you make backwards compatible bug fixes.

# Next version
- [breaking] _report-generator_: `RevisionInfo` is now a sealed class with polymorphic Jackson serialization (`@JsonTypeInfo` / `@JsonSubTypes`). The JSON representation now uses "COMMIT" and "REVISION" as type discriminator values instead of the previous `ERevisionType` enum names.
- [feature] _agent_: Added official support for Java 26 and experimental support for Java 27 (via JaCoCo 0.8.15)

# 36.5.2
Expand Down
1 change: 1 addition & 0 deletions agent/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ dependencies {

implementation(libs.jackson.databind)
implementation(libs.jetbrains.annotations)
implementation(libs.coroutines.core)

testImplementation(project(":tia-client"))
testImplementation(libs.retrofit.converter.jackson)
Expand Down
5 changes: 3 additions & 2 deletions agent/src/main/kotlin/com/teamscale/jacoco/agent/Agent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ class Agent(options: AgentOptions, instrumentation: Instrumentation?) : AgentBas
override fun initResourceConfig(): ResourceConfig? {
val resourceConfig = ResourceConfig()
resourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, true.toString())
AgentResource.setAgent(this)
return resourceConfig.register(AgentResource::class.java).register(GenericExceptionMapper::class.java)
return resourceConfig
.register(AgentResource(this))
.register(GenericExceptionMapper::class.java)
}

override fun prepareShutdown() {
Expand Down
16 changes: 2 additions & 14 deletions agent/src/main/kotlin/com/teamscale/jacoco/agent/AgentResource.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import javax.ws.rs.core.Response
* The resource of the Jersey + Jetty http server holding all the endpoints specific for the [Agent].
*/
@Path("/")
class AgentResource : ResourceBase() {
class AgentResource(private val agent: Agent) : ResourceBase(agent) {
/** Handles dumping a XML coverage report for coverage collected until now. */
@POST
@Path("/dump")
Expand All @@ -26,16 +26,4 @@ class AgentResource : ResourceBase() {
agent.controller.reset()
return Response.noContent().build()
}

companion object {
private lateinit var agent: Agent

/**
* Static setter to inject the [Agent] to the resource.
*/
fun setAgent(agent: Agent) {
Companion.agent = agent
agentBase = agent
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.lang.instrument.IllegalClassFormatException
import java.security.ProtectionDomain

/**
* A class file transformer which delegates to the JaCoCo [org.jacoco.agent.rt.internal_bac9136.CoverageTransformer] to do the actual instrumentation,
* A class file transformer which delegates to the JaCoCo [CoverageTransformer] to do the actual instrumentation,
* but treats instrumentation errors e.g. due to unsupported class file versions more lenient by only logging them, but
* not bailing out completely. Those unsupported classes will not be instrumented and will therefore not be contained in
* the collected coverage report.
Expand All @@ -19,8 +19,7 @@ class LenientCoverageTransformer(
options: AgentOptions,
private val logger: Logger
) : CoverageTransformer(
runtime,
options,
runtime, options,
// The coverage transformer only uses the logger to print an error when the instrumentation fails.
// We want to show our more specific error message instead, so we only log this for debugging at trace.
IExceptionLogger { logger.trace(it.message, it) }
Expand Down Expand Up @@ -48,4 +47,4 @@ class LenientCoverageTransformer(
private fun getRootCauseMessage(e: Throwable): String? =
e.cause?.let { getRootCauseMessage(it) } ?: e.message
}
}
}
15 changes: 3 additions & 12 deletions agent/src/main/kotlin/com/teamscale/jacoco/agent/ResourceBase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,10 @@
/**
* The resource of the Jersey + Jetty http server holding all the endpoints specific for the [AgentBase].
*/
abstract class ResourceBase {
abstract class ResourceBase(protected val agentBase: AgentBase) {

Check warning on line 16 in agent/src/main/kotlin/com/teamscale/jacoco/agent/ResourceBase.kt

View check run for this annotation

cqse.teamscale.io / Teamscale | Findings

agent/src/main/kotlin/com/teamscale/jacoco/agent/ResourceBase.kt#L16

Interface comment missing for `agentBase` https://cqse.teamscale.io/findings/details/teamscale-java-profiler?id=A390023C44F61B84F50F81185B82FCEA&t=ts%2F38628_kotlin_refactor%3AHEAD
/** The logger. */
protected val logger: Logger = LoggingUtils.getLogger(this)

companion object {
/**
* The agentBase inject via [AgentResource.setAgent] or
* [com.teamscale.jacoco.agent.testimpact.TestwiseCoverageResource.setAgent].
*/
@JvmStatic
protected lateinit var agentBase: AgentBase
}

@get:Path("/partition")
@get:GET
val partition: String
Expand Down Expand Up @@ -119,7 +110,7 @@
/** Returns revision information for the Teamscale upload. */
get() {
val server = agentBase.options.teamscaleServer
return RevisionInfo(server.commit, server.revision)
return RevisionInfo.of(server.commit, server.revision)
}

/**
Expand All @@ -131,4 +122,4 @@
logger.error(message)
throw BadRequestException(message)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.teamscale.report.util.ClasspathWildcardIncludeFilter
import java.io.File
import java.lang.instrument.ClassFileTransformer
import java.security.ProtectionDomain
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentSkipListSet

/**
Expand All @@ -17,6 +18,7 @@ class GitPropertiesLocatingTransformer(
) : ClassFileTransformer {
private val logger = getLogger(this)
private val seenJars = ConcurrentSkipListSet<String>()
private val classIncludedCache = ConcurrentHashMap<String, Boolean>()

override fun transform(
classLoader: ClassLoader?,
Expand All @@ -30,8 +32,14 @@ class GitPropertiesLocatingTransformer(
return null
}

if (className.isNullOrEmpty() || !locationIncludeFilter.isIncluded(className)) {
// only search in jar files of included classes
if (className.isNullOrEmpty()) {
return null
}

val isIncluded = classIncludedCache.computeIfAbsent(className) {
locationIncludeFilter.isIncluded(className)
}
if (!isIncluded) {
return null
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,132 +8,108 @@ import ch.qos.logback.core.status.ErrorStatus
import com.teamscale.client.ITeamscaleService
import com.teamscale.client.ProfilerLogEntry
import com.teamscale.jacoco.agent.options.AgentOptions
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.time.delay
import kotlinx.coroutines.time.withTimeoutOrNull
import java.net.ConnectException
import java.time.Duration
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.BiConsumer

/**
* Custom log appender that sends logs to Teamscale; it buffers log that were not sent due to connection issues and
* sends them later.
* Custom log appender that sends logs to Teamscale; it buffers logs that were not sent due to connection issues and
* sends them later. Uses a [Channel] as a lock-free producer-consumer buffer with a coroutine collector for batching.
*/
class LogToTeamscaleAppender : AppenderBase<ILoggingEvent>() {
/** The unique ID of the profiler */
/** The unique ID of the profiler. */
private var profilerId: String? = null

/**
* Buffer for unsent logs. We use a set here to allow for removing entries fast after sending them to Teamscale was
* successful.
*/
private val logBuffer = LinkedHashSet<ProfilerLogEntry>()

/** Scheduler for sending logs after the configured time interval */
private val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1) { r ->
// Make the thread a daemon so that it does not prevent the JVM from terminating.
val t = Executors.defaultThreadFactory().newThread(r)
t.setDaemon(true)
t
}
/** Lock-free channel for log entries. [Channel.trySend] is called from Logback threads, [Channel.receive] from the collector coroutine. */
private val logChannel = Channel<ProfilerLogEntry>(capacity = BUFFER_CAPACITY, onBufferOverflow = BufferOverflow.DROP_OLDEST)

/** Active log flushing threads */
private val activeLogFlushes: MutableSet<CompletableFuture<Void>> =
Collections.newSetFromMap(IdentityHashMap())
/** Structured concurrency scope backing the collector coroutine. */
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())

/** Is there a flush going on right now? */
private val isFlusing = AtomicBoolean(false)
/** The collector coroutine job, tracked so [stop] can wait for it to finish. */
private var collectorJob: Job? = null

override fun start() {
super.start()
scheduler.scheduleAtFixedRate({
synchronized(activeLogFlushes) {
activeLogFlushes.removeIf { it.isDone }
if (activeLogFlushes.isEmpty()) flush()
}
}, FLUSH_INTERVAL.toMillis(), FLUSH_INTERVAL.toMillis(), TimeUnit.MILLISECONDS)
collectorJob = scope.launch { collectAndSend() }
}

override fun append(eventObject: ILoggingEvent) {
synchronized(logBuffer) {
logBuffer.add(formatLog(eventObject))
if (logBuffer.size >= BATCH_SIZE) flush()
}
logChannel.trySend(formatLog(eventObject))
}

private fun formatLog(eventObject: ILoggingEvent): ProfilerLogEntry {
val trace = LoggingUtils.getStackTraceFromEvent(eventObject)
val timestamp = eventObject.timeStamp
val message = eventObject.formattedMessage
val severity = eventObject.level.toString()
return ProfilerLogEntry(timestamp, message, trace, severity)
}
private fun formatLog(eventObject: ILoggingEvent) = ProfilerLogEntry(
eventObject.timeStamp,
eventObject.formattedMessage,
LoggingUtils.getStackTraceFromEvent(eventObject),
eventObject.level.toString()
)

private fun flush() {
sendLogs()
}
/**
* Collector coroutine: drains [logChannel] into batches, sends them to Teamscale, and retries with backoff on
* failure.
*/
private suspend fun collectAndSend() {
val batch = mutableListOf<ProfilerLogEntry>()

while (currentCoroutineContext().isActive) {
if (batch.isEmpty()) {
val receiveResult = withTimeoutOrNull(FLUSH_INTERVAL) {
logChannel.receiveCatching()
} ?: continue
val entry = receiveResult.getOrNull() ?: break
batch.add(entry)
}

/** Send logs in a separate thread */
private fun sendLogs() {
synchronized(activeLogFlushes) {
activeLogFlushes.add(CompletableFuture.runAsync {
if (isFlusing.compareAndSet(false, true)) {
try {
val client = teamscaleClient ?: return@runAsync // There might be no connection configured.

val logsToSend: MutableList<ProfilerLogEntry>
synchronized(logBuffer) {
logsToSend = logBuffer.toMutableList()
}

val call = client.postProfilerLog(profilerId!!, logsToSend)
val response = call.execute()
check(response.isSuccessful) { "Failed to send log: HTTP error code : ${response.code()}" }

synchronized(logBuffer) {
// Removing the logs that have been sent after the fact.
// This handles problems with lost network connections.
logBuffer.removeAll(logsToSend.toSet())
}
} catch (e: Exception) {
// We do not report on exceptions here.
if (e !is ConnectException) {
addStatus(ErrorStatus("Sending logs to Teamscale failed: ${e.message}", this, e))
}
} finally {
isFlusing.set(false)
}
}
}.whenComplete(BiConsumer { _, _ ->
synchronized(activeLogFlushes) {
activeLogFlushes.removeIf { it.isDone }
while (batch.size < BATCH_SIZE) {
logChannel.tryReceive().getOrNull()?.let { batch.add(it) } ?: break
}

if (batch.isNotEmpty()) {
if (sendBatch(batch)) {
batch.clear()
} else {
delay(RETRY_BACKOFF)
}
}))
}
}
}

override fun stop() {
// Already flush here once to make sure that we do not miss too much.
flush()

scheduler.shutdown()
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow()
/**
* Posts the given [batch] to Teamscale.
*
* @return `true` if the batch was sent successfully, `false` if it should be retried.
*/
private fun sendBatch(batch: List<ProfilerLogEntry>): Boolean {
val client = teamscaleClient ?: return true
return try {
val response = client.postProfilerLog(profilerId!!, batch).execute()
check(response.isSuccessful) { "Failed to send log: HTTP error code : ${response.code()}" }
true
} catch (e: Exception) {
if (e !is ConnectException) {
addStatus(ErrorStatus("Sending logs to Teamscale failed: ${e.message}", this, e))
}
} catch (_: InterruptedException) {
scheduler.shutdownNow()
false
}
}

// A final flush after the scheduler has been shut down.
flush()

// Block until all flushes are done
CompletableFuture.allOf(*activeLogFlushes.toTypedArray()).join()

override fun stop() {
logChannel.close()
runBlocking { withTimeoutOrNull(SHUTDOWN_TIMEOUT) { collectorJob?.join() } }
scope.cancel()
super.stop()
}

Expand All @@ -146,13 +122,22 @@ class LogToTeamscaleAppender : AppenderBase<ILoggingEvent>() {
}

companion object {
/** Flush the logs after N elements are in the queue */
/** Maximum number of log entries held in memory. Older entries are dropped on overflow. */
private const val BUFFER_CAPACITY = 10_000

/** Flush the logs after N elements are in the queue. */
private const val BATCH_SIZE = 50

/** Flush the logs in the given time interval */
/** Flush the logs in the given time interval. */
private val FLUSH_INTERVAL: Duration = Duration.ofSeconds(3)

/** The service client for sending logs to Teamscale */
/** Backoff duration before retrying a failed batch. */
private val RETRY_BACKOFF: Duration = Duration.ofSeconds(5)

/** Maximum time to wait for the collector to drain during shutdown. */
private val SHUTDOWN_TIMEOUT: Duration = Duration.ofSeconds(3)

/** The service client for sending logs to Teamscale. */
private var teamscaleClient: ITeamscaleService? = null

/**
Expand All @@ -178,4 +163,4 @@ class LogToTeamscaleAppender : AppenderBase<ILoggingEvent>() {
return true
}
}
}
}
Loading
Loading