11// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
22// SPDX-License-Identifier: Apache-2.0
33
4+ use http_body_util:: BodyExt ;
45use hyper:: service:: service_fn;
56use hyper:: { http, Method , Response , StatusCode } ;
67use libdd_common:: hyper_migration;
@@ -12,7 +13,8 @@ use std::time::Instant;
1213use tokio:: sync:: mpsc:: { self , Receiver , Sender } ;
1314use tracing:: { debug, error} ;
1415
15- use crate :: http_utils:: log_and_create_http_response;
16+ use crate :: http_utils:: { log_and_create_http_response, verify_request_content_length} ;
17+ use crate :: proxy_flusher:: { ProxyFlusher , ProxyRequest } ;
1618use crate :: { config, env_verifier, stats_flusher, stats_processor, trace_flusher, trace_processor} ;
1719use libdd_trace_protobuf:: pb;
1820use libdd_trace_utils:: trace_utils;
@@ -22,8 +24,10 @@ const MINI_AGENT_PORT: usize = 8126;
2224const TRACE_ENDPOINT_PATH : & str = "/v0.4/traces" ;
2325const STATS_ENDPOINT_PATH : & str = "/v0.6/stats" ;
2426const INFO_ENDPOINT_PATH : & str = "/info" ;
27+ const PROFILING_ENDPOINT_PATH : & str = "/profiling/v1/input" ;
2528const TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE : usize = 10 ;
2629const STATS_PAYLOAD_CHANNEL_BUFFER_SIZE : usize = 10 ;
30+ const PROXY_PAYLOAD_CHANNEL_BUFFER_SIZE : usize = 10 ;
2731
2832pub struct MiniAgent {
2933 pub config : Arc < config:: Config > ,
@@ -32,6 +36,7 @@ pub struct MiniAgent {
3236 pub stats_processor : Arc < dyn stats_processor:: StatsProcessor + Send + Sync > ,
3337 pub stats_flusher : Arc < dyn stats_flusher:: StatsFlusher + Send + Sync > ,
3438 pub env_verifier : Arc < dyn env_verifier:: EnvVerifier + Send + Sync > ,
39+ pub proxy_flusher : Arc < ProxyFlusher > ,
3540}
3641
3742impl MiniAgent {
@@ -42,7 +47,7 @@ impl MiniAgent {
4247 let mini_agent_metadata = Arc :: new (
4348 self . env_verifier
4449 . verify_environment (
45- self . config . verify_env_timeout ,
50+ self . config . verify_env_timeout_ms ,
4651 & self . config . env_type ,
4752 & self . config . os ,
4853 )
@@ -82,6 +87,17 @@ impl MiniAgent {
8287 . await ;
8388 } ) ;
8489
90+ // channels to send processed profiling requests to our proxy flusher
91+ let ( proxy_tx, proxy_rx) : ( Sender < ProxyRequest > , Receiver < ProxyRequest > ) =
92+ mpsc:: channel ( PROXY_PAYLOAD_CHANNEL_BUFFER_SIZE ) ;
93+
94+ // start our proxy flusher for profiling requests
95+ let proxy_flusher = self . proxy_flusher . clone ( ) ;
96+ tokio:: spawn ( async move {
97+ let proxy_flusher = proxy_flusher. clone ( ) ;
98+ proxy_flusher. start_proxy_flusher ( proxy_rx) . await ;
99+ } ) ;
100+
85101 // setup our hyper http server, where the endpoint_handler handles incoming requests
86102 let trace_processor = self . trace_processor . clone ( ) ;
87103 let stats_processor = self . stats_processor . clone ( ) ;
@@ -96,14 +112,17 @@ impl MiniAgent {
96112 let endpoint_config = endpoint_config. clone ( ) ;
97113 let mini_agent_metadata = Arc :: clone ( & mini_agent_metadata) ;
98114
115+ let proxy_tx = proxy_tx. clone ( ) ;
116+
99117 MiniAgent :: trace_endpoint_handler (
100118 endpoint_config,
101119 req. map ( hyper_migration:: Body :: incoming) ,
102- trace_processor,
103- trace_tx,
104- stats_processor,
105- stats_tx,
106- mini_agent_metadata,
120+ trace_processor. clone ( ) ,
121+ trace_tx. clone ( ) ,
122+ stats_processor. clone ( ) ,
123+ stats_tx. clone ( ) ,
124+ Arc :: clone ( & mini_agent_metadata) ,
125+ proxy_tx. clone ( ) ,
107126 )
108127 } ) ;
109128
@@ -167,6 +186,7 @@ impl MiniAgent {
167186 stats_processor : Arc < dyn stats_processor:: StatsProcessor + Send + Sync > ,
168187 stats_tx : Sender < pb:: ClientStatsPayload > ,
169188 mini_agent_metadata : Arc < trace_utils:: MiniAgentMetadata > ,
189+ proxy_tx : Sender < ProxyRequest > ,
170190 ) -> http:: Result < hyper_migration:: HttpResponse > {
171191 match ( req. method ( ) , req. uri ( ) . path ( ) ) {
172192 ( & Method :: PUT | & Method :: POST , TRACE_ENDPOINT_PATH ) => {
@@ -190,6 +210,15 @@ impl MiniAgent {
190210 ) ,
191211 }
192212 }
213+ ( & Method :: POST , PROFILING_ENDPOINT_PATH ) => {
214+ match Self :: profiling_proxy_handler ( config, req, proxy_tx) . await {
215+ Ok ( res) => Ok ( res) ,
216+ Err ( err) => log_and_create_http_response (
217+ & format ! ( "Error processing profiling request: {err}" ) ,
218+ StatusCode :: INTERNAL_SERVER_ERROR ,
219+ ) ,
220+ }
221+ }
193222 ( _, INFO_ENDPOINT_PATH ) => match Self :: info_handler ( config. dd_dogstatsd_port ) {
194223 Ok ( res) => Ok ( res) ,
195224 Err ( err) => log_and_create_http_response (
@@ -205,13 +234,67 @@ impl MiniAgent {
205234 }
206235 }
207236
237+ /// Handles incoming proxy requests for profiling - can be abstracted into a generic proxy handler for other proxy requests in the future
238+ async fn profiling_proxy_handler (
239+ config : Arc < config:: Config > ,
240+ request : hyper_migration:: HttpRequest ,
241+ proxy_tx : Sender < ProxyRequest > ,
242+ ) -> http:: Result < hyper_migration:: HttpResponse > {
243+ debug ! ( "Trace Agent | Received profiling request" ) ;
244+
245+ // Extract headers and body
246+ let ( parts, body) = request. into_parts ( ) ;
247+ if let Some ( response) = verify_request_content_length (
248+ & parts. headers ,
249+ config. max_request_content_length ,
250+ "Error processing profiling request" ,
251+ ) {
252+ return response;
253+ }
254+
255+ let body_bytes = match body. collect ( ) . await {
256+ Ok ( collected) => collected. to_bytes ( ) ,
257+ Err ( e) => {
258+ return log_and_create_http_response (
259+ & format ! ( "Error reading profiling request body: {e}" ) ,
260+ StatusCode :: BAD_REQUEST ,
261+ ) ;
262+ }
263+ } ;
264+
265+ // Create proxy request
266+ let proxy_request = ProxyRequest {
267+ headers : parts. headers ,
268+ body : body_bytes,
269+ target_url : config. profiling_intake . url . to_string ( ) ,
270+ } ;
271+
272+ debug ! (
273+ "Trace Agent | Sending profiling request to channel, target: {}" ,
274+ proxy_request. target_url
275+ ) ;
276+
277+ // Send to channel
278+ match proxy_tx. send ( proxy_request) . await {
279+ Ok ( _) => log_and_create_http_response (
280+ "Successfully buffered profiling request to be flushed" ,
281+ StatusCode :: OK ,
282+ ) ,
283+ Err ( err) => log_and_create_http_response (
284+ & format ! ( "Error sending profiling request to the proxy flusher: {err}" ) ,
285+ StatusCode :: INTERNAL_SERVER_ERROR ,
286+ ) ,
287+ }
288+ }
289+
208290 fn info_handler ( dd_dogstatsd_port : u16 ) -> http:: Result < hyper_migration:: HttpResponse > {
209291 let response_json = json ! (
210292 {
211293 "endpoints" : [
212294 TRACE_ENDPOINT_PATH ,
213295 STATS_ENDPOINT_PATH ,
214- INFO_ENDPOINT_PATH
296+ INFO_ENDPOINT_PATH ,
297+ PROFILING_ENDPOINT_PATH
215298 ] ,
216299 "client_drop_p0s" : true ,
217300 "config" : {
0 commit comments