Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions ddtrace/tracer/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/DataDog/dd-trace-go/v2/ddtrace/ext"
"github.com/DataDog/dd-trace-go/v2/internal/locking"
"github.com/DataDog/dd-trace-go/v2/internal/locking/assert"
"github.com/DataDog/dd-trace-go/v2/internal/samplernames"
)

Expand Down Expand Up @@ -137,10 +138,11 @@ const rampUpInterval = time.Second
// prioritySampler holds a set of per-service sampling rates and applies
// them to spans. Every field other than mu is guarded by mu.
type prioritySampler struct {
	mu          locking.RWMutex
	rates       map[serviceEnvKey]float64 // +checklocks:mu
	defaultRate float64                   // +checklocks:mu
	// agentRatesLoaded is set to true by readRatesJSON and read by apply to
	// decide whether the Knuth sampling rate tag should be written.
	agentRatesLoaded bool      // +checklocks:mu
	lastCapped       time.Time // +checklocks:mu
}

func newPrioritySampler() *prioritySampler {
Expand Down Expand Up @@ -198,6 +200,7 @@ func (ps *prioritySampler) readRatesJSON(rc io.ReadCloser) error {
}
ps.mu.Lock()
defer ps.mu.Unlock()
ps.agentRatesLoaded = true
now := time.Now()
canIncrease := ps.lastCapped.IsZero() || now.Sub(ps.lastCapped) >= rampUpInterval
capApplied := false
Expand Down Expand Up @@ -225,9 +228,17 @@ func (ps *prioritySampler) readRatesJSON(rc io.ReadCloser) error {
// guard the span.
// +checklocksignore — Called during initialization in StartSpan, span not yet shared.
func (ps *prioritySampler) getRate(spn *Span) float64 {
	// Hold the read lock for the duration of the lookup; getRateLocked builds
	// the service/env key itself, so nothing needs to be computed here.
	ps.mu.RLock()
	defer ps.mu.RUnlock()
	return ps.getRateLocked(spn)
}

// getRateLocked returns the sampling rate for the given span.
// Caller must hold ps.mu (at least RLock).
// +checklocksignore — Called during initialization in StartSpan, span not yet shared.
func (ps *prioritySampler) getRateLocked(spn *Span) float64 {
assert.RWMutexRLocked(&ps.mu)
key := serviceEnvKey{service: spn.service, env: spn.meta[ext.Environment]}
if rate, ok := ps.rates[key]; ok {
return rate
}
Expand All @@ -245,13 +256,21 @@ func (ps *prioritySampler) getDefaultRate() float64 {
// to modify the span.
// +checklocksignore — Called during initialization in StartSpan, span not yet shared.
func (ps *prioritySampler) apply(spn *Span) {
	// Read the rate and the agentRatesLoaded flag under a single read lock so
	// both values come from the same sampler state, then release the lock
	// before touching the span.
	ps.mu.RLock()
	rate := ps.getRateLocked(spn)
	fromAgent := ps.agentRatesLoaded
	ps.mu.RUnlock()

	if sampledByRate(spn.traceID, rate) {
		spn.setSamplingPriority(ext.PriorityAutoKeep, samplernames.AgentRate)
	} else {
		spn.setSamplingPriority(ext.PriorityAutoReject, samplernames.AgentRate)
	}
	spn.SetTag(keySamplingPriorityRate, rate)

	// Only set the Knuth sampling rate tag when actual agent rates have been
	// received. The initial default rate (1.0) is a client-side fallback that
	// does not represent an agent-configured rate, so it must not propagate
	// as _dd.p.ksr to stay consistent with other tracers.
	if fromAgent {
		spn.SetTag(keyKnuthSamplingRate, formatKnuthSamplingRate(rate))
	}
}
60 changes: 60 additions & 0 deletions ddtrace/tracer/sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,66 @@ func TestPrioritySampler(t *testing.T) {
assert.EqualValues(ext.PriorityAutoReject, priority)
assert.EqualValues(0.5, rate)
})

t.Run("ksr-not-set-without-agent-rates", func(t *testing.T) {
// When no agent rates have been received, the priority sampler uses
// its initial default rate (1.0). This is a client-side fallback and
// should NOT propagate as _dd.p.ksr to stay consistent with other
// Datadog tracers.
assert := assert.New(t)
ps := newPrioritySampler()
spn := newBasicSpan("http.request")
spn.service = "my-service"
spn.traceID = 1

ps.apply(spn)
_, ok := getMeta(spn, keyKnuthSamplingRate)
assert.False(ok, "_dd.p.ksr must not be set when no agent rates have been received")

// Sampling priority and rate metric should still be set normally
priority, _ := getMetric(spn, keySamplingPriority)
assert.EqualValues(ext.PriorityAutoKeep, priority)
rate, _ := getMetric(spn, keySamplingPriorityRate)
assert.EqualValues(1.0, rate)
})

t.Run("ksr-set-after-agent-rates-received", func(t *testing.T) {
// After agent rates are received via readRatesJSON, apply should
// set _dd.p.ksr — even when the span falls through to the default
// rate provided by the agent.
assert := assert.New(t)
ps := newPrioritySampler()
assert.NoError(ps.readRatesJSON(
io.NopCloser(strings.NewReader(
`{
"rate_by_service":{
"service:,env:":0.8,
"service:obfuscate.http,env:":0.5
}
}`,
)),
))

// Span matching a per-service rate
spn1 := newBasicSpan("http.request")
spn1.service = "obfuscate.http"
spn1.traceID = 1

ps.apply(spn1)
ksr, ok := getMeta(spn1, keyKnuthSamplingRate)
assert.True(ok, "_dd.p.ksr must be set when agent rates have been received (per-service)")
assert.Equal("0.5", ksr)

// Span falling through to agent-provided default rate
spn2 := newBasicSpan("http.request")
spn2.service = "unknown-service"
spn2.traceID = 1

ps.apply(spn2)
ksr, ok = getMeta(spn2, keyKnuthSamplingRate)
assert.True(ok, "_dd.p.ksr must be set when agent rates have been received (default)")
assert.Equal("0.8", ksr)
})
}

func BenchmarkPrioritySamplerGetRate(b *testing.B) {
Expand Down
Loading