|
3 | 3 | //! Provides utilities to fetch the agent /info endpoint and an automatic fetcher to keep info |
4 | 4 | //! up-to-date |
5 | 5 |
|
6 | | -use super::{schema::AgentInfo, AGENT_INFO_CACHE}; |
| 6 | +use super::{ |
| 7 | + schema::{AgentInfo, AgentInfoStruct}, |
| 8 | + AGENT_INFO_CACHE, |
| 9 | +}; |
7 | 10 | use anyhow::{anyhow, Result}; |
8 | 11 | use http::header::HeaderName; |
9 | 12 | use http_body_util::BodyExt; |
@@ -31,23 +34,37 @@ pub enum FetchInfoStatus { |
31 | 34 | /// If the state hash is different from the current one: |
32 | 35 | /// - Return a `FetchInfoStatus::NewState` of the info struct |
33 | 36 | /// - Else return `FetchInfoStatus::SameState` |
34 | | -pub async fn fetch_info_with_state( |
| 37 | +async fn fetch_info_with_state_and_container( |
35 | 38 | info_endpoint: &Endpoint, |
36 | 39 | current_state_hash: Option<&str>, |
| 40 | + current_container_tags_hash: Option<&str>, |
37 | 41 | ) -> Result<FetchInfoStatus> { |
38 | | - let (new_state_hash, body_data) = fetch_and_hash_response(info_endpoint).await?; |
| 42 | + let (new_state_hash, body_data, container_tags_hash) = |
| 43 | + fetch_and_hash_response(info_endpoint).await?; |
39 | 44 |
|
40 | | - if current_state_hash.is_some_and(|state| state == new_state_hash) { |
| 45 | + if current_state_hash.is_some_and(|state| state == new_state_hash) |
| 46 | + && current_container_tags_hash == container_tags_hash.as_deref() |
| 47 | + { |
41 | 48 | return Ok(FetchInfoStatus::SameState); |
42 | 49 | } |
43 | 50 |
|
| 51 | + let mut info_struct: AgentInfoStruct = serde_json::from_slice(&body_data)?; |
| 52 | + info_struct.container_tags_hash = container_tags_hash; |
| 53 | + |
44 | 54 | let info = Box::new(AgentInfo { |
45 | 55 | state_hash: new_state_hash, |
46 | | - info: serde_json::from_slice(&body_data)?, |
| 56 | + info: info_struct, |
47 | 57 | }); |
48 | 58 | Ok(FetchInfoStatus::NewState(info)) |
49 | 59 | } |
50 | 60 |
|
| 61 | +pub async fn fetch_info_with_state( |
| 62 | + info_endpoint: &Endpoint, |
| 63 | + current_state_hash: Option<&str>, |
| 64 | +) -> Result<FetchInfoStatus> { |
| 65 | + fetch_info_with_state_and_container(info_endpoint, current_state_hash, None).await |
| 66 | +} |
| 67 | + |
51 | 68 | /// Fetch the info endpoint once and return the info. |
52 | 69 | /// |
53 | 70 | /// Can be used for one-time access to the agent's info. If you need to access the info several |
@@ -78,21 +95,30 @@ pub async fn fetch_info(info_endpoint: &Endpoint) -> Result<Box<AgentInfo>> { |
78 | 95 |
|
79 | 96 | /// Fetch and hash the response from the agent info endpoint. |
80 | 97 | /// |
81 | | -/// Returns a tuple of (state_hash, response_body_bytes). |
| 98 | +/// Returns a tuple of (state_hash, response_body_bytes, container_tags_hash). |
82 | 99 | /// The hash is calculated using SHA256 to match the agent's calculation method. |
83 | | -async fn fetch_and_hash_response(info_endpoint: &Endpoint) -> Result<(String, bytes::Bytes)> { |
| 100 | +async fn fetch_and_hash_response( |
| 101 | + info_endpoint: &Endpoint, |
| 102 | +) -> Result<(String, bytes::Bytes, Option<String>)> { |
84 | 103 | let req = info_endpoint |
85 | 104 | .to_request_builder(concat!("Libdatadog/", env!("CARGO_PKG_VERSION")))? |
86 | 105 | .method(http::Method::GET) |
87 | 106 | .body(http_common::Body::empty()); |
88 | 107 | let client = http_common::new_default_client(); |
89 | 108 | let res = client.request(req?).await?; |
90 | 109 |
|
| 110 | + // Extract the Datadog-Container-Tags-Hash header |
| 111 | + let container_tags_hash = res |
| 112 | + .headers() |
| 113 | + .get("Datadog-Container-Tags-Hash") |
| 114 | + .and_then(|v| v.to_str().ok()) |
| 115 | + .map(|s| s.to_string()); |
| 116 | + |
91 | 117 | let body_bytes = res.into_body().collect().await?; |
92 | 118 | let body_data = body_bytes.to_bytes(); |
93 | 119 | let hash = format!("{:x}", Sha256::digest(&body_data)); |
94 | 120 |
|
95 | | - Ok((hash, body_data)) |
| 121 | + Ok((hash, body_data, container_tags_hash)) |
96 | 122 | } |
97 | 123 |
|
98 | 124 | /// Fetch the info endpoint and update an ArcSwap keeping it up-to-date. |
@@ -224,7 +250,15 @@ impl AgentInfoFetcher { |
224 | 250 | async fn fetch_and_update(&self) { |
225 | 251 | let current_info = AGENT_INFO_CACHE.load(); |
226 | 252 | let current_hash = current_info.as_ref().map(|info| info.state_hash.as_str()); |
227 | | - let res = fetch_info_with_state(&self.info_endpoint, current_hash).await; |
| 253 | + let current_container_tags_hash = current_info |
| 254 | + .as_ref() |
| 255 | + .and_then(|info| info.info.container_tags_hash.as_deref()); |
| 256 | + let res = fetch_info_with_state_and_container( |
| 257 | + &self.info_endpoint, |
| 258 | + current_hash, |
| 259 | + current_container_tags_hash, |
| 260 | + ) |
| 261 | + .await; |
228 | 262 | match res { |
229 | 263 | Ok(FetchInfoStatus::NewState(new_info)) => { |
230 | 264 | debug!("New /info state received"); |
@@ -406,6 +440,41 @@ mod single_threaded_tests { |
406 | 440 | assert!(matches!(same_state_info_status, FetchInfoStatus::SameState)); |
407 | 441 | } |
408 | 442 |
|
| 443 | + #[cfg_attr(miri, ignore)] |
| 444 | + #[tokio::test] |
| 445 | + async fn test_fetch_info_with_same_state_but_different_container_tags_hash() { |
| 446 | + let server = MockServer::start(); |
| 447 | + let mock = server |
| 448 | + .mock_async(|when, then| { |
| 449 | + when.path("/info"); |
| 450 | + then.status(200) |
| 451 | + .header("content-type", "application/json") |
| 452 | + .header("Datadog-Container-Tags-Hash", "new-container-hash") |
| 453 | + .body(TEST_INFO); |
| 454 | + }) |
| 455 | + .await; |
| 456 | + let endpoint = Endpoint::from_url(server.url("/info").parse().unwrap()); |
| 457 | + |
| 458 | + let info_status = fetch_info_with_state_and_container( |
| 459 | + &endpoint, |
| 460 | + Some(TEST_INFO_HASH), |
| 461 | + Some("old-container-hash"), |
| 462 | + ) |
| 463 | + .await |
| 464 | + .unwrap(); |
| 465 | + |
| 466 | + mock.assert(); |
| 467 | + assert!( |
| 468 | + matches!(info_status, FetchInfoStatus::NewState(info) if *info == AgentInfo { |
| 469 | + state_hash: TEST_INFO_HASH.to_string(), |
| 470 | + info: AgentInfoStruct { |
| 471 | + container_tags_hash: Some("new-container-hash".to_string()), |
| 472 | + ..serde_json::from_str(TEST_INFO).unwrap() |
| 473 | + }, |
| 474 | + }) |
| 475 | + ); |
| 476 | + } |
| 477 | + |
409 | 478 | #[cfg_attr(miri, ignore)] |
410 | 479 | #[tokio::test] |
411 | 480 | async fn test_fetch_info() { |
|
0 commit comments