@@ -8,132 +8,108 @@ import ch.qos.logback.core.status.ErrorStatus
88import com.teamscale.client.ITeamscaleService
99import com.teamscale.client.ProfilerLogEntry
1010import com.teamscale.jacoco.agent.options.AgentOptions
11+ import kotlinx.coroutines.CoroutineScope
12+ import kotlinx.coroutines.Dispatchers
13+ import kotlinx.coroutines.Job
14+ import kotlinx.coroutines.SupervisorJob
15+ import kotlinx.coroutines.cancel
16+ import kotlinx.coroutines.channels.BufferOverflow
17+ import kotlinx.coroutines.channels.Channel
18+ import kotlinx.coroutines.currentCoroutineContext
19+ import kotlinx.coroutines.isActive
20+ import kotlinx.coroutines.launch
21+ import kotlinx.coroutines.runBlocking
22+ import kotlinx.coroutines.time.delay
23+ import kotlinx.coroutines.time.withTimeoutOrNull
1124import java.net.ConnectException
1225import java.time.Duration
13- import java.util.*
14- import java.util.concurrent.CompletableFuture
15- import java.util.concurrent.Executors
16- import java.util.concurrent.ScheduledExecutorService
17- import java.util.concurrent.TimeUnit
18- import java.util.concurrent.atomic.AtomicBoolean
19- import java.util.function.BiConsumer
2026
2127/* *
22- * Custom log appender that sends logs to Teamscale; it buffers log that were not sent due to connection issues and
23- * sends them later.
28+ * Custom log appender that sends logs to Teamscale; it buffers logs that were not sent due to connection issues and
29+ * sends them later. Uses a [Channel] as a lock-free producer-consumer buffer with a coroutine collector for batching.
2430 */
2531class LogToTeamscaleAppender : AppenderBase <ILoggingEvent >() {
26- /* * The unique ID of the profiler */
32+ /* * The unique ID of the profiler. */
2733 private var profilerId: String? = null
2834
29- /* *
30- * Buffer for unsent logs. We use a set here to allow for removing entries fast after sending them to Teamscale was
31- * successful.
32- */
33- private val logBuffer = LinkedHashSet <ProfilerLogEntry >()
34-
35- /* * Scheduler for sending logs after the configured time interval */
36- private val scheduler: ScheduledExecutorService = Executors .newScheduledThreadPool(1 ) { r ->
37- // Make the thread a daemon so that it does not prevent the JVM from terminating.
38- val t = Executors .defaultThreadFactory().newThread(r)
39- t.setDaemon(true )
40- t
41- }
35+ /* * Lock-free channel for log entries. [Channel.trySend] is called from Logback threads, [Channel.receive] from the collector coroutine. */
36+ private val logChannel = Channel <ProfilerLogEntry >(capacity = BUFFER_CAPACITY , onBufferOverflow = BufferOverflow .DROP_OLDEST )
4237
43- /* * Active log flushing threads */
44- private val activeLogFlushes: MutableSet <CompletableFuture <Void >> =
45- Collections .newSetFromMap(IdentityHashMap ())
38+ /* * Structured concurrency scope backing the collector coroutine. */
39+ private val scope = CoroutineScope (Dispatchers .IO + SupervisorJob ())
4640
47- /* * Is there a flush going on right now? */
48- private val isFlusing = AtomicBoolean ( false )
41+ /* * The collector coroutine job, tracked so [stop] can wait for it to finish. */
42+ private var collectorJob : Job ? = null
4943
5044 override fun start () {
5145 super .start()
52- scheduler.scheduleAtFixedRate({
53- synchronized(activeLogFlushes) {
54- activeLogFlushes.removeIf { it.isDone }
55- if (activeLogFlushes.isEmpty()) flush()
56- }
57- }, FLUSH_INTERVAL .toMillis(), FLUSH_INTERVAL .toMillis(), TimeUnit .MILLISECONDS )
46+ collectorJob = scope.launch { collectAndSend() }
5847 }
5948
6049 override fun append (eventObject : ILoggingEvent ) {
61- synchronized(logBuffer) {
62- logBuffer.add(formatLog(eventObject))
63- if (logBuffer.size >= BATCH_SIZE ) flush()
64- }
50+ logChannel.trySend(formatLog(eventObject))
6551 }
6652
67- private fun formatLog (eventObject : ILoggingEvent ): ProfilerLogEntry {
68- val trace = LoggingUtils .getStackTraceFromEvent(eventObject)
69- val timestamp = eventObject.timeStamp
70- val message = eventObject.formattedMessage
71- val severity = eventObject.level.toString()
72- return ProfilerLogEntry (timestamp, message, trace, severity)
73- }
53+ private fun formatLog (eventObject : ILoggingEvent ) = ProfilerLogEntry (
54+ eventObject.timeStamp,
55+ eventObject.formattedMessage,
56+ LoggingUtils .getStackTraceFromEvent(eventObject),
57+ eventObject.level.toString()
58+ )
7459
75- private fun flush () {
76- sendLogs()
77- }
60+ /* *
61+ * Collector coroutine: drains [logChannel] into batches, sends them to Teamscale, and retries with backoff on
62+ * failure.
63+ */
64+ private suspend fun collectAndSend () {
65+ val batch = mutableListOf<ProfilerLogEntry >()
66+
67+ while (currentCoroutineContext().isActive) {
68+ if (batch.isEmpty()) {
69+ val receiveResult = withTimeoutOrNull(FLUSH_INTERVAL ) {
70+ logChannel.receiveCatching()
71+ } ? : continue
72+ val entry = receiveResult.getOrNull() ? : break
73+ batch.add(entry)
74+ }
7875
79- /* * Send logs in a separate thread */
80- private fun sendLogs () {
81- synchronized(activeLogFlushes) {
82- activeLogFlushes.add(CompletableFuture .runAsync {
83- if (isFlusing.compareAndSet(false , true )) {
84- try {
85- val client = teamscaleClient ? : return @runAsync // There might be no connection configured.
86-
87- val logsToSend: MutableList <ProfilerLogEntry >
88- synchronized(logBuffer) {
89- logsToSend = logBuffer.toMutableList()
90- }
91-
92- val call = client.postProfilerLog(profilerId!! , logsToSend)
93- val response = call.execute()
94- check(response.isSuccessful) { " Failed to send log: HTTP error code : ${response.code()} " }
95-
96- synchronized(logBuffer) {
97- // Removing the logs that have been sent after the fact.
98- // This handles problems with lost network connections.
99- logBuffer.removeAll(logsToSend.toSet())
100- }
101- } catch (e: Exception ) {
102- // We do not report on exceptions here.
103- if (e !is ConnectException ) {
104- addStatus(ErrorStatus (" Sending logs to Teamscale failed: ${e.message} " , this , e))
105- }
106- } finally {
107- isFlusing.set(false )
108- }
109- }
110- }.whenComplete(BiConsumer { _, _ ->
111- synchronized(activeLogFlushes) {
112- activeLogFlushes.removeIf { it.isDone }
76+ while (batch.size < BATCH_SIZE ) {
77+ logChannel.tryReceive().getOrNull()?.let { batch.add(it) } ? : break
78+ }
79+
80+ if (batch.isNotEmpty()) {
81+ if (sendBatch(batch)) {
82+ batch.clear()
83+ } else {
84+ delay(RETRY_BACKOFF )
11385 }
114- }))
86+ }
11587 }
11688 }
11789
118- override fun stop () {
119- // Already flush here once to make sure that we do not miss too much.
120- flush()
121-
122- scheduler.shutdown()
123- try {
124- if (! scheduler.awaitTermination(5 , TimeUnit .SECONDS )) {
125- scheduler.shutdownNow()
90+ /* *
91+ * Posts the given [batch] to Teamscale.
92+ *
93+ * @return `true` if the batch was sent successfully, `false` if it should be retried.
94+ */
95+ private fun sendBatch (batch : List <ProfilerLogEntry >): Boolean {
96+ val client = teamscaleClient ? : return true
97+ return try {
98+ val response = client.postProfilerLog(profilerId!! , batch).execute()
99+ check(response.isSuccessful) { " Failed to send log: HTTP error code : ${response.code()} " }
100+ true
101+ } catch (e: Exception ) {
102+ if (e !is ConnectException ) {
103+ addStatus(ErrorStatus (" Sending logs to Teamscale failed: ${e.message} " , this , e))
126104 }
127- } catch (_: InterruptedException ) {
128- scheduler.shutdownNow()
105+ false
129106 }
107+ }
130108
131- // A final flush after the scheduler has been shut down.
132- flush()
133-
134- // Block until all flushes are done
135- CompletableFuture .allOf(* activeLogFlushes.toTypedArray()).join()
136-
109+ override fun stop () {
110+ logChannel.close()
111+ runBlocking { withTimeoutOrNull(SHUTDOWN_TIMEOUT ) { collectorJob?.join() } }
112+ scope.cancel()
137113 super .stop()
138114 }
139115
@@ -146,13 +122,22 @@ class LogToTeamscaleAppender : AppenderBase<ILoggingEvent>() {
146122 }
147123
148124 companion object {
149- /* * Flush the logs after N elements are in the queue */
125+ /* * Maximum number of log entries held in memory. Older entries are dropped on overflow. */
126+ private const val BUFFER_CAPACITY = 10_000
127+
128+ /* * Flush the logs after N elements are in the queue. */
150129 private const val BATCH_SIZE = 50
151130
152- /* * Flush the logs in the given time interval */
131+ /* * Flush the logs in the given time interval. */
153132 private val FLUSH_INTERVAL : Duration = Duration .ofSeconds(3 )
154133
155- /* * The service client for sending logs to Teamscale */
134+ /* * Backoff duration before retrying a failed batch. */
135+ private val RETRY_BACKOFF : Duration = Duration .ofSeconds(5 )
136+
137+ /* * Maximum time to wait for the collector to drain during shutdown. */
138+ private val SHUTDOWN_TIMEOUT : Duration = Duration .ofSeconds(3 )
139+
140+ /* * The service client for sending logs to Teamscale. */
156141 private var teamscaleClient: ITeamscaleService ? = null
157142
158143 /* *
@@ -178,4 +163,4 @@ class LogToTeamscaleAppender : AppenderBase<ILoggingEvent>() {
178163 return true
179164 }
180165 }
181- }
166+ }
0 commit comments