Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ dependencies {
// TODO: revise these versions to be as old as usable for compatibility
implementation("io.opentelemetry:opentelemetry-api:1.51.0")
implementation("io.opentelemetry:opentelemetry-sdk:1.51.0")
implementation("io.opentelemetry:opentelemetry-exporter-otlp:1.51.0")
implementation("io.opentelemetry:opentelemetry-exporter-logging-otlp:1.51.0")
implementation("io.opentelemetry:opentelemetry-sdk-metrics:1.51.0")
implementation("io.opentelemetry:opentelemetry-sdk-logs:1.51.0")

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.launchdarkly.observability.logs

import com.launchdarkly.observability.replay.transport.BatchWorker
import com.launchdarkly.observability.replay.transport.EventQueue
import io.opentelemetry.context.Context
import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.logs.LogRecordProcessor
import io.opentelemetry.sdk.logs.ReadWriteLogRecord

/**
* A [LogRecordProcessor] that enqueues every emitted record as a [LogItemPayload] into a shared
* [EventQueue]. Downstream, a [BatchWorker] pulls items off the queue and hands them to
* [OtlpLogExporter] for OTLP/JSON export.
*
* Replaces the OpenTelemetry `BatchLogRecordProcessor` + `OtlpHttpLogRecordExporter` wiring
* previously used for logs; mirrors the Swift `LogClient` path that pushes `LogItem`s to the
* shared event queue.
*
* @param eventQueue queue that receives the payloads.
* @param batchWorker optional worker used to satisfy [forceFlush]; when set, flush drains the
* queue before returning.
*/
internal class EventLogRecordProcessor(
private val eventQueue: EventQueue,
private val batchWorker: BatchWorker? = null,
) : LogRecordProcessor {

override fun onEmit(context: Context, logRecord: ReadWriteLogRecord) {
eventQueue.send(LogItemPayload(logRecord.toLogRecordData()))
}

override fun forceFlush(): CompletableResultCode {
batchWorker?.flush()
return CompletableResultCode.ofSuccess()
}

override fun shutdown(): CompletableResultCode {
batchWorker?.flush()
return CompletableResultCode.ofSuccess()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.launchdarkly.observability.logs

import com.launchdarkly.observability.replay.transport.EventExporting
import com.launchdarkly.observability.replay.transport.EventQueueItemPayload
import io.opentelemetry.sdk.logs.data.LogRecordData

/**
* Queue payload for a single OpenTelemetry [LogRecordData]. Mirrors the Swift `LogItem`.
*/
data class LogItemPayload(
val logRecord: LogRecordData,
) : EventQueueItemPayload {

override val exporterClass: Class<out EventExporting>
get() = OtlpLogExporter::class.java

/** Timestamp (ms since epoch) used for queue ordering. */
override val timestamp: Long
get() = logRecord.timestampEpochNanos / 1_000_000L

/**
* Queue cost heuristic: a fixed per-record base plus an attribute-count contribution,
* matching the Swift `LogItem.cost()` formula (`300 + attributes.count * 100`).
*/
override fun cost(): Int = BASE_COST + logRecord.attributes.size() * PER_ATTRIBUTE_COST

private companion object {
const val BASE_COST = 300
const val PER_ATTRIBUTE_COST = 100
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.launchdarkly.observability.logs

import com.launchdarkly.observability.otlp.OtlpConfiguration
import com.launchdarkly.observability.otlp.OtlpHttpClient
import com.launchdarkly.observability.otlp.json.logs.JsonLogRecordAdapter
import com.launchdarkly.observability.otlp.json.logs.OtlpJsonExportLogsServiceRequest
import com.launchdarkly.observability.replay.transport.EventExporting
import com.launchdarkly.observability.replay.transport.EventQueueItem
import io.opentelemetry.sdk.logs.data.LogRecordData

/**
* [EventExporting] implementation that sends queued [LogItemPayload]s to an OTLP/HTTP+JSON
* logs endpoint.
*
* Mirrors the Swift `OtlpLogExporter`.
*/
class OtlpLogExporter(
private val httpClient: OtlpHttpClient,
) : EventExporting {

constructor(
endpoint: String,
config: OtlpConfiguration = OtlpConfiguration(),
) : this(OtlpHttpClient(endpoint = endpoint, config = config))

override suspend fun export(items: List<EventQueueItem>) {
val logRecords: List<LogRecordData> = items.mapNotNull { item ->
(item.payload as? LogItemPayload)?.logRecord
}
if (logRecords.isEmpty()) return

val body = JsonLogRecordAdapter.toJsonRequest(logRecords)
httpClient.send(
body = body,
serializer = OtlpJsonExportLogsServiceRequest.serializer(),
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.launchdarkly.observability.metrics

import com.launchdarkly.observability.replay.transport.BatchWorker
import com.launchdarkly.observability.replay.transport.EventQueue
import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.InstrumentType
import io.opentelemetry.sdk.metrics.data.AggregationTemporality
import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector
import io.opentelemetry.sdk.metrics.export.MetricExporter

/**
* A [MetricExporter] that drops the collected [MetricData] into a shared [EventQueue] wrapped as
* [MetricItemPayload]s, instead of exporting directly. Downstream, a [BatchWorker] hands the
* batched payloads to [OtlpMetricExporter].
*
* Mirrors the Swift `OtlpMetricScheduleExporter`.
*
* @param eventQueue queue that receives the payloads.
* @param temporalitySelector aggregation temporality policy, defaulting to
* [AggregationTemporalitySelector.deltaPreferred] (same as the previous OTLP exporter).
* @param batchWorker optional worker used to satisfy [flush]; when set, flush drains the queue.
* @param clock supplies the wall-clock timestamp stamped on each payload (for queue ordering).
*/
internal class EventMetricExporter(
private val eventQueue: EventQueue,
private val temporalitySelector: AggregationTemporalitySelector = AggregationTemporalitySelector.deltaPreferred(),
private val batchWorker: BatchWorker? = null,
private val clock: () -> Long = { System.currentTimeMillis() },
) : MetricExporter {

override fun export(metrics: Collection<MetricData>): CompletableResultCode {
if (metrics.isEmpty()) return CompletableResultCode.ofSuccess()
val timestamp = clock()
val payloads = metrics.map { MetricItemPayload(it, timestamp = timestamp) }
eventQueue.send(payloads)
return CompletableResultCode.ofSuccess()
}

override fun flush(): CompletableResultCode {
batchWorker?.flush()
return CompletableResultCode.ofSuccess()
}

override fun shutdown(): CompletableResultCode {
batchWorker?.flush()
return CompletableResultCode.ofSuccess()
}

override fun getAggregationTemporality(instrumentType: InstrumentType): AggregationTemporality =
temporalitySelector.getAggregationTemporality(instrumentType)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.launchdarkly.observability.metrics

import com.launchdarkly.observability.replay.transport.EventExporting
import com.launchdarkly.observability.replay.transport.EventQueueItemPayload
import io.opentelemetry.sdk.metrics.data.MetricData

/**
* Queue payload for a single OpenTelemetry [MetricData]. Mirrors the Swift `MetricItem`.
*
* Metric payloads carry an explicit [timestamp] because `MetricData` itself does not expose a
* collection timestamp (the one on `PointData` is per-point). [EventMetricExporter] supplies the
* export wall-clock time when it enqueues these payloads.
*/
data class MetricItemPayload(
val metricData: MetricData,
override val timestamp: Long,
) : EventQueueItemPayload {

override val exporterClass: Class<out EventExporting>
get() = OtlpMetricExporter::class.java

/**
* Queue cost heuristic: a fixed per-metric base plus a contribution per data point, matching
* the Swift `MetricItem.cost()` formula (`300 + points.count * 100`).
*/
override fun cost(): Int = BASE_COST + metricData.data.points.size * PER_POINT_COST

private companion object {
const val BASE_COST = 300
const val PER_POINT_COST = 100
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.launchdarkly.observability.metrics

import com.launchdarkly.observability.otlp.OtlpConfiguration
import com.launchdarkly.observability.otlp.OtlpHttpClient
import com.launchdarkly.observability.otlp.json.metrics.JsonMetricsAdapter
import com.launchdarkly.observability.otlp.json.metrics.OtlpJsonExportMetricsServiceRequest
import com.launchdarkly.observability.replay.transport.EventExporting
import com.launchdarkly.observability.replay.transport.EventQueueItem
import io.opentelemetry.sdk.metrics.data.MetricData

/**
* [EventExporting] implementation that sends queued [MetricItemPayload]s to an OTLP/HTTP+JSON
* metrics endpoint.
*
* Mirrors the Swift `OtlpMetricEventExporter`.
*/
class OtlpMetricExporter(
private val httpClient: OtlpHttpClient,
) : EventExporting {

constructor(
endpoint: String,
config: OtlpConfiguration = OtlpConfiguration(),
) : this(OtlpHttpClient(endpoint = endpoint, config = config))

override suspend fun export(items: List<EventQueueItem>) {
val metrics: List<MetricData> = items.mapNotNull { item ->
(item.payload as? MetricItemPayload)?.metricData
}
if (metrics.isEmpty()) return

val body = JsonMetricsAdapter.toJsonRequest(metrics)
httpClient.send(
body = body,
serializer = OtlpJsonExportMetricsServiceRequest.serializer(),
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ import kotlinx.serialization.json.Json
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonElement
import java.io.IOException
import java.io.ByteArrayOutputStream
import java.net.HttpURLConnection
import java.net.URL
import java.util.zip.GZIPOutputStream

@Serializable
data class GraphQLRequest(
Expand Down Expand Up @@ -87,7 +85,7 @@ class GraphQLClient(

val requestJson = json.encodeToString(GraphQLRequest.serializer(), request)
val requestBytes = requestJson.toByteArray(Charsets.UTF_8)
val payloadBytes = if (compress) gzip(requestBytes) else requestBytes
val payloadBytes = if (compress) GzipUtil.gzip(requestBytes) else requestBytes
val connectionLocal = connectionProvider.openConnection(endpoint).also { connection = it }

connectionLocal.apply {
Expand Down Expand Up @@ -142,14 +140,6 @@ class GraphQLClient(
response
}

private fun gzip(data: ByteArray): ByteArray {
val byteStream = ByteArrayOutputStream()
GZIPOutputStream(byteStream).use { gzipStream ->
gzipStream.write(data)
}
return byteStream.toByteArray()
}

private fun logErrors(response: GraphQLResponse<*>) {
val errors = response.errors?.takeIf { it.isNotEmpty() } ?: return
errors.forEach { error ->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.launchdarkly.observability.network

import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

internal object GzipUtil {
fun gzip(data: ByteArray): ByteArray {
val byteStream = ByteArrayOutputStream()
GZIPOutputStream(byteStream).use { gzipStream ->
gzipStream.write(data)
}
return byteStream.toByteArray()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.launchdarkly.observability.otlp

import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

/**
* Compression type applied to OTLP/HTTP request bodies.
*/
enum class OtlpCompression {
GZIP,
NONE,
}

/**
* Configuration for [OtlpHttpClient].
*
* Mirrors the Swift `OtlpConfiguration` struct.
*
* @property timeout Maximum duration to wait for an OTLP/HTTP request to complete.
* @property compression Compression to apply to request bodies. Defaults to [OtlpCompression.GZIP].
* @property headers Additional static headers to attach to every OTLP/HTTP request.
*/
data class OtlpConfiguration(
val timeout: Duration = DEFAULT_TIMEOUT,
val compression: OtlpCompression = OtlpCompression.GZIP,
val headers: Map<String, String> = emptyMap(),
) {
companion object {
val DEFAULT_TIMEOUT: Duration = 10.seconds
}
}
Loading
Loading