diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java index 588854a2069..43c72885ef8 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java @@ -105,6 +105,8 @@ public final class GeneralConfig { public static final String JDK_SOCKET_ENABLED = "jdk.socket.enabled"; public static final String OPTIMIZED_MAP_ENABLED = "optimized.map.enabled"; + public static final String TAG_NAME_UTF8_CACHE_SIZE = "tag.name.utf8.cache.size"; + public static final String TAG_VALUE_UTF8_CACHE_SIZE = "tag.value.utf8.cache.size"; public static final String STACK_TRACE_LENGTH_LIMIT = "stack.trace.length.limit"; public static final String SSI_INJECTION_ENABLED = "injection.enabled"; diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/writer/ddagent/Utf8Benchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/writer/ddagent/Utf8Benchmark.java new file mode 100644 index 00000000000..37d63e9e783 --- /dev/null +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/writer/ddagent/Utf8Benchmark.java @@ -0,0 +1,147 @@ +package datadog.trace.common.writer.ddagent; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ThreadLocalRandom; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.infra.Blackhole; + +/** + * This benchmark isn't really intended to used to measure throughput, but rather to be used with + * "-prof gc" to check bytes / op. + * + *
Since {@link String#getBytes(java.nio.charset.Charset)} is intrinsified the caches typically + * perform worse throughput wise, the benefit of the caches is to reduce allocation. Intention of + * this benchmark is to create data that roughly resembles what might be seen in a trace payload. + * Tag names are quite static, tag values are mostly low cardinality, but some tag values have + * infinite cardinality. + */ +@BenchmarkMode(Mode.Throughput) +public class Utf8Benchmark { + static final int NUM_LOOKUPS = 10_000; + + static final String[] TAGS = { + "_dd.asm.keep", + "ci.provider", + "language", + "db.statement", + "ci.job.url", + "ci.pipeline.url", + "db.pool", + "http.forwarder", + "db.warehouse", + "custom" + }; + + static int pos = 0; + static int standardVal = 0; + + static final String nextTag() { + if (pos == TAGS.length - 1) { + pos = 0; + } else { + pos += 1; + } + return TAGS[pos]; + } + + static final String nextValue(String tag) { + if (tag.equals("custom")) { + return nextCustomValue(tag); + } else { + return nextStandardValue(tag); + } + } + + /* + * Produces a high cardinality value - > thousands of distinct values per tag - many 1-time values + */ + static final String nextCustomValue(String tag) { + return tag + ThreadLocalRandom.current().nextInt(); + } + + /* + * Produces a moderate cardinality value - tens of distinct values per tag + */ + static final String nextStandardValue(String tag) { + return tag + ThreadLocalRandom.current().nextInt(20); + } + + @Benchmark + public static final String tagUtf8_baseline() { + return nextTag(); + } + + @Benchmark + public static final byte[] tagUtf8_nocache() { + String tag = nextTag(); + return tag.getBytes(StandardCharsets.UTF_8); + } + + static final SimpleUtf8Cache TAG_CACHE = new SimpleUtf8Cache(128); + + @Benchmark + public static final byte[] tagUtf8_w_cache() { + String tag = nextTag(); + + byte[] cache = TAG_CACHE.getUtf8(tag); + if (cache != null) return cache; + + return 
tag.getBytes(StandardCharsets.UTF_8); + } + + @Benchmark + public static final void valueUtf8_baseline(Blackhole bh) { + for (int i = 0; i < NUM_LOOKUPS; ++i) { + String tag = nextTag(); + String value = nextValue(tag); + + bh.consume(tag); + bh.consume(value); + } + } + + static final GenerationalUtf8Cache VALUE_CACHE = new GenerationalUtf8Cache(64, 128); + + @Benchmark + public static final void valueUtf8_cache_generational(Blackhole bh) { + GenerationalUtf8Cache valueCache = VALUE_CACHE; + valueCache.recalibrate(); + + for (int i = 0; i < NUM_LOOKUPS; ++i) { + String tag = nextTag(); + String value = nextValue(tag); + + byte[] lookup = valueCache.getUtf8(value); + bh.consume(lookup); + } + } + + static final SimpleUtf8Cache SIMPLE_VALUE_CACHE = new SimpleUtf8Cache(128); + + @Benchmark + public static final void valueUtf8_cache_simple(Blackhole bh) { + SimpleUtf8Cache valueCache = SIMPLE_VALUE_CACHE; + valueCache.recalibrate(); + + for (int i = 0; i < NUM_LOOKUPS; ++i) { + String tag = nextTag(); + String value = nextValue(tag); + + byte[] lookup = valueCache.getUtf8(value); + bh.consume(lookup); + } + } + + @Benchmark + public static final void valueUtf8_nocache(Blackhole bh) { + for (int i = 0; i < NUM_LOOKUPS; ++i) { + String tag = nextTag(); + String value = nextValue(tag); + + bh.consume(tag); + bh.consume(value.getBytes(StandardCharsets.UTF_8)); + } + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/Caching.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/Caching.java new file mode 100644 index 00000000000..bc61b037784 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/Caching.java @@ -0,0 +1,81 @@ +package datadog.trace.common.writer.ddagent; + +import java.util.Arrays; + +/** Some common static functions used by simple & generational caches */ +final class Caching { + private Caching() {} + + /** + * Provides the cache size that holds the requestedCapacity + * + * 
@param requestedCapacity > 0 + * @return size >= requestedCapacity + */ + static final int cacheSizeFor(int requestedCapacity) { + int pow; + for (pow = 1; pow < requestedCapacity; pow *= 2) ; + return pow; + } + + /** Provides an "adjusted" (e.g. non-zero) hash for the given String */ + static final int adjHash(String value) { + int hash = value.hashCode(); + return (hash == 0) ? 0xDA7AD06 : hash; + } + + /** Resets markers to zero */ + static final void reset(int[] marks) { + Arrays.fill(marks, 0); + } + + /** + * Changes the mark status of the corresponding slot in the marking array. If there was previously + * a matching mark, resets the slot to zero and returns true If there was previously a mismatching + * mark, updates the slot and returns false + * + *
A return value of true indicates that the requested value has likely been seen previously
+ * and cache entry should be created.
+ */
+  static final boolean mark(int[] marks, int newAdjHash) {
+    // History of the marking strategy (this is the 4th iteration):
+    //  1st - stored a marker entry, but that prematurely burned a slot in the cache
+    //  2nd - stored a boolean, which worked well but was overly permissive: the very
+    //        next request to the same slot could immediately create a CacheEntry
+    //  3rd - stored a hash that had to match exactly, which could lead to
+    //        access order fights over the slot
+    //  4th - hybrid of 2nd & 3rd: hashes are OR-ed together like a bloom filter
+    //        that effectively degenerates to a boolean
+    //
+    // This gives a nice balance for an A-B-A access pattern:
+    //  the first A marks the slot, then B turns the mark into A | B,
+    //  after which either A or B can claim and reset the slot
+    int slot = bucketIndex(marks, newAdjHash);
+    int previousBits = marks[slot];
+
+    // Not every bit of the new hash is present yet: fold it in and report a miss
+    if ((previousBits & newAdjHash) != newAdjHash) {
+      marks[slot] = previousBits | newAdjHash;
+      return false;
+    }
+
+    // All bits were already present: claim the mark and clear the slot
+    marks[slot] = 0;
+    return true;
+  }
+
+ /** Provides the corresponding index into the marking array */
+ static final int bucketIndex(int[] marks, int adjHash) {
+ return adjHash & (marks.length - 1);
+ }
+
+ /**
+ * Provides the corresponding index into an entry array. Assumes that the array size was
+ * determined by using {@link Caching#cacheSizeFor}
+ */
+ static final Cache is designed to take advantage of low cardinality tags to avoid repeated UTF8 encodings
+ * while also minimizing cache overhead and churn from high cardinality tags.
+ *
+ * NOTE: The aim of this cache is to reduce allocation overhead -- not CPU overhead. Using the
+ * cache has higher CPU overhead than simply calling {@link
+ * String#getBytes(java.nio.charset.Charset)}.
+ *
+ * The cache is thread safe.
+ */
+/*
+ * Cache works by using a 2-level promotion based scheme.
+ *
+ * Thread safety is achieved through using CacheEntry objects where the key data
+ * fields are final.
+ *
+ * Updating of the cache and bookkeeping are deliberately allowed to be racy to
+ * minimize CPU overhead and lock contention.
+ *
+ * The first time a value is requested, the value isn't cached and no CacheEntry
+ * is created. Without this refinement, the cost for constructing
+ * CacheEntry for unique values would negate the benefit of the cache.
+ *
+ * These first requests are tracked via edenMarkers which indicate if there was a
+ * previously unsatisfied request to the same initial cache line.
+ *
+ * If there was a request, then CacheEntry is created and stored into edenEntries.
+ * NOTE: The eden line marking process is imprecise and subject to request
+ * ordering issues. But given that low cardinality entries are more likely to repeat
+ * next, empirically this scheme works well.
+ *
+ * If a collision occurs in the cache, linear probing is used to check other slots.
+ * New cache entries fill any available slot within the probing window.
+ *
+ * If a subsequent request finds a matching item in edenEntries, the hit count
+ * of the CacheEntry is bumped. If the CacheEntry exceeds the current promotion
+ * threshold, then the CacheEntry is inserted into tenuredEntries -- freeing up
+ * a slot in edenEntries. If there isn't an available slot in tenuredEntries,
+ * the LRU (least recently used) tenured entry is evicted.
+ *
+ * If there are no available slots in edenEntries for a newly created CacheEntry...
+ *
+ * First, attempt to early promote the MFU (most frequently used) CacheEntry from
+ * edenEntries to tenuredEntries (without eviction).
+ *
+ * If there's no space in tenuredEntries to promote the MFU, then evict the
+ * LFU (least frequently used) entry from edenEntries instead.
+ *
+ *
+ * LRU based eviction of the tenuredEntries works by tagging each entry with its last hit time.
+ * The access time can be provided directly to GenerationalUtf8Cache#getUtf8 or can
+ * be refreshed periodically by calling GenerationalUtf8Cache#updateAccessTime.
+ *
+ * If there's a natural transaction boundary around the UTF8 cache,
+ * calling GenerationalUtf8Cache#recalibrate will adjust promotion thresholds to
+ * provide better cache utilization.
+ */
+public final class GenerationalUtf8Cache implements EncodingCache {
+ // Upper bounds applied to the capacities requested from the constructors
+ static final int MAX_EDEN_CAPACITY = 512;
+ static final int MAX_TENURED_CAPACITY = 1024;
+
+ // Number of consecutive slots examined when probing the eden / tenured arrays
+ private static final int MAX_EDEN_PROBES = 4;
+ private static final int MAX_TENURED_PROBES = 8;
+
+ // recalibrate only lowers the promotion threshold while it is still >= the minimum;
+ // the initial value is the starting point for promotionThreshold
+ private static final int MIN_PROMOTION_TRESHOLD = 2;
+ private static final int INITIAL_PROMOTION_THRESHOLD = 16;
+
+ // SCORE_DECAY: multiplier applied to every entry's score by CacheEntry#decay
+ // PURGE_THRESHOLD: entries whose score is below this value are purgeable
+ // PROMOTION_THRESHOLD_ADJ_FACTOR: factor by which recalibrate divides or multiplies
+ // the promotion threshold
+ private static final double SCORE_DECAY = 0.5D;
+ private static final double PURGE_THRESHOLD = 0.25D;
+ private static final double PROMOTION_THRESHOLD_ADJ_FACTOR = 1.5;
+
+ // Split of the total capacity used by the single argument constructor
+ private static final double EDEN_PROPORTION = 1D / 3D;
+ private static final double TENURED_PROPORTION = 1 - EDEN_PROPORTION;
+
+ // Values longer than this many chars are encoded directly and never cached
+ private static final int MAX_ENTRY_LEN = 256;
+
+ // First level of the cache; edenMarkers has the same length as edenEntries
+ private final CacheEntry[] edenEntries;
+ private final int[] edenMarkers;
+
+ // Second level of the cache, filled by promotion from edenEntries
+ private final CacheEntry[] tenuredEntries;
+
+ // Time stamp handed to entries on a hit; set by updateAccessTime and recalibrate
+ private long accessTimeMs;
+ private double promotionThreshold = INITIAL_PROMOTION_THRESHOLD;
+
+ // Bookkeeping counters - updated without synchronization by getUtf8
+ // and reset to zero at the end of every recalibrate
+ int edenHits = 0;
+ int tenuredHits = 0;
+ int earlyPromotions = 0;
+ int promotions = 0;
+ int edenEvictions = 0;
+ int tenuredEvictions = 0;
+
+  /**
+   * Creates a cache whose total capacity is split between eden and tenured space according to
+   * EDEN_PROPORTION and TENURED_PROPORTION. Each share is capped at its maximum and then sized
+   * by {@code Caching.cacheSizeFor}.
+   */
+  public GenerationalUtf8Cache(int capacity) {
+    this.accessTimeMs = System.currentTimeMillis();
+
+    // Eden entries and markers share one size - sizes must be powers of 2
+    int requestedEden = Math.min((int) (capacity * EDEN_PROPORTION), MAX_EDEN_CAPACITY);
+    int edenSlots = Caching.cacheSizeFor(requestedEden);
+    this.edenEntries = new CacheEntry[edenSlots];
+    this.edenMarkers = new int[edenSlots];
+
+    // Tenured size must be a power of 2 as well
+    int requestedTenured = Math.min((int) (capacity * TENURED_PROPORTION), MAX_TENURED_CAPACITY);
+    this.tenuredEntries = new CacheEntry[Caching.cacheSizeFor(requestedTenured)];
+  }
+
+  /**
+   * Creates a cache with explicitly requested eden and tenured capacities.
+   *
+   * @param edenCapacity requested number of eden slots; capped at MAX_EDEN_CAPACITY and then
+   *     sized by {@code Caching.cacheSizeFor}
+   * @param tenuredCapacity requested number of tenured slots; capped at MAX_TENURED_CAPACITY and
+   *     then sized by {@code Caching.cacheSizeFor}
+   */
+  public GenerationalUtf8Cache(int edenCapacity, int tenuredCapacity) {
+    this.accessTimeMs = System.currentTimeMillis();
+
+    // Eden must be sized from edenCapacity - previously tenuredCapacity was used by mistake,
+    // which silently ignored the first argument
+    int edenSize = Caching.cacheSizeFor(Math.min(edenCapacity, MAX_EDEN_CAPACITY));
+    this.edenEntries = new CacheEntry[edenSize];
+    this.edenMarkers = new int[edenSize];
+
+    int tenuredSize = Caching.cacheSizeFor(Math.min(tenuredCapacity, MAX_TENURED_CAPACITY));
+    this.tenuredEntries = new CacheEntry[tenuredSize];
+  }
+
+  /** Number of slots in the eden array */
+  public int edenCapacity() {
+    return edenEntries.length;
+  }
+
+  /** Number of slots in the tenured array */
+  public int tenuredCapacity() {
+    return tenuredEntries.length;
+  }
+
+ /** Updates the access time used by {@link #getUtf8(String)} to the provided value */
+ public void updateAccessTime(long accessTimeMs) {
+ this.accessTimeMs = accessTimeMs;
+ }
+
+ /** Updates the access time to {@link System#currentTimeMillis()} */
+ public void refreshAcessTime() {
+ this.updateAccessTime(System.currentTimeMillis());
+ }
+
+  /**
+   * Recalibrates the cache using {@link System#currentTimeMillis()} as the new access time.
+   *
+   * @see #recalibrate(long)
+   */
+  public synchronized void recalibrate() {
+    this.recalibrate(System.currentTimeMillis());
+  }
+
+  /**
+   * Recalibrates the cache. Applies a decay to existing entries, purges entries that fall below
+   * the PURGE_THRESHOLD and clears the eden markers.
+   *
+   * <p>Then adjusts the promotion threshold based on the counters gathered since the prior
+   * recalibration: it is lowered when nothing was promoted (and it is still at or above the
+   * minimum), and raised when promotions outnumber half of the tenured evictions. Finally all
+   * counters are reset.
+   *
+   * <p>This method is synchronized to avoid simultaneous recalibrations. Lookups are not
+   * synchronized, so the bookkeeping remains racy.
+   *
+   * @param accessTimeMs the access time stored for subsequent {@link #getUtf8(String)} calls
+   */
+  public synchronized void recalibrate(long accessTimeMs) {
+    this.accessTimeMs = accessTimeMs;
+
+    recalibrate(this.edenEntries);
+    Caching.reset(this.edenMarkers);
+    recalibrate(this.tenuredEntries);
+
+    int totalPromotions = this.promotions + this.earlyPromotions;
+    if (totalPromotions == 0 && this.promotionThreshold >= MIN_PROMOTION_TRESHOLD) {
+      // Nothing got promoted - make promotion easier
+      this.promotionThreshold /= PROMOTION_THRESHOLD_ADJ_FACTOR;
+    } else if (totalPromotions > this.tenuredEvictions / 2) {
+      // Promotions outnumber half of the tenured evictions - make promotion harder
+      this.promotionThreshold *= PROMOTION_THRESHOLD_ADJ_FACTOR;
+    }
+
+    this.edenHits = 0;
+    this.tenuredHits = 0;
+    this.earlyPromotions = 0;
+    this.promotions = 0;
+    this.edenEvictions = 0;
+    this.tenuredEvictions = 0;
+  }
+
+  /** Decays every entry in the array and clears the slots whose entry became purgeable */
+  static final void recalibrate(CacheEntry[] entries) {
+    for (int slot = 0; slot < entries.length; ++slot) {
+      CacheEntry occupant = entries[slot];
+      if (occupant != null && occupant.decay()) {
+        entries[slot] = null;
+      }
+    }
+  }
+
+  @Override
+  public byte[] encode(CharSequence charSeq) {
+    // Only String inputs go through the cache; anything else is answered with null
+    if (!(charSeq instanceof String)) {
+      return null;
+    }
+    return getUtf8((String) charSeq);
+  }
+
+ /** Returns the UTF-8 encoding of value -- using a cache value if available */
+ public final byte[] getUtf8(String value) {
+ return this.getUtf8(value, this.accessTimeMs);
+ }
+
+  /**
+   * Returns the UTF-8 encoding of value -- using a cached value if available.
+   *
+   * <p>Lookup order is tenured entries, then eden entries. An eden hit that pushes the entry's
+   * score above the promotion threshold moves the entry into the tenured entries. On a miss, the
+   * first request for a slot only leaves a mark; a CacheEntry is created once the mark is found
+   * again.
+   *
+   * @param value the String to encode; values longer than MAX_ENTRY_LEN bypass the cache
+   * @param accessTimeMs the time recorded on the entry that is hit or created
+   * @return the UTF-8 bytes of value; cache hits return the array held by the cache entry
+   */
+  public final byte[] getUtf8(String value, long accessTimeMs) {
+    if (value.length() > MAX_ENTRY_LEN) return CacheEntry.utf8(value);
+
+    int adjHash = Caching.adjHash(value);
+    // Use the caller supplied time - previously the parameter was ignored in favor of
+    // the cache-wide access time
+    long lookupTimeMs = accessTimeMs;
+
+    CacheEntry[] tenuredEntries = this.tenuredEntries;
+    int matchingTenuredIndex = lookupEntryIndex(tenuredEntries, MAX_TENURED_PROBES, adjHash, value);
+    if (matchingTenuredIndex != -1) {
+      CacheEntry tenuredEntry = tenuredEntries[matchingTenuredIndex];
+
+      tenuredEntry.hit(lookupTimeMs);
+
+      this.tenuredHits += 1;
+      return tenuredEntry.utf8();
+    }
+
+    CacheEntry[] edenEntries = this.edenEntries;
+    int matchingEdenIndex = lookupEntryIndex(edenEntries, MAX_EDEN_PROBES, adjHash, value);
+    if (matchingEdenIndex != -1) {
+      CacheEntry edenEntry = edenEntries[matchingEdenIndex];
+
+      double hits = edenEntry.hit(lookupTimeMs);
+      if (hits > this.promotionThreshold) {
+        // mark promoted first - to avoid racy insertions
+        this.promotions += 1;
+
+        boolean evicted = lruInsert(tenuredEntries, MAX_TENURED_PROBES, edenEntry);
+        if (evicted) this.tenuredEvictions += 1;
+
+        edenEntries[matchingEdenIndex] = null;
+      }
+
+      this.edenHits += 1;
+      return edenEntry.utf8();
+    }
+
+    boolean wasMarked = Caching.mark(this.edenMarkers, adjHash);
+
+    // If slot isn't marked, this is likely the first request
+    // Don't create an entry yet
+    if (!wasMarked) return CacheEntry.utf8(value);
+
+    CacheEntry newEntry = new CacheEntry(adjHash, value);
+    // First request was swallowed by marking, so double hit
+    newEntry.hit(lookupTimeMs);
+    newEntry.hit(lookupTimeMs);
+
+    // search for empty slot or failing that the MFU entry
+    int edenMfuIndex = findFirstAvailableOrMfuIndex(edenEntries, MAX_EDEN_PROBES, adjHash);
+    CacheEntry edenMfuEntry = edenEntries[edenMfuIndex];
+
+    // Found an empty slot - fill it
+    if (edenMfuEntry == null) {
+      edenEntries[edenMfuIndex] = newEntry;
+      return newEntry.utf8();
+    }
+
+    // See if we can early promote the eden MFU entry into the tenured entries
+    // Early promotion doesn't evict from the tenured entries
+
+    // NOTE: Need to make sure to use hash of the entry being promoted,
+    // since it may differ from the requested hash
+    int tenuredAvailableIndex =
+        findAvailableIndex(tenuredEntries, MAX_TENURED_PROBES, edenMfuEntry.adjHash());
+    if (tenuredAvailableIndex != -1) {
+      tenuredEntries[tenuredAvailableIndex] = edenMfuEntry;
+      this.earlyPromotions += 1;
+
+      edenEntries[edenMfuIndex] = newEntry;
+      return newEntry.utf8();
+    }
+
+    // No empty eden slot - and no space to promote into the tenured entries
+    // Insert into eden while evicting the LFU
+    boolean evicted = lfuInsert(edenEntries, MAX_EDEN_PROBES, newEntry);
+    if (evicted) this.edenEvictions += 1;
+
+    return newEntry.utf8();
+  }
+
+  /**
+   * Scans up to numProbes consecutive slots (wrapping to slot 0 at the end of the array),
+   * starting at the bucket for newAdjHash. Returns the first slot that is empty or holds a
+   * purgeable entry, or -1 if there is none within the probing window.
+   */
+  static final int findAvailableIndex(CacheEntry[] entries, int numProbes, int newAdjHash) {
+    int slot = Caching.bucketIndex(entries, newAdjHash);
+    for (int remaining = numProbes; remaining > 0; --remaining) {
+      if (slot >= entries.length) slot = 0;
+
+      CacheEntry occupant = entries[slot];
+      if (occupant == null || occupant.isPurgeable()) {
+        return slot;
+      }
+      slot += 1;
+    }
+    return -1;
+  }
+
+  /**
+   * Scans up to numProbes consecutive slots (wrapping to slot 0 at the end of the array),
+   * starting at the bucket for newAdjHash.
+   *
+   * @return the index of the first empty slot; if every probed slot is occupied, the index of
+   *     the probed entry with the highest score (MFU); -1 only when numProbes is not positive
+   */
+  static final int findFirstAvailableOrMfuIndex(
+      CacheEntry[] entries, int numProbes, int newAdjHash) {
+    // Double.MIN_VALUE is the smallest *positive* double, so using it as the starting point
+    // would leave mfuIndex at -1 whenever no probed score exceeds it. Start below every score.
+    double mfuScore = Double.NEGATIVE_INFINITY;
+    int mfuIndex = -1;
+
+    int initialBucketIndex = Caching.bucketIndex(entries, newAdjHash);
+    for (int probe = 0, index = initialBucketIndex; probe < numProbes; ++probe, ++index) {
+      if (index >= entries.length) index = 0;
+
+      CacheEntry entry = entries[index];
+      if (entry == null) return index;
+
+      double score = entry.score();
+      if (score > mfuScore) {
+        mfuScore = score;
+        mfuIndex = index;
+      }
+    }
+    return mfuIndex;
+  }
+
+  /**
+   * Inserts newEntry within the probing window of its bucket. An empty slot or a slot holding a
+   * purgeable entry is taken first (returns false). Otherwise the probed entry with the lowest
+   * score - the LFU - is replaced (returns true).
+   */
+  static final boolean lfuInsert(CacheEntry[] entries, int numProbes, CacheEntry newEntry) {
+    int victimSlot = -1;
+    double victimScore = Double.MAX_VALUE;
+
+    int slot = Caching.bucketIndex(entries, newEntry.adjHash());
+    for (int probe = 0; probe < numProbes; ++probe, ++slot) {
+      if (slot >= entries.length) slot = 0;
+
+      CacheEntry occupant = entries[slot];
+      if (occupant == null || occupant.isPurgeable()) {
+        // free (or as good as free) slot - no eviction needed
+        entries[slot] = newEntry;
+        return false;
+      }
+
+      double occupantScore = occupant.score();
+      if (occupantScore < victimScore) {
+        victimScore = occupantScore;
+        victimSlot = slot;
+      }
+    }
+
+    // Every probed slot was in use - evict the LFU
+    entries[victimSlot] = newEntry;
+    return true;
+  }
+
+  /**
+   * Inserts newEntry within the probing window of its bucket. An empty slot or a slot that
+   * already holds a matching entry is taken first (returns false). Otherwise the probed entry
+   * with the oldest last-used time - the LRU - is replaced (returns true).
+   */
+  static final boolean lruInsert(CacheEntry[] entries, int numProbes, CacheEntry newEntry) {
+    int oldestSlot = -1;
+    long oldestUsedMs = Long.MAX_VALUE;
+
+    int slot = Caching.bucketIndex(entries, newEntry.adjHash());
+    for (int probe = 0; probe < numProbes; ++probe, ++slot) {
+      if (slot >= entries.length) slot = 0;
+
+      CacheEntry occupant = entries[slot];
+      boolean reusable = (occupant == null) || occupant.matches(newEntry);
+      if (reusable) {
+        entries[slot] = newEntry;
+        return false;
+      }
+
+      long usedMs = occupant.lastUsedMs();
+      if (usedMs < oldestUsedMs) {
+        oldestUsedMs = usedMs;
+        oldestSlot = slot;
+      }
+    }
+
+    // Every probed slot holds a different entry - evict the LRU
+    entries[oldestSlot] = newEntry;
+    return true;
+  }
+
+  /**
+   * Searches up to numProbes consecutive slots (wrapping to slot 0 at the end of the array),
+   * starting at the bucket for adjHash, for an entry matching the hash and value. Returns the
+   * index of the matching entry or -1 if none was found.
+   */
+  static final int lookupEntryIndex(
+      CacheEntry[] entries, int numProbes, int adjHash, String value) {
+    int slot = Caching.bucketIndex(entries, adjHash);
+    int probe = 0;
+    while (probe < numProbes) {
+      if (slot >= entries.length) slot = 0;
+
+      CacheEntry candidate = entries[slot];
+      boolean found = (candidate != null) && candidate.matches(adjHash, value);
+      if (found) {
+        return slot;
+      }
+
+      ++probe;
+      ++slot;
+    }
+    return -1;
+  }
+
+ static final class CacheEntry {
+ final int adjHash;
+ final String value;
+ final byte[] valueUtf8;
+
+ boolean promoted = false;
+ long lastUsedMs = 0;
+ double score = 0;
+
+ public CacheEntry(int adjHash, String value) {
+ this.adjHash = adjHash;
+ this.value = value;
+ this.valueUtf8 = utf8(value);
+ }
+
+ boolean matches(CacheEntry thatEntry) {
+ return (this == thatEntry) || this.matches(thatEntry.adjHash, thatEntry.value);
+ }
+
+ boolean matches(int adjHash, String value) {
+ return (this.adjHash == adjHash) && value.equals(this.value);
+ }
+
+ int adjHash() {
+ return this.adjHash;
+ }
+
+ double score() {
+ return this.score;
+ }
+
+ long lastUsedMs() {
+ return this.lastUsedMs;
+ }
+
+ byte[] utf8() {
+ return this.valueUtf8;
+ }
+
+ double hit(long lastUsedMs) {
+ this.lastUsedMs = lastUsedMs;
+ this.score += 1;
+
+ return this.score;
+ }
+
+ boolean decay() {
+ this.score *= SCORE_DECAY;
+
+ return this.isPurgeable();
+ }
+
+ boolean isPurgeable() {
+ return (this.score < PURGE_THRESHOLD);
+ }
+
+ static final byte[] utf8(String value) {
+ return value.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public String toString() {
+ return this.value + " - score: " + this.score + " used (ms): " + this.lastUsedMs;
+ }
+ }
+}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/SimpleUtf8Cache.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/SimpleUtf8Cache.java
new file mode 100644
index 00000000000..56cb10594a6
--- /dev/null
+++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/SimpleUtf8Cache.java
@@ -0,0 +1,230 @@
+package datadog.trace.common.writer.ddagent;
+
+import datadog.communication.serialization.EncodingCache;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A simple UTF8 cache - primarily intended for tag names
+ *
+ * Cache is designed to be resilient against single use values
+ *
+ * NOTE: The aim of this cache is to reduce allocation overhead -- not CPU overhead. Using the
+ * cache has higher CPU overhead than simply calling {@link
+ * String#getBytes(java.nio.charset.Charset)}.
+ *
+ * The cache is thread safe.
+ */
+/*
+ * Thread safety is achieved through using CacheEntry objects where the key data
+ * fields are final.
+ *
+ * Updating of the cache and bookkeeping are deliberately allowed to be racy to
+ * minimize CPU overhead and lock contention.
+ *
+ * The first time a value is requested, the value isn't cached and no CacheEntry
+ * is created. Without this refinement, the cost for constructing
+ * CacheEntry for unique values would negate the benefit of the cache.
+ *
+ * These first requests are tracked via markers which indicate if there was
+ * previously an unsatisfied request to the same initial cache line.
+ *
+ * If there was a request, then CacheEntry is created and stored into entries.
+ * NOTE: The cache line marking process is imprecise and subject to request
+ * ordering issues. But given that low cardinality entries are more likely to repeat
+ * next, empirically this scheme works well.
+ *
+ * If a collision occurs in the cache, linear probing is used to check other slots.
+ * New cache entries fill any available slot within the probing window.
+ *
+ * If a subsequent request finds a matching item in entries, the hit count
+ * of the CacheEntry is bumped.
+ *
+ * If there are no available slots in entries for a newly created CacheEntry,
+ * a LFU: least frequently used eviction policy is used to free up a slot.
+ */
+public final class SimpleUtf8Cache implements EncodingCache {
+  /** Upper bound applied to the capacity requested from the constructor */
+  static final int MAX_CAPACITY = 1024;
+
+  /** Number of consecutive slots examined when looking up or inserting an entry */
+  private static final int MAX_PROBES = 4;
+
+  // NOTE: the unused per-instance field SIZE was removed - array sizes come from the constructor
+
+  /** Marks consulted by getUtf8 before a CacheEntry is created; same length as entries */
+  private final int[] markers;
+
+  private final CacheEntry[] entries;
+
+  /** Multiplier applied to every entry's score by recalibrate */
+  private static final double HIT_DECAY = 0.5D;
+
+  /** Entries whose score is below this value are purgeable */
+  private static final double PURGE_THRESHOLD = 0.25D;
+
+  /** Values longer than this many chars are encoded directly and never cached */
+  private static final int MAX_ENTRY_LEN = 128;
+
+  // Bookkeeping counters - updated without synchronization by getUtf8
+  protected int hits = 0;
+  protected int evictions = 0;
+
+  /**
+   * Creates a cache for the requested capacity. The request is capped at MAX_CAPACITY and then
+   * sized by {@code Caching.cacheSizeFor}; markers and entries share that size.
+   */
+  public SimpleUtf8Cache(int capacity) {
+    int cappedCapacity = Math.min(capacity, MAX_CAPACITY);
+    int slots = Caching.cacheSizeFor(cappedCapacity);
+
+    this.markers = new int[slots];
+    this.entries = new CacheEntry[slots];
+  }
+
+  /** Number of slots in the entry array */
+  public int capacity() {
+    return entries.length;
+  }
+
+  /**
+   * Recalibrates the cache: decays the score of every entry, clears the slots whose entry fell
+   * below the PURGE_THRESHOLD and finally resets all markers.
+   *
+   * <p>Synchronized so that two recalibrations never overlap; lookups are not synchronized, so
+   * the operation as a whole is still racy.
+   */
+  public synchronized void recalibrate() {
+    CacheEntry[] slots = this.entries;
+    for (int slot = 0; slot < slots.length; ++slot) {
+      CacheEntry occupant = slots[slot];
+      if (occupant != null && occupant.decay()) {
+        slots[slot] = null;
+      }
+    }
+
+    Caching.reset(this.markers);
+  }
+
+  @Override
+  public byte[] encode(CharSequence charSeq) {
+    // Only String inputs go through the cache; anything else is answered with null
+    if (!(charSeq instanceof String)) {
+      return null;
+    }
+    return getUtf8((String) charSeq);
+  }
+
+  /**
+   * Returns the UTF-8 encoding of value -- using a cached value if available. The first request
+   * that misses only leaves a mark; a CacheEntry is created once the mark is found again.
+   */
+  public final byte[] getUtf8(String value) {
+    // Oversized values never go through the cache
+    if (value.length() > MAX_ENTRY_LEN) {
+      return CacheEntry.utf8(value);
+    }
+
+    CacheEntry[] slots = this.entries;
+    int adjHash = Caching.adjHash(value);
+
+    CacheEntry cached = lookupEntry(slots, adjHash, value);
+    if (cached != null) {
+      cached.hit();
+      this.hits += 1;
+      return cached.utf8();
+    }
+
+    // Not marked before - likely a first request, so don't create an entry yet
+    if (!Caching.mark(this.markers, adjHash)) {
+      return CacheEntry.utf8(value);
+    }
+
+    CacheEntry created = new CacheEntry(adjHash, value);
+    created.hit();
+
+    if (lfuInsert(slots, created)) {
+      this.evictions += 1;
+    }
+    return created.utf8();
+  }
+
+  /**
+   * Searches up to MAX_PROBES consecutive slots (wrapping to slot 0 at the end of the array),
+   * starting at the bucket for adjHash. Returns the entry matching the hash and value, or null.
+   */
+  static final CacheEntry lookupEntry(CacheEntry[] entries, int adjHash, String value) {
+    int slot = Caching.bucketIndex(entries, adjHash);
+    int probe = 0;
+    while (probe < MAX_PROBES) {
+      if (slot >= entries.length) slot = 0;
+
+      CacheEntry candidate = entries[slot];
+      boolean found = (candidate != null) && candidate.matches(adjHash, value);
+      if (found) {
+        return candidate;
+      }
+
+      ++probe;
+      ++slot;
+    }
+    return null;
+  }
+
+  /**
+   * Inserts newEntry within the probing window of its bucket. An empty slot or a slot holding a
+   * purgeable entry is taken first (returns false). Otherwise the probed entry with the lowest
+   * score - the LFU - is replaced (returns true).
+   */
+  static final boolean lfuInsert(CacheEntry[] entries, CacheEntry newEntry) {
+    int victimSlot = -1;
+    double victimScore = Double.MAX_VALUE;
+
+    int slot = Caching.bucketIndex(entries, newEntry.adjHash());
+    for (int probe = 0; probe < MAX_PROBES; ++probe, ++slot) {
+      if (slot >= entries.length) slot = 0;
+
+      CacheEntry occupant = entries[slot];
+      if (occupant == null || occupant.isPurgeable()) {
+        // free (or as good as free) slot - no eviction needed
+        entries[slot] = newEntry;
+        return false;
+      }
+
+      double occupantScore = occupant.score();
+      if (occupantScore < victimScore) {
+        victimScore = occupantScore;
+        victimSlot = slot;
+      }
+    }
+
+    // Every probed slot was in use - evict the LFU
+    entries[victimSlot] = newEntry;
+    return true;
+  }
+
+ static final class CacheEntry {
+ final int adjHash;
+ final String value;
+ final byte[] valueUtf8;
+
+ boolean promoted = false;
+ double score = 0;
+
+ public CacheEntry(int adjHash, String value) {
+ this.adjHash = adjHash;
+ this.value = value;
+ this.valueUtf8 = utf8(value);
+ }
+
+ boolean matches(CacheEntry thatEntry) {
+ return (this == thatEntry) || this.matches(thatEntry.adjHash, thatEntry.value);
+ }
+
+ boolean matches(int adjHash, String value) {
+ return (this.adjHash == adjHash) && value.equals(this.value);
+ }
+
+ int adjHash() {
+ return this.adjHash;
+ }
+
+ double score() {
+ return this.score;
+ }
+
+ byte[] utf8() {
+ return this.valueUtf8;
+ }
+
+ double hit() {
+ this.score += 1;
+
+ return this.score;
+ }
+
+ boolean decay() {
+ this.score *= HIT_DECAY;
+
+ return this.isPurgeable();
+ }
+
+ boolean isPurgeable() {
+ return (this.score < PURGE_THRESHOLD);
+ }
+
+ static final byte[] utf8(String value) {
+ return value.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public String toString() {
+ return this.value + " - score: " + this.score;
+ }
+ }
+}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV0_4.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV0_4.java
index a1d60164b82..6c9fca64112 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV0_4.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV0_4.java
@@ -6,6 +6,7 @@
import datadog.communication.serialization.GrowableBuffer;
import datadog.communication.serialization.Writable;
import datadog.communication.serialization.msgpack.MsgPackWriter;
+import datadog.trace.api.Config;
import datadog.trace.api.ProcessTags;
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
@@ -23,6 +24,15 @@
import okhttp3.RequestBody;
public final class TraceMapperV0_4 implements TraceMapper {
+ // UTF-8 cache for tag names, sized from Config#getTagNameUtf8CacheSize;
+ // null when the configured size is not positive
+ static final SimpleUtf8Cache TAG_CACHE =
+ Config.get().getTagNameUtf8CacheSize() > 0
+ ? new SimpleUtf8Cache(Config.get().getTagNameUtf8CacheSize())
+ : null;
+
+ // UTF-8 cache for tag values, sized from Config#getTagValueUtf8CacheSize;
+ // null when the configured size is not positive
+ static final GenerationalUtf8Cache VALUE_CACHE =
+ Config.get().getTagValueUtf8CacheSize() > 0
+ ? new GenerationalUtf8Cache(Config.get().getTagValueUtf8CacheSize())
+ : null;
private final int size;
@@ -57,6 +67,9 @@ MetaWriter forLastSpanInChunk(final boolean lastSpanInChunk) {
@Override
public void accept(Metadata metadata) {
+ if (TAG_CACHE != null) TAG_CACHE.recalibrate();
+ if (VALUE_CACHE != null) VALUE_CACHE.recalibrate();
+
final boolean writeSamplingPriority = firstSpanInChunk || lastSpanInChunk;
final UTF8BytesString processTags =
firstSpanInChunk ? ProcessTags.getTagsForSerialization() : null;
@@ -111,8 +124,8 @@ public void accept(Metadata metadata) {
writable.writeLong(metadata.getThreadId());
for (Map.Entry