-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy pathtrace_sampler.cpp
More file actions
128 lines (106 loc) · 3.86 KB
/
trace_sampler.cpp
File metadata and controls
128 lines (106 loc) · 3.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#include "trace_sampler.h"
#include <datadog/sampling_decision.h>
#include <datadog/sampling_priority.h>
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <limits>
#include "collector_response.h"
#include "json_serializer.h"
#include "sampling_util.h"
#include "span_data.h"
namespace datadog {
namespace tracing {
namespace {
nlohmann::json to_json(const TraceSamplerRule& rule) {
nlohmann::json j = rule.matcher;
j["sample_rate"] = rule.rate.value();
return j;
}
} // namespace
TraceSampler::TraceSampler(const FinalizedTraceSamplerConfig& config,
const Clock& clock)
: rules_(config.rules),
limiter_(clock, config.max_per_second),
limiter_max_per_second_(config.max_per_second) {}
void TraceSampler::set_rules(std::vector<TraceSamplerRule> rules) {
std::lock_guard lock(mutex_);
rules_ = std::move(rules);
}
SamplingDecision TraceSampler::decide(const SpanData& span) {
SamplingDecision decision;
decision.origin = SamplingDecision::Origin::LOCAL;
// First check sampling rules.
const auto found_rule =
std::find_if(rules_.cbegin(), rules_.cend(),
[&](const auto& it) { return it.matcher.match(span); });
// `mutex_` protects `limiter_`, `collector_sample_rates_`, and
// `collector_default_sample_rate_`, so let's lock it here.
std::lock_guard lock(mutex_);
if (found_rule != rules_.end()) {
const auto& rule = *found_rule;
decision.mechanism = int(rule.mechanism);
decision.limiter_max_per_second = limiter_max_per_second_;
decision.configured_rate = rule.rate;
const std::uint64_t threshold = max_id_from_rate(rule.rate);
if (knuth_hash(span.trace_id.low) <= threshold) {
const auto result = limiter_.allow();
if (result.allowed) {
decision.priority = int(SamplingPriority::USER_KEEP);
} else {
decision.priority = int(SamplingPriority::USER_DROP);
}
decision.limiter_effective_rate = result.effective_rate;
} else {
decision.priority = int(SamplingPriority::USER_DROP);
}
return decision;
}
// No sampling rule matched. Find the appropriate collector-controlled
// sample rate.
auto found_rate = collector_sample_rates_.find(
CollectorResponse::key(span.service, span.environment().value_or("")));
if (found_rate != collector_sample_rates_.end()) {
decision.configured_rate = found_rate->second;
decision.mechanism = int(SamplingMechanism::AGENT_RATE);
} else {
if (collector_default_sample_rate_) {
decision.configured_rate = *collector_default_sample_rate_;
decision.mechanism = int(SamplingMechanism::AGENT_RATE);
} else {
// We have yet to receive a default rate from the collector. This
// corresponds to the `DEFAULT` sampling mechanism.
decision.configured_rate = Rate::one();
decision.mechanism = int(SamplingMechanism::DEFAULT);
}
}
const std::uint64_t threshold = max_id_from_rate(*decision.configured_rate);
if (knuth_hash(span.trace_id.low) <= threshold) {
decision.priority = int(SamplingPriority::AUTO_KEEP);
} else {
decision.priority = int(SamplingPriority::AUTO_DROP);
}
return decision;
}
void TraceSampler::handle_collector_response(
const CollectorResponse& response) {
const auto found =
response.sample_rate_by_key.find(response.key_of_default_rate);
std::lock_guard<std::mutex> lock(mutex_);
if (found != response.sample_rate_by_key.end()) {
collector_default_sample_rate_ = found->second;
}
collector_sample_rates_ = response.sample_rate_by_key;
}
nlohmann::json TraceSampler::config_json() const {
std::vector<nlohmann::json> rules;
for (const auto& rule : rules_) {
rules.push_back(to_json(rule));
}
return nlohmann::json::object({
{"rules", rules},
{"max_per_second", limiter_max_per_second_},
});
}
} // namespace tracing
} // namespace datadog