44#if NETFRAMEWORK
55using System . Net . Http ;
66#endif
7+ using System . Buffers . Binary ;
78using System . Diagnostics . Tracing ;
9+ using System . IO . Compression ;
810using System . Net . Http . Headers ;
11+ using System . Net . Sockets ;
912using OpenTelemetry . Exporter . OpenTelemetryProtocol . Implementation . ExportClient . Grpc ;
1013
1114namespace OpenTelemetry . Exporter . OpenTelemetryProtocol . Implementation . ExportClient ;
@@ -14,6 +17,12 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClie
1417internal sealed class OtlpGrpcExportClient : OtlpExportClient
1518{
1619 public const string GrpcStatusDetailsHeader = "grpc-status-details-bin" ;
20+
21+ // A gRPC message frame header is 5 bytes:
22+ // byte 0 - Compression flag (0 = not compressed, 1 = compressed).
23+ // bytes 1-4 - Message length in big-endian format.
24+ private const int GrpcMessageHeaderSize = 5 ;
25+
1726 private static readonly ExportClientHttpResponse SuccessExportResponse = new ( success : true , deadlineUtc : default , response : null , exception : null ) ;
1827 private static readonly MediaTypeHeaderValue MediaHeaderValue = new ( "application/grpc" ) ;
1928
@@ -25,6 +34,10 @@ private static readonly ExportClientGrpcResponse DefaultExceptionExportClientGrp
2534 status : null ,
2635 grpcStatusDetailsHeader : null ) ;
2736
37+ #if ! NET
38+ private static readonly byte [ ] GrpcFrameHeader = [ 0 , 0 , 0 , 0 , 0 ] ;
39+ #endif
40+
2841 public OtlpGrpcExportClient ( OtlpExporterOptions options , HttpClient httpClient , string signalPath )
2942 : base ( options , httpClient , signalPath )
3043 {
@@ -37,6 +50,11 @@ public OtlpGrpcExportClient(OtlpExporterOptions options, HttpClient httpClient,
3750 // We need the entire response content to ensure that the response trailers are received
3851 internal override HttpCompletionOption CompletionOption => HttpCompletionOption . ResponseContentRead ;
3952
53+ #if NET
54+ // See https://vcsjones.dev/csharp-readonly-span-bytes-static/
55+ private static ReadOnlySpan < byte > GrpcFrameHeader => [ 0 , 0 , 0 , 0 , 0 ] ;
56+ #endif
57+
4058 /// <inheritdoc/>
4159 public override ExportClientResponse SendExportRequest ( byte [ ] buffer , int contentLength , DateTime deadlineUtc , CancellationToken cancellationToken = default )
4260 {
@@ -50,6 +68,12 @@ public override ExportClientResponse SendExportRequest(byte[] buffer, int conten
5068 // A missing TE header results in servers aborting the gRPC call.
5169 httpRequest . Headers . TryAddWithoutValidation ( "TE" , "trailers" ) ;
5270
71+ if ( this . CompressionEnabled )
72+ {
73+ httpRequest . Headers . Remove ( "grpc-encoding" ) ;
74+ httpRequest . Headers . TryAddWithoutValidation ( "grpc-encoding" , "gzip" ) ;
75+ }
76+
5377 httpResponse = this . SendHttpRequest ( httpRequest , cancellationToken ) ;
5478
5579 httpResponse . EnsureSuccessStatusCode ( ) ;
@@ -173,10 +197,54 @@ public override ExportClientResponse SendExportRequest(byte[] buffer, int conten
173197 }
174198 }
175199
200+ protected override HttpContent CreateHttpContent ( byte [ ] buffer , int contentLength )
201+ {
202+ if ( ! this . CompressionEnabled )
203+ {
204+ return base . CreateHttpContent ( buffer , contentLength ) ;
205+ }
206+
207+ // Build a gzip-compressed gRPC message frame:
208+ // byte 0 - Compression flag = 1 (gzip).
209+ // bytes 1-4 - Compressed payload length in big-endian format.
210+ // bytes 5+ - Gzip-compressed protobuf payload.
211+ #if NET
212+ var compressedStream = new PooledBufferStream ( ) ;
213+ #else
214+ var compressedStream = new MemoryStream ( ) ;
215+ #endif
216+
217+ // Reserve space for the gRPC frame header.
218+ #if NET
219+ compressedStream . Write ( GrpcFrameHeader ) ;
220+ #else
221+ compressedStream . Write ( GrpcFrameHeader , 0 , GrpcFrameHeader . Length ) ;
222+ #endif
223+
224+ using ( var gzipStream = new GZipStream ( compressedStream , CompressionLevel . Fastest , leaveOpen : true ) )
225+ {
226+ gzipStream . Write ( buffer , GrpcMessageHeaderSize , contentLength - GrpcMessageHeaderSize ) ;
227+ }
228+
229+ var compressedPayloadLength = ( uint ) ( compressedStream . Length - GrpcMessageHeaderSize ) ;
230+
231+ // Write the gRPC frame header: compression flag + big-endian payload length.
232+ compressedStream . Position = 0 ;
233+ compressedStream . WriteByte ( 1 ) ;
234+
235+ var lengthBytes = new byte [ 4 ] ;
236+ BinaryPrimitives . WriteUInt32BigEndian ( lengthBytes , compressedPayloadLength ) ;
237+ compressedStream . Write ( lengthBytes , 0 , 4 ) ;
238+
239+ compressedStream . Position = 0 ;
240+
241+ OpenTelemetryProtocolExporterEventSource . Log . CompressedGrpcPayload ( "gzip" , contentLength , compressedStream . Length ) ;
242+
243+ var content = new StreamContent ( compressedStream ) ;
244+ content . Headers . ContentType = this . MediaTypeHeader ;
245+ return content ;
246+ }
247+
176248 private static bool IsTransientNetworkError ( HttpRequestException ex ) =>
177- ex . InnerException is System . Net . Sockets . SocketException socketEx
178- && ( socketEx . SocketErrorCode == System . Net . Sockets . SocketError . TimedOut
179- || socketEx . SocketErrorCode == System . Net . Sockets . SocketError . ConnectionReset
180- || socketEx . SocketErrorCode == System . Net . Sockets . SocketError . HostUnreachable
181- || socketEx . SocketErrorCode == System . Net . Sockets . SocketError . ConnectionRefused ) ;
249+ ex . InnerException is SocketException { SocketErrorCode : SocketError . TimedOut or SocketError . ConnectionReset or SocketError . HostUnreachable or SocketError . ConnectionRefused } ;
182250}
0 commit comments