1616//!
1717//! This is a private module, it is not exposed in the public API.
1818
19- /// A macro instrumented to generate the client request logs natively within the generated crates.
19+ use super :: RequestRecorder ;
20+
21+ use crate :: observability:: attributes:: keys:: {
22+ ERROR_TYPE , GCP_CLIENT_ARTIFACT , GCP_CLIENT_REPO , GCP_CLIENT_SERVICE , GCP_CLIENT_VERSION ,
23+ GCP_ERRORS_DOMAIN , GCP_ERRORS_METADATA , HTTP_REQUEST_METHOD , HTTP_REQUEST_RESEND_COUNT ,
24+ RPC_RESPONSE_STATUS_CODE , RPC_SERVICE , RPC_SYSTEM_NAME , SERVER_ADDRESS , SERVER_PORT , URL_FULL ,
25+ } ;
26+ use crate :: observability:: errors:: ErrorType ;
27+ use google_cloud_gax:: error:: Error ;
28+ use opentelemetry_semantic_conventions:: attribute:: { RPC_METHOD , URL_DOMAIN , URL_TEMPLATE } ;
29+ use pin_project:: pin_project;
30+ use std:: future:: Future ;
31+ use std:: pin:: Pin ;
32+ use std:: task:: { Context , Poll } ;
33+
34+ // A tentative name for the error logs.
35+ pub const NAME : & str = "experimental.client.request.error" ;
36+ // A tentative target for the error logs.
37+ pub const TARGET : & str = "experimental.client.request" ;
38+
39+ /// A future instrumented to generate the client request logs.
2040///
21- /// Decorates the `inner ` future, which represents a pending client request,
41+ /// Decorates the `F ` future, which represents a pending client request,
2242/// to emit the error logs. Typically this is used in the tracing layer:
2343///
2444/// ```ignore
3050/// req: crate::model::EchoRequest,
3151/// options: crate::RequestOptions,
3252/// ) -> Result<crate::Response<crate::model::EchoResponse>> {
53+ /// use google_cloud_gax_internal::observability::client_signals::WithClientLogging;
3354/// let pending = self.inner.echo(req, options);
34- /// google_cloud_gax_internal::with_client_logging! (pending).await
55+ /// WithClientLogging::new (pending).await
3556/// }
3657/// # }
3758/// ```
3859///
39- #[ macro_export]
40- macro_rules! with_client_logging {
41- ( $inner: expr) => { {
42- let inner_future = $inner;
43- async move {
44- let output = inner_future. await ;
45- if let Some ( snapshot) =
46- $crate:: observability:: RequestRecorder :: current( ) . map( |r| r. client_snapshot( ) )
47- {
48- if let Err ( error) = & output {
49- let gax_error: & google_cloud_gax:: error:: Error = error;
50- let rpc_status_code = gax_error. status( ) . map( |s| s. code. name( ) ) ;
51- let error_type = $crate:: observability:: errors:: ErrorType :: from_gax_error( gax_error) ;
52- let error_info = gax_error. status( ) . and_then( |s| {
53- s. details. iter( ) . find_map( |d| match d {
54- google_cloud_gax:: error:: rpc:: StatusDetails :: ErrorInfo ( i) => Some ( i) ,
55- _ => None ,
56- } )
57- } ) ;
58- let error_domain = error_info. map( |i| i. domain. as_str( ) ) ;
59- let error_metadata = error_info. and_then( |i| {
60- if i. metadata. is_empty( ) {
61- None
62- } else {
63- serde_json:: to_string( & i. metadata) . ok( )
64- }
65- } ) ;
60+ #[ must_use = "futures do nothing unless you `.await` or poll them" ]
61+ #[ pin_project]
62+ pub struct WithClientLogging < F > {
63+ #[ pin]
64+ inner : F ,
65+ }
66+
67+ impl < F , R > WithClientLogging < F >
68+ where
69+ F : Future < Output = Result < R , Error > > ,
70+ {
71+ pub fn new ( inner : F ) -> Self {
72+ Self { inner }
73+ }
74+ }
6675
67- :: tracing:: event!(
68- name: "experimental.client.request.error" ,
69- target: env!( "CARGO_PKG_NAME" ) ,
70- :: tracing:: Level :: WARN ,
71- { $crate:: observability:: attributes:: keys:: RPC_SYSTEM_NAME } = snapshot. rpc_system( ) ,
72- { $crate:: observability:: attributes:: keys:: RPC_SERVICE } = snapshot. service_name( ) ,
73- { :: opentelemetry_semantic_conventions:: attribute:: RPC_METHOD } = snapshot. rpc_method( ) ,
74- { $crate:: observability:: attributes:: keys:: GCP_CLIENT_VERSION } = snapshot. client_version( ) ,
75- { $crate:: observability:: attributes:: keys:: GCP_CLIENT_REPO } = snapshot. client_repo( ) ,
76- { $crate:: observability:: attributes:: keys:: GCP_CLIENT_ARTIFACT } = snapshot. client_artifact( ) ,
77- { :: opentelemetry_semantic_conventions:: attribute:: URL_DOMAIN } = snapshot. default_host( ) ,
78- { $crate:: observability:: attributes:: keys:: URL_FULL } = snapshot. sanitized_url( ) ,
79- { :: opentelemetry_semantic_conventions:: attribute:: URL_TEMPLATE } = snapshot. url_template( ) ,
80- { $crate:: observability:: attributes:: keys:: RPC_RESPONSE_STATUS_CODE } = rpc_status_code,
81- { $crate:: observability:: attributes:: keys:: ERROR_TYPE } = error_type. as_str( ) ,
82- { $crate:: observability:: attributes:: keys:: SERVER_ADDRESS } = snapshot. server_address( ) ,
83- { $crate:: observability:: attributes:: keys:: SERVER_PORT } = snapshot. server_port( ) as i64 ,
84- { $crate:: observability:: attributes:: keys:: HTTP_REQUEST_METHOD } = snapshot. http_method( ) ,
85- { $crate:: observability:: attributes:: keys:: HTTP_REQUEST_RESEND_COUNT } = snapshot. http_resend_count( ) . map( |v| v as i64 ) ,
86- { $crate:: observability:: attributes:: keys:: GCP_CLIENT_SERVICE } = snapshot. service_name( ) ,
87- { $crate:: observability:: attributes:: keys:: GCP_ERRORS_DOMAIN } = error_domain,
88- { $crate:: observability:: attributes:: keys:: GCP_ERRORS_METADATA } = error_metadata,
89- "{error:?}" ,
90- error = gax_error
91- ) ;
92- }
76+ impl < F , R > Future for WithClientLogging < F >
77+ where
78+ F : Future < Output = Result < R , Error > > ,
79+ {
80+ type Output = <F as Future >:: Output ;
81+ fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
82+ let this = self . project ( ) ;
83+ let output = futures:: ready!( this. inner. poll( cx) ) ;
84+ let Some ( snapshot) = RequestRecorder :: current ( ) . map ( |r| r. client_snapshot ( ) ) else {
85+ return Poll :: Ready ( output) ;
86+ } ;
87+ match & output {
88+ Ok ( _) => ( ) ,
89+ Err ( error) => {
90+ let rpc_status_code = error. status ( ) . map ( |s| s. code . name ( ) ) ;
91+ let error_type = ErrorType :: from_gax_error ( error) ;
92+ let error_info = error. status ( ) . and_then ( |s| {
93+ s. details . iter ( ) . find_map ( |d| match d {
94+ google_cloud_gax:: error:: rpc:: StatusDetails :: ErrorInfo ( i) => Some ( i) ,
95+ _ => None ,
96+ } )
97+ } ) ;
98+ let error_domain = error_info. map ( |i| i. domain . as_str ( ) ) ;
99+ let error_metadata = error_info. and_then ( |i| {
100+ if i. metadata . is_empty ( ) {
101+ None
102+ } else {
103+ serde_json:: to_string ( & i. metadata ) . ok ( )
104+ }
105+ } ) ;
106+
107+ // TODO(#4795) - use the correct name and target
108+ tracing:: event!(
109+ name: NAME ,
110+ target: TARGET ,
111+ tracing:: Level :: WARN ,
112+ { RPC_SYSTEM_NAME } = snapshot. rpc_system( ) ,
113+ { RPC_SERVICE } = snapshot. service_name( ) ,
114+ { RPC_METHOD } = snapshot. rpc_method( ) ,
115+ { GCP_CLIENT_VERSION } = snapshot. client_version( ) ,
116+ { GCP_CLIENT_REPO } = snapshot. client_repo( ) ,
117+ { GCP_CLIENT_ARTIFACT } = snapshot. client_artifact( ) ,
118+ { URL_DOMAIN } = snapshot. default_host( ) ,
119+ { URL_FULL } = snapshot. sanitized_url( ) ,
120+ { URL_TEMPLATE } = snapshot. url_template( ) ,
121+ { RPC_RESPONSE_STATUS_CODE } = rpc_status_code,
122+ { ERROR_TYPE } = error_type. as_str( ) ,
123+ { SERVER_ADDRESS } = snapshot. server_address( ) ,
124+ { SERVER_PORT } = snapshot. server_port( ) as i64 ,
125+ { HTTP_REQUEST_METHOD } = snapshot. http_method( ) ,
126+ { HTTP_REQUEST_RESEND_COUNT } = snapshot. http_resend_count( ) . map( |v| v as i64 ) ,
127+ { GCP_CLIENT_SERVICE } = snapshot. service_name( ) ,
128+ { GCP_ERRORS_DOMAIN } = error_domain,
129+ { GCP_ERRORS_METADATA } = error_metadata,
130+ "{error:?}"
131+ ) ;
93132 }
94- output
95133 }
96- } } ;
134+ Poll :: Ready ( output)
135+ }
97136}
98137
99138#[ cfg( test) ]
100139mod tests {
101140 use super :: super :: tests:: {
102141 TEST_INFO , TEST_METHOD , TEST_URL_TEMPLATE , recorded_request_transport_stub,
103142 } ;
104- use crate :: observability:: RequestRecorder ;
105- use google_cloud_gax:: error:: Error ;
143+ use super :: * ;
106144 use google_cloud_test_utils:: tracing:: Buffer ;
107145 use httptest:: Expectation ;
108146 use httptest:: Server ;
@@ -117,27 +155,31 @@ mod tests {
117155
118156 #[ tokio:: test]
119157 async fn no_recorder ( ) -> anyhow:: Result < ( ) > {
120- let _guard = capture_logs ( ) ; // test removed to avoid breaking things, since not generating log
158+ let ( _guard, buffer ) = capture_logs ( ) ;
121159
122- let logging = with_client_logging ! ( async { Ok :: < i32 , Error > ( 123 ) } ) ;
160+ let logging = WithClientLogging :: new ( async { Ok ( 123 ) } ) ;
123161 let got = logging. await ;
124162 assert ! ( matches!( got, Ok ( 123 ) ) , "{got:?}" ) ;
163+ let contents = String :: from_utf8 ( buffer. captured ( ) ) ?;
164+ assert ! ( contents. is_empty( ) , "{contents}" ) ;
125165 Ok ( ( ) )
126166 }
127167
128168 #[ tokio:: test]
129169 async fn ok ( ) -> anyhow:: Result < ( ) > {
130- let _guard = capture_logs ( ) ;
170+ let ( _guard, buffer ) = capture_logs ( ) ;
131171
132172 let recorder = RequestRecorder :: new ( TEST_INFO ) ;
133173 let scoped = recorder. clone ( ) ;
134- let logging = with_client_logging ! ( async {
174+ let logging = WithClientLogging :: new ( async {
135175 let _current =
136176 RequestRecorder :: current ( ) . expect ( "current recorder should be available" ) ;
137- Ok :: < i32 , Error > ( 123 )
177+ Ok ( 123 )
138178 } ) ;
139179 let got = scoped. scope ( logging) . await ;
140180 assert ! ( matches!( got, Ok ( 123 ) ) , "{got:?}" ) ;
181+ let contents = String :: from_utf8 ( buffer. captured ( ) ) ?;
182+ assert ! ( contents. is_empty( ) , "{contents}" ) ;
141183 Ok ( ( ) )
142184 }
143185
@@ -148,7 +190,7 @@ mod tests {
148190 let ( _guard, buffer) = capture_logs ( ) ;
149191 let recorder = RequestRecorder :: new ( TEST_INFO ) ;
150192 let scoped = recorder. clone ( ) ;
151- let logging = with_client_logging ! ( recorded_request_transport_stub( BAD_URL ) ) ;
193+ let logging = WithClientLogging :: new ( recorded_request_transport_stub ( BAD_URL ) ) ;
152194 let got = scoped. scope ( logging) . await ;
153195 assert ! ( got. is_err( ) , "{got:?}" ) ;
154196 let parsed = extract_captured_log ( buffer) ?;
@@ -164,7 +206,7 @@ mod tests {
164206 assert ! ( object. remove( "timestamp" ) . is_some( ) , "{parsed:?}" ) ;
165207 let want = json ! ( {
166208 "level" : "WARN" ,
167- "target" : env! ( "CARGO_PKG_NAME" ) ,
209+ "target" : "experimental.client.request" ,
168210 } ) ;
169211 assert_eq ! ( Some ( & object) , want. as_object( ) , "{parsed:?}" ) ;
170212
@@ -207,7 +249,9 @@ mod tests {
207249 let recorder = RequestRecorder :: new ( TEST_INFO ) ;
208250 let scoped = recorder. clone ( ) ;
209251 let got = scoped
210- . scope ( with_client_logging ! ( recorded_request_transport_stub( & url, ) ) )
252+ . scope ( WithClientLogging :: new ( recorded_request_transport_stub (
253+ & url,
254+ ) ) )
211255 . await ;
212256 assert ! ( matches!( got, Err ( ref e) if e. is_transport( ) ) , "{got:?}" ) ;
213257 let parsed = extract_captured_log ( buffer) ?;
@@ -224,7 +268,7 @@ mod tests {
224268 assert ! ( object. remove( "timestamp" ) . is_some( ) , "{parsed:?}" ) ;
225269 let want = json ! ( {
226270 "level" : "WARN" ,
227- "target" : env! ( "CARGO_PKG_NAME" ) ,
271+ "target" : "experimental.client.request" ,
228272 } ) ;
229273 assert_eq ! ( Some ( & object) , want. as_object( ) , "{parsed:?}" ) ;
230274
0 commit comments