Skip to content

Commit 2952e1f

Browse files
cataphractclaude
andauthored
fix(telemetry): refactor to shared_ptr and add deterministic shutdown() (#326)
* fix(telemetry): refactor to shared_ptr and add deterministic shutdown() Refactors Telemetry to be heap-allocated via a shared_ptr (enable_shared_from_this + factory create()). Scheduled-task callbacks now capture a weak_from_this() so they become no-ops automatically once the object is no longer reachable. Move semantics are removed. Adds a shutdown() function that cancels scheduled tasks, sends the app-closing payload, and releases the HTTP client. After the call, all HTTP sending becomes a no-op: send_payload() snapshots http_client_ under http_client_mutex_ and returns early if null. http_client_ is reset under the same mutex in shutdown(), so any concurrent call that already holds a live snapshot completes cleanly while new calls short-circuit. If this is the last shared_ptr to the client, the background Curl thread is joined at that point. Rationale: shutdown() cannot guarantee that, after it returns, there are no in-flight HTTP callbacks still holding a reference to Telemetry state. Using a shared_ptr ensures any late callback finds a dead weak_ptr and short-circuits instead of accessing a destroyed object. The load-bearing weak_from_this() captures are the on_response and on_error lambdas inside send_payload(). Those run on CurlImpl's independent event_loop_ thread (via handle_message()), which has no blocking-cancel mechanism. app_closing() calls drain() with a 2-second deadline, so if the agent is slow, requests can still be in flight on the curl thread after shutdown() returns. The weak.lock() guard is what prevents a use-after-free in that window. The weak_from_this() captures in the scheduled-task callbacks (schedule_tasks()) are defense-in-depth for a different concern: ThreadedEventScheduler's cancel() blocks until any in-progress callback finishes, making those captures redundant against the threaded scheduler. However, the EventScheduler interface does not mandate a blocking cancel(), so the captures guard against non-blocking implementations where a callback could otherwise touch Telemetry state after it has been torn down. Also fixes a race in test_span.cpp: disabling telemetry there prevents app_started() from posting to the Curl background thread, which would otherwise fail to reach the (absent) agent asynchronously and call log_error() on the shared MockLogger, racing against the error_count == 0 assertion. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix comment wording * run clang-format --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 93ed055 commit 2952e1f

6 files changed

Lines changed: 257 additions & 250 deletions

File tree

include/datadog/telemetry/telemetry.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,25 @@ void init(FinalizedConfiguration configuration,
4444
tracing::HTTPClient::URL agent_url,
4545
tracing::Clock clock = tracing::default_clock);
4646

47+
/// Deterministically shut down the telemetry module.
48+
///
49+
/// Cancels all scheduled tasks, sends the app-closing payload, and waits up
50+
/// to 2 seconds for in-flight requests to complete (HTTPClient::drain()).
51+
/// Then releases the HTTP client reference; if this is the last shared_ptr
52+
/// to the client, the background Curl thread is joined synchronously and any
53+
/// remaining in-flight requests are abandoned without firing their callbacks.
54+
/// If other holders of the client remain, the Curl thread continues running
55+
/// and requests may still complete and fire their callbacks after this call
56+
/// returns (the callbacks guard against this with weak_from_this()). After
57+
/// this call all telemetry send functions become no-ops. Safe to call even if
58+
/// telemetry was never initialized, but must be called at most once after a
59+
/// successful init.
60+
///
61+
/// Call this from the worker process exit path (e.g. before destroying the
62+
/// tracer) so that telemetry's reference to the HTTP client is released
63+
/// promptly, reducing the window before the Curl thread is quiesced.
64+
void shutdown();
65+
4766
/// Sends configuration changes.
4867
///
4968
/// This function is responsible for sending reported configuration changes

src/datadog/telemetry/telemetry.cpp

Lines changed: 67 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ using NoopTelemetry = std::monostate;
2222

2323
/// `TelemetryProxy` holds either the real implementation or a no-op
2424
/// implementation.
25-
using TelemetryProxy = std::variant<NoopTelemetry, Telemetry>;
25+
using TelemetryProxy = std::variant<NoopTelemetry, std::shared_ptr<Telemetry>>;
2626

2727
/// NOTE(@dmehala): Here to facilitate Meyer's singleton construction.
2828
struct Ctor_param final {
@@ -37,9 +37,9 @@ struct Ctor_param final {
3737

3838
TelemetryProxy make_telemetry(const tracing::Optional<Ctor_param>& init) {
3939
if (!init || !init->configuration.enabled) return NoopTelemetry{};
40-
return Telemetry{init->configuration, init->tracer_signature, init->logger,
41-
init->client, init->scheduler, init->agent_url,
42-
init->clock};
40+
return Telemetry::create(init->configuration, init->tracer_signature,
41+
init->logger, init->client, init->scheduler,
42+
init->agent_url, init->clock);
4343
}
4444

4545
TelemetryProxy& instance(
@@ -69,20 +69,30 @@ void init(FinalizedConfiguration configuration,
6969
event_scheduler, agent_url, clock});
7070
}
7171

72-
void send_configuration_change() {
72+
void shutdown() {
7373
std::visit(
7474
details::Overload{
75-
[&](Telemetry& telemetry) { telemetry.send_configuration_change(); },
75+
[](std::shared_ptr<Telemetry>& telemetry) { telemetry->shutdown(); },
7676
[](NoopTelemetry) {},
7777
},
7878
instance());
7979
}
8080

81+
void send_configuration_change() {
82+
std::visit(details::Overload{
83+
[&](std::shared_ptr<Telemetry>& telemetry) {
84+
telemetry->send_configuration_change();
85+
},
86+
[](NoopTelemetry) {},
87+
},
88+
instance());
89+
}
90+
8191
void capture_configuration_change(
8292
const std::vector<tracing::ConfigMetadata>& new_configuration) {
8393
std::visit(details::Overload{
84-
[&](Telemetry& telemetry) {
85-
telemetry.capture_configuration_change(new_configuration);
94+
[&](std::shared_ptr<Telemetry>& telemetry) {
95+
telemetry->capture_configuration_change(new_configuration);
8696
},
8797
[](NoopTelemetry) {},
8898
},
@@ -92,24 +102,28 @@ void capture_configuration_change(
92102
namespace log {
93103
void warning(std::string message) {
94104
std::visit(details::Overload{
95-
[&](Telemetry& telemetry) { telemetry.log_warning(message); },
105+
[&](std::shared_ptr<Telemetry>& telemetry) {
106+
telemetry->log_warning(message);
107+
},
96108
[](NoopTelemetry) {},
97109
},
98110
instance());
99111
}
100112

101113
void error(std::string message) {
102114
std::visit(details::Overload{
103-
[&](Telemetry& telemetry) { telemetry.log_error(message); },
115+
[&](std::shared_ptr<Telemetry>& telemetry) {
116+
telemetry->log_error(message);
117+
},
104118
[](NoopTelemetry) {},
105119
},
106120
instance());
107121
}
108122

109123
void error(std::string message, std::string stacktrace) {
110124
std::visit(details::Overload{
111-
[&](Telemetry& telemetry) {
112-
telemetry.log_error(message, stacktrace);
125+
[&](std::shared_ptr<Telemetry>& telemetry) {
126+
telemetry->log_error(message, stacktrace);
113127
},
114128
[](auto&&) {},
115129
},
@@ -119,57 +133,60 @@ void error(std::string message, std::string stacktrace) {
119133

120134
namespace counter {
121135
void increment(const Counter& counter) {
122-
std::visit(
123-
details::Overload{
124-
[&](Telemetry& telemetry) { telemetry.increment_counter(counter); },
125-
[](auto&&) {},
126-
},
127-
instance());
136+
std::visit(details::Overload{
137+
[&](std::shared_ptr<Telemetry>& telemetry) {
138+
telemetry->increment_counter(counter);
139+
},
140+
[](auto&&) {},
141+
},
142+
instance());
128143
}
129144

130145
void increment(const Counter& counter, const std::vector<std::string>& tags) {
131146
std::visit(details::Overload{
132-
[&](Telemetry& telemetry) {
133-
telemetry.increment_counter(counter, tags);
147+
[&](std::shared_ptr<Telemetry>& telemetry) {
148+
telemetry->increment_counter(counter, tags);
134149
},
135150
[](auto&&) {},
136151
},
137152
instance());
138153
}
139154

140155
void decrement(const Counter& counter) {
141-
std::visit(
142-
details::Overload{
143-
[&](Telemetry& telemetry) { telemetry.decrement_counter(counter); },
144-
[](auto&&) {},
145-
},
146-
instance());
156+
std::visit(details::Overload{
157+
[&](std::shared_ptr<Telemetry>& telemetry) {
158+
telemetry->decrement_counter(counter);
159+
},
160+
[](auto&&) {},
161+
},
162+
instance());
147163
}
148164

149165
void decrement(const Counter& counter, const std::vector<std::string>& tags) {
150166
std::visit(details::Overload{
151-
[&](Telemetry& telemetry) {
152-
telemetry.decrement_counter(counter, tags);
167+
[&](std::shared_ptr<Telemetry>& telemetry) {
168+
telemetry->decrement_counter(counter, tags);
153169
},
154170
[](auto&&) {},
155171
},
156172
instance());
157173
}
158174

159175
void set(const Counter& counter, uint64_t value) {
160-
std::visit(
161-
details::Overload{
162-
[&](Telemetry& telemetry) { telemetry.set_counter(counter, value); },
163-
[](auto&&) {},
164-
},
165-
instance());
176+
std::visit(details::Overload{
177+
[&](std::shared_ptr<Telemetry>& telemetry) {
178+
telemetry->set_counter(counter, value);
179+
},
180+
[](auto&&) {},
181+
},
182+
instance());
166183
}
167184

168185
void set(const Counter& counter, const std::vector<std::string>& tags,
169186
uint64_t value) {
170187
std::visit(details::Overload{
171-
[&](Telemetry& telemetry) {
172-
telemetry.set_counter(counter, tags, value);
188+
[&](std::shared_ptr<Telemetry>& telemetry) {
189+
telemetry->set_counter(counter, tags, value);
173190
},
174191
[](auto&&) {},
175192
},
@@ -181,29 +198,32 @@ void set(const Counter& counter, const std::vector<std::string>& tags,
181198
namespace rate {
182199
void set(const Rate& rate, uint64_t value) {
183200
std::visit(details::Overload{
184-
[&](Telemetry& telemetry) { telemetry.set_rate(rate, value); },
201+
[&](std::shared_ptr<Telemetry>& telemetry) {
202+
telemetry->set_rate(rate, value);
203+
},
185204
[](auto&&) {},
186205
},
187206
instance());
188207
}
189208

190209
void set(const Rate& rate, const std::vector<std::string>& tags,
191210
uint64_t value) {
192-
std::visit(
193-
details::Overload{
194-
[&](Telemetry& telemetry) { telemetry.set_rate(rate, tags, value); },
195-
[](auto&&) {},
196-
},
197-
instance());
211+
std::visit(details::Overload{
212+
[&](std::shared_ptr<Telemetry>& telemetry) {
213+
telemetry->set_rate(rate, tags, value);
214+
},
215+
[](auto&&) {},
216+
},
217+
instance());
198218
}
199219
} // namespace rate
200220

201221
namespace distribution {
202222

203223
void add(const Distribution& distribution, uint64_t value) {
204224
std::visit(details::Overload{
205-
[&](Telemetry& telemetry) {
206-
telemetry.add_datapoint(distribution, value);
225+
[&](std::shared_ptr<Telemetry>& telemetry) {
226+
telemetry->add_datapoint(distribution, value);
207227
},
208228
[](auto&&) {},
209229
},
@@ -213,8 +233,8 @@ void add(const Distribution& distribution, uint64_t value) {
213233
void add(const Distribution& distribution, const std::vector<std::string>& tags,
214234
uint64_t value) {
215235
std::visit(details::Overload{
216-
[&](Telemetry& telemetry) {
217-
telemetry.add_datapoint(distribution, tags, value);
236+
[&](std::shared_ptr<Telemetry>& telemetry) {
237+
telemetry->add_datapoint(distribution, tags, value);
218238
},
219239
[](auto&&) {},
220240
},

0 commit comments

Comments
 (0)