Skip to content

Commit 2e23e15

Browse files
authored
Merge branch 'main' into taegyunkim/prof-14423-prof-dictinary-bench
2 parents 427e0ea + 98016ad commit 2e23e15

22 files changed

Lines changed: 402 additions & 101 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datadog-remote-config/src/fetch/multitarget.rs

Lines changed: 48 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ pub trait MultiTargetHandlers<
174174

175175
struct RuntimeInfo<N: NotifyTarget> {
176176
notify_target: N,
177+
session_id: String,
177178
targets: HashMap<Arc<Target>, u32>,
178179
}
179180

@@ -212,7 +213,8 @@ where
212213

213214
fn remove_target(
214215
self: &Arc<Self>,
215-
runtime_id: &str,
216+
instance_runtime_id: &str,
217+
session_id: &str,
216218
target: &Arc<Target>,
217219
runtimes: MutexGuard<HashMap<String, RuntimeInfo<N>>>,
218220
) {
@@ -229,13 +231,19 @@ where
229231
match *status {
230232
KnownTargetStatus::Removing(ref future) => {
231233
let future = future.clone();
232-
let runtime_id = runtime_id.to_string();
234+
let instance_runtime_id = instance_runtime_id.to_string();
235+
let session_id = session_id.to_string();
233236
let this = self.clone();
234237
let target = target.clone();
235238
tokio::spawn(async move {
236239
future.await;
237240
let runtimes = this.runtimes.lock_or_panic();
238-
this.remove_target(runtime_id.as_str(), &target, runtimes);
241+
this.remove_target(
242+
instance_runtime_id.as_str(),
243+
session_id.as_str(),
244+
&target,
245+
runtimes,
246+
);
239247
});
240248
return;
241249
}
@@ -246,7 +254,7 @@ where
246254
}
247255
}
248256
1 => {
249-
known_service.runtimes.remove(runtime_id);
257+
known_service.runtimes.remove(instance_runtime_id);
250258
let mut status = known_service.status.lock_or_panic();
251259
*status = match *status {
252260
KnownTargetStatus::Pending => KnownTargetStatus::Alive, /* not really */
@@ -265,7 +273,9 @@ where
265273
0
266274
}
267275
_ => {
268-
if let Some(product_data) = known_service.runtimes.remove(runtime_id) {
276+
if let Some(product_data) =
277+
known_service.runtimes.remove(instance_runtime_id)
278+
{
269279
let mut update_product_data = false;
270280
for product in product_data.products {
271281
update_product_data =
@@ -285,15 +295,15 @@ where
285295
return;
286296
}
287297
if known_service.fetcher.runtime_id.lock_or_panic().as_str()
288-
== runtime_id
298+
== session_id
289299
{
290300
'changed_rt_id: {
291-
for (id, runtime) in runtimes.iter() {
301+
for (_, runtime) in runtimes.iter() {
292302
if runtime.targets.len() == 1
293303
&& runtime.targets.contains_key(target)
294304
{
295305
*known_service.fetcher.runtime_id.lock_or_panic() =
296-
Arc::new(id.to_string());
306+
Arc::new(runtime.session_id.clone());
297307
break 'changed_rt_id;
298308
}
299309
}
@@ -317,6 +327,7 @@ where
317327
self: &Arc<Self>,
318328
synthetic_id: bool,
319329
runtime_id: &str,
330+
session_id: &str,
320331
target: Arc<Target>,
321332
product_capabilities: ProductCapabilities,
322333
) {
@@ -351,12 +362,14 @@ where
351362
// Avoid deadlocking between known_target.status and self.services
352363
self.pending_async_insertions.fetch_add(1, Ordering::AcqRel);
353364
let runtime_id = runtime_id.to_string();
365+
let session_id = session_id.to_string();
354366
let this = self.clone();
355367
tokio::spawn(async move {
356368
future.await;
357369
this.add_target(
358370
synthetic_id,
359371
runtime_id.as_str(),
372+
session_id.as_str(),
360373
target,
361374
product_capabilities,
362375
);
@@ -398,7 +411,7 @@ where
398411
.insert(runtime_id.to_string(), product_capabilities);
399412
if !synthetic_id && known_target.synthetic_id {
400413
known_target.synthetic_id = false;
401-
*known_target.fetcher.runtime_id.lock_or_panic() = Arc::new(runtime_id.into());
414+
*known_target.fetcher.runtime_id.lock_or_panic() = Arc::new(session_id.into());
402415
}
403416
}
404417
Entry::Vacant(e) => {
@@ -433,7 +446,7 @@ where
433446
if synthetic_id {
434447
Self::generate_synthetic_id()
435448
} else {
436-
runtime_id.into()
449+
session_id.into()
437450
},
438451
ConfigProductCapabilities::new(
439452
product_capabilities.products,
@@ -448,6 +461,7 @@ where
448461

449462
pub fn add_runtime(
450463
self: &Arc<Self>,
464+
session_id: String,
451465
runtime_id: String,
452466
notify_target: N,
453467
target: &Arc<Target>,
@@ -480,6 +494,7 @@ where
480494
self.add_target(
481495
true,
482496
runtime_entry.key(),
497+
session_id.as_str(),
483498
target.clone(),
484499
product_capabilities,
485500
);
@@ -498,16 +513,28 @@ where
498513
{
499514
let info = RuntimeInfo {
500515
notify_target,
516+
session_id: session_id.clone(),
501517
targets: HashMap::from([(target.clone(), 1)]),
502518
};
503-
self.add_target(false, e.key(), target.clone(), product_capabilities);
519+
self.add_target(
520+
false,
521+
e.key(),
522+
session_id.as_str(),
523+
target.clone(),
524+
product_capabilities,
525+
);
504526
e.insert(info);
505527
}
506528
}
507529
}
508530
}
509531

510-
pub fn delete_runtime(self: &Arc<Self>, runtime_id: &str, target: &Arc<Target>) {
532+
pub fn delete_runtime(
533+
self: &Arc<Self>,
534+
runtime_id: &str,
535+
session_id: &str,
536+
target: &Arc<Target>,
537+
) {
511538
trace!("Removing remote config runtime: {target:?} with runtime id {runtime_id}");
512539
let mut runtimes = self.runtimes.lock_or_panic();
513540
let last_removed = {
@@ -531,7 +558,7 @@ where
531558
if last_removed {
532559
runtimes.remove(runtime_id);
533560
}
534-
Self::remove_target(self, runtime_id, target, runtimes);
561+
Self::remove_target(self, runtime_id, session_id, target, runtimes);
535562
}
536563

537564
/// Sets the apply state on a stored file.
@@ -935,6 +962,7 @@ mod tests {
935962
fetcher.remote_config_interval.store(1000, Ordering::SeqCst);
936963

937964
fetcher.add_runtime(
965+
RT_ID_1.to_string(),
938966
RT_ID_1.to_string(),
939967
Notifier {
940968
id: 1,
@@ -962,6 +990,7 @@ mod tests {
962990
);
963991

964992
fetcher.add_runtime(
993+
RT_ID_1.to_string(),
965994
RT_ID_1.to_string(),
966995
Notifier {
967996
id: 1,
@@ -974,6 +1003,7 @@ mod tests {
9741003
},
9751004
);
9761005
fetcher.add_runtime(
1006+
RT_ID_2.to_string(),
9771007
RT_ID_2.to_string(),
9781008
Notifier {
9791009
id: 2,
@@ -1019,6 +1049,7 @@ mod tests {
10191049
assert_eq!(fetcher.services.lock().unwrap().len(), 2); // two fetchers
10201050

10211051
fetcher.add_runtime(
1052+
RT_ID_3.to_string(),
10221053
RT_ID_3.to_string(),
10231054
Notifier {
10241055
id: 3,
@@ -1080,10 +1111,10 @@ mod tests {
10801111
);
10811112
}
10821113

1083-
fetcher.delete_runtime(RT_ID_1, &OTHER_TARGET);
1084-
fetcher.delete_runtime(RT_ID_1, &DUMMY_TARGET);
1085-
fetcher.delete_runtime(RT_ID_2, &DUMMY_TARGET);
1086-
fetcher.delete_runtime(RT_ID_3, &OTHER_TARGET);
1114+
fetcher.delete_runtime(RT_ID_1, RT_ID_1, &OTHER_TARGET);
1115+
fetcher.delete_runtime(RT_ID_1, RT_ID_1, &DUMMY_TARGET);
1116+
fetcher.delete_runtime(RT_ID_2, RT_ID_2, &DUMMY_TARGET);
1117+
fetcher.delete_runtime(RT_ID_3, RT_ID_3, &OTHER_TARGET);
10871118

10881119
fetcher.shutdown();
10891120
storage.expect_expiration(&DUMMY_TARGET);

datadog-sidecar-ffi/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1350,6 +1350,7 @@ pub unsafe extern "C" fn ddog_sidecar_set_universal_service_tags(
13501350
app_version: ffi::CharSlice,
13511351
global_tags: &libdd_common_ffi::Vec<Tag>,
13521352
dynamic_instrumentation_state: DynamicInstrumentationConfigState,
1353+
remote_config_generation: u64,
13531354
) -> MaybeError {
13541355
try_c!(blocking::set_universal_service_tags(
13551356
transport,
@@ -1360,6 +1361,7 @@ pub unsafe extern "C" fn ddog_sidecar_set_universal_service_tags(
13601361
app_version.to_utf8_lossy().into(),
13611362
global_tags.to_vec(),
13621363
dynamic_instrumentation_state,
1364+
remote_config_generation,
13631365
));
13641366

13651367
MaybeError::None

datadog-sidecar/src/one_way_shared_memory.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,17 @@ impl<T: FileBackedHandle + From<MappedMem<T>>, D> OneWayShmReader<T, D> {
8282
extra,
8383
}
8484
}
85+
86+
/// Returns the generation of the last successfully read data, or 0 if nothing has been read.
87+
pub fn last_read_generation(&self) -> u64 {
88+
self.current_data
89+
.as_ref()
90+
.map(|d| {
91+
let source_data: &RawData = d.as_slice().into();
92+
source_data.meta.generation.load(Ordering::Acquire)
93+
})
94+
.unwrap_or(0)
95+
}
8596
}
8697

8798
impl<T: FileBackedHandle + From<MappedMem<T>>> OneWayShmWriter<T> {
@@ -225,4 +236,10 @@ impl<T: FileBackedHandle + From<MappedMem<T>>> OneWayShmWriter<T> {
225236
pub fn size(&self) -> usize {
226237
self.handle.lock_or_panic().as_slice().len()
227238
}
239+
240+
pub fn current_generation(&self) -> u64 {
241+
let mapped = self.handle.lock_or_panic();
242+
let data = unsafe { &*(mapped.as_slice() as *const [u8] as *const RawData) };
243+
data.meta.generation.load(Ordering::Acquire)
244+
}
228245
}

datadog-sidecar/src/service/blocking.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,7 @@ pub fn set_universal_service_tags(
375375
app_version: String,
376376
global_tags: Vec<Tag>,
377377
dynamic_instrumentation_state: DynamicInstrumentationConfigState,
378+
remote_config_generation: u64,
378379
) -> io::Result<()> {
379380
lock_sender(transport)?.set_universal_service_tags(
380381
instance_id.clone(),
@@ -384,6 +385,7 @@ pub fn set_universal_service_tags(
384385
app_version,
385386
global_tags,
386387
dynamic_instrumentation_state,
388+
remote_config_generation,
387389
);
388390
Ok(())
389391
}

datadog-sidecar/src/service/remote_configs.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use crate::service::DynamicInstrumentationConfigState;
4+
use crate::service::{DynamicInstrumentationConfigState, InstanceId};
55
use crate::shm_remote_config::{ShmRemoteConfigs, ShmRemoteConfigsGuard};
66
use datadog_remote_config::fetch::{
77
ConfigInvariants, ConfigOptions, MultiTargetStats, NotifyTarget, ProductCapabilities,
@@ -103,7 +103,8 @@ impl RemoteConfigs {
103103
&self,
104104
options: ConfigOptions,
105105
poll_interval: Duration,
106-
runtime_id: String,
106+
instance_id: InstanceId,
107+
remote_config_generation: u64,
107108
notify_target: RemoteConfigNotifyTarget,
108109
env: String,
109110
service: String,
@@ -138,7 +139,8 @@ impl RemoteConfigs {
138139
}
139140
}
140141
.add_runtime(
141-
runtime_id,
142+
instance_id,
143+
remote_config_generation,
142144
notify_target,
143145
env,
144146
service,

datadog-sidecar/src/service/runtime_info.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ impl ActiveApplication {
133133
&mut self,
134134
remote_configs: &RemoteConfigs,
135135
session: &SessionInfo,
136+
instance_id: InstanceId,
137+
remote_config_generation: u64,
136138
notify_target: RemoteConfigNotifyTarget,
137139
dynamic_instrumentation_state: DynamicInstrumentationConfigState,
138140
) {
@@ -149,7 +151,8 @@ impl ActiveApplication {
149151
remote_configs.add_runtime(
150152
options,
151153
*session.remote_config_interval.lock_or_panic(),
152-
session.session_id.clone(),
154+
instance_id,
155+
remote_config_generation,
153156
notify_target,
154157
self.env.clone().expect("set_metadata was called before"),
155158
self.service_name

datadog-sidecar/src/service/sender.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ impl SidecarSender {
246246
app_version: String,
247247
global_tags: Vec<Tag>,
248248
dynamic_instrumentation_state: DynamicInstrumentationConfigState,
249+
remote_config_generation: u64,
249250
) {
250251
coalesce(
251252
&mut self.outbox,
@@ -257,6 +258,7 @@ impl SidecarSender {
257258
app_version,
258259
global_tags,
259260
dynamic_instrumentation_state,
261+
remote_config_generation,
260262
},
261263
);
262264
self.try_drain_outbox();

datadog-sidecar/src/service/sidecar_interface.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,8 @@ pub trait SidecarInterface {
177177
/// * `global_tags` - Global tags which need to be propagated.
178178
/// * `dynamic_instrumentation_state` - Whether dynamic instrumentation is enabled, disabled or
179179
/// not set.
180+
/// * `remote_config_generation` - The SHM reader generation last read by the client (0 if
181+
/// unread).
180182
async fn set_universal_service_tags(
181183
instance_id: InstanceId,
182184
queue_id: QueueId,
@@ -185,6 +187,7 @@ pub trait SidecarInterface {
185187
app_version: String,
186188
global_tags: Vec<Tag>,
187189
dynamic_instrumentation_state: DynamicInstrumentationConfigState,
190+
remote_config_generation: u64,
188191
);
189192

190193
/// Sets request state which does not directly affect the RC connection.

0 commit comments

Comments
 (0)