diff --git a/ddtrace/tracer/sampler.go b/ddtrace/tracer/sampler.go index 69a7cf1bb04..0a3391948c0 100644 --- a/ddtrace/tracer/sampler.go +++ b/ddtrace/tracer/sampler.go @@ -15,6 +15,7 @@ import ( "github.com/DataDog/dd-trace-go/v2/ddtrace/ext" "github.com/DataDog/dd-trace-go/v2/internal/locking" + "github.com/DataDog/dd-trace-go/v2/internal/locking/assert" "github.com/DataDog/dd-trace-go/v2/internal/samplernames" ) @@ -137,10 +138,11 @@ const rampUpInterval = time.Second // prioritySampler holds a set of per-service sampling rates and applies // them to spans. type prioritySampler struct { - mu locking.RWMutex - rates map[serviceEnvKey]float64 // +checklocks:mu - defaultRate float64 // +checklocks:mu - lastCapped time.Time // +checklocks:mu + mu locking.RWMutex + rates map[serviceEnvKey]float64 // +checklocks:mu + defaultRate float64 // +checklocks:mu + agentRatesLoaded bool // +checklocks:mu + lastCapped time.Time // +checklocks:mu } func newPrioritySampler() *prioritySampler { @@ -198,6 +200,7 @@ func (ps *prioritySampler) readRatesJSON(rc io.ReadCloser) error { } ps.mu.Lock() defer ps.mu.Unlock() + ps.agentRatesLoaded = true now := time.Now() canIncrease := ps.lastCapped.IsZero() || now.Sub(ps.lastCapped) >= rampUpInterval capApplied := false @@ -225,9 +228,17 @@ func (ps *prioritySampler) readRatesJSON(rc io.ReadCloser) error { // guard the span. // +checklocksignore — Called during initialization in StartSpan, span not yet shared. func (ps *prioritySampler) getRate(spn *Span) float64 { - key := serviceEnvKey{service: spn.service, env: spn.meta[ext.Environment]} ps.mu.RLock() defer ps.mu.RUnlock() + return ps.getRateLocked(spn) +} + +// getRateLocked returns the sampling rate for the given span. +// Caller must hold ps.mu (at least RLock). +// +checklocksignore — Called during initialization in StartSpan, span not yet shared. +func (ps *prioritySampler) getRateLocked(spn *Span) float64 { + assert.RWMutexRLocked(&ps.mu) + key := serviceEnvKey{service: spn.service, env: spn.meta[ext.Environment]} if rate, ok := ps.rates[key]; ok { return rate } @@ -245,13 +256,21 @@ func (ps *prioritySampler) getDefaultRate() float64 { // to modify the span. // +checklocksignore — Called during initialization in StartSpan, span not yet shared. func (ps *prioritySampler) apply(spn *Span) { - rate := ps.getRate(spn) + ps.mu.RLock() + rate := ps.getRateLocked(spn) + fromAgent := ps.agentRatesLoaded + ps.mu.RUnlock() if sampledByRate(spn.traceID, rate) { spn.setSamplingPriority(ext.PriorityAutoKeep, samplernames.AgentRate) } else { spn.setSamplingPriority(ext.PriorityAutoReject, samplernames.AgentRate) } spn.SetTag(keySamplingPriorityRate, rate) - // Set the Knuth sampling rate tag when sampled by agent rate - spn.SetTag(keyKnuthSamplingRate, formatKnuthSamplingRate(rate)) + // Only set the Knuth sampling rate tag when actual agent rates have been + // received. The initial default rate (1.0) is a client-side fallback that + // does not represent an agent-configured rate, so it must not propagate + // as _dd.p.ksr to stay consistent with other tracers. + if fromAgent { + spn.SetTag(keyKnuthSamplingRate, formatKnuthSamplingRate(rate)) + } } diff --git a/ddtrace/tracer/sampler_test.go b/ddtrace/tracer/sampler_test.go index 71481c58677..cb0cd74293c 100644 --- a/ddtrace/tracer/sampler_test.go +++ b/ddtrace/tracer/sampler_test.go @@ -202,6 +202,66 @@ func TestPrioritySampler(t *testing.T) { assert.EqualValues(ext.PriorityAutoReject, priority) assert.EqualValues(0.5, rate) }) + + t.Run("ksr-not-set-without-agent-rates", func(t *testing.T) { + // When no agent rates have been received, the priority sampler uses + // its initial default rate (1.0). This is a client-side fallback and + // should NOT propagate as _dd.p.ksr to stay consistent with other + // Datadog tracers. + assert := assert.New(t) + ps := newPrioritySampler() + spn := newBasicSpan("http.request") + spn.service = "my-service" + spn.traceID = 1 + + ps.apply(spn) + _, ok := getMeta(spn, keyKnuthSamplingRate) + assert.False(ok, "_dd.p.ksr must not be set when no agent rates have been received") + + // Sampling priority and rate metric should still be set normally + priority, _ := getMetric(spn, keySamplingPriority) + assert.EqualValues(ext.PriorityAutoKeep, priority) + rate, _ := getMetric(spn, keySamplingPriorityRate) + assert.EqualValues(1.0, rate) + }) + + t.Run("ksr-set-after-agent-rates-received", func(t *testing.T) { + // After agent rates are received via readRatesJSON, apply should + // set _dd.p.ksr — even when the span falls through to the default + // rate provided by the agent. + assert := assert.New(t) + ps := newPrioritySampler() + assert.NoError(ps.readRatesJSON( + io.NopCloser(strings.NewReader( + `{ + "rate_by_service":{ + "service:,env:":0.8, + "service:obfuscate.http,env:":0.5 + } + }`, + )), + )) + + // Span matching a per-service rate + spn1 := newBasicSpan("http.request") + spn1.service = "obfuscate.http" + spn1.traceID = 1 + + ps.apply(spn1) + ksr, ok := getMeta(spn1, keyKnuthSamplingRate) + assert.True(ok, "_dd.p.ksr must be set when agent rates have been received (per-service)") + assert.Equal("0.5", ksr) + + // Span falling through to agent-provided default rate + spn2 := newBasicSpan("http.request") + spn2.service = "unknown-service" + spn2.traceID = 1 + + ps.apply(spn2) + ksr, ok = getMeta(spn2, keyKnuthSamplingRate) + assert.True(ok, "_dd.p.ksr must be set when agent rates have been received (default)") + assert.Equal("0.8", ksr) + }) } func BenchmarkPrioritySamplerGetRate(b *testing.B) {