Skip to content

Commit 722fb4f

Browse files
authored
Merge pull request #220 from benthecarman/fix-bounded-grpc-stream-frames
Bound gRPC frame sizes
2 parents 1e9e9ba + f9d1f46 commit 722fb4f

3 files changed

Lines changed: 263 additions & 15 deletions

File tree

ldk-server-client/src/client.rs

Lines changed: 140 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,16 @@ use crate::error::LdkServerErrorCode::{
7171

7272
type StreamingClient = HyperClient<HttpsConnector<hyper::client::HttpConnector>, HyperBody>;
7373

74+
const GRPC_FRAME_HEADER_LEN: usize = 5;
75+
76+
// Applies to complete unary gRPC responses. The server applies the same cap to unary request
77+
// bodies before protobuf decoding.
78+
const MAX_GRPC_UNARY_RESPONSE_LEN: usize = 10 * 1024 * 1024;
79+
80+
// Applies to each server-streaming gRPC message. Graph RPCs use the unary client path and are not
81+
// constrained by this limit.
82+
const MAX_GRPC_STREAM_MESSAGE_LEN: usize = 4 * 1024 * 1024;
83+
7484
/// Client to access a hosted instance of LDK Server via gRPC.
7585
///
7686
/// The client requires the server's TLS certificate to be provided for verification.
@@ -426,6 +436,7 @@ impl LdkServerClient {
426436
&self, request: &Rq, method: &str,
427437
) -> Result<Rs, LdkServerError> {
428438
let grpc_body = encode_grpc_frame(&request.encode_to_vec()).to_vec();
439+
let content_length = grpc_body.len().to_string();
429440

430441
let url = format!("https://{}{}{}", self.base_url, GRPC_SERVICE_PREFIX, method);
431442
let auth_header = self.compute_auth_header(&grpc_body);
@@ -434,6 +445,7 @@ impl LdkServerClient {
434445
.client
435446
.post(&url)
436447
.header("content-type", "application/grpc+proto")
448+
.header("content-length", content_length)
437449
.header("te", "trailers")
438450
.header("x-auth", auth_header)
439451
.body(grpc_body)
@@ -450,10 +462,7 @@ impl LdkServerClient {
450462
return Err(error);
451463
}
452464

453-
// Read the response body
454-
let payload = response.bytes().await.map_err(|e| {
455-
LdkServerError::new(InternalError, format!("Failed to read response body: {}", e))
456-
})?;
465+
let payload = read_grpc_unary_response_body(response).await?;
457466

458467
let proto_bytes = decode_grpc_body(&payload)
459468
.map_err(|e| LdkServerError::new(InternalError, e.message))?;
@@ -469,6 +478,7 @@ impl LdkServerClient {
469478
&self, request: &Rq, method: &str,
470479
) -> Result<GrpcStream<Rs>, LdkServerError> {
471480
let grpc_body = encode_grpc_frame(&request.encode_to_vec()).to_vec();
481+
let content_length = grpc_body.len().to_string();
472482

473483
let url = format!("https://{}{}{}", self.base_url, GRPC_SERVICE_PREFIX, method);
474484
let auth_header = self.compute_auth_header(&grpc_body);
@@ -479,6 +489,7 @@ impl LdkServerClient {
479489
HyperRequest::post(&url)
480490
.version(Version::HTTP_2)
481491
.header("content-type", "application/grpc+proto")
492+
.header("content-length", content_length)
482493
.header("te", "trailers")
483494
.header("x-auth", auth_header)
484495
.body(HyperBody::from(grpc_body))
@@ -508,6 +519,42 @@ impl LdkServerClient {
508519
}
509520
}
510521

522+
async fn read_grpc_unary_response_body(
523+
mut response: reqwest::Response,
524+
) -> Result<Vec<u8>, LdkServerError> {
525+
let capacity = if let Some(content_length) = response.content_length() {
526+
check_grpc_unary_response_len(content_length)?;
527+
content_length as usize
528+
} else {
529+
0
530+
};
531+
532+
let mut payload = Vec::with_capacity(capacity);
533+
while let Some(chunk) = response.chunk().await.map_err(|e| {
534+
LdkServerError::new(InternalError, format!("Failed to read response body: {}", e))
535+
})? {
536+
let len = payload.len().checked_add(chunk.len()).ok_or_else(|| {
537+
LdkServerError::new(InternalError, "gRPC unary response body length overflow")
538+
})?;
539+
check_grpc_unary_response_len(len as u64)?;
540+
payload.extend_from_slice(&chunk);
541+
}
542+
Ok(payload)
543+
}
544+
545+
fn check_grpc_unary_response_len(len: u64) -> Result<(), LdkServerError> {
546+
if len > MAX_GRPC_UNARY_RESPONSE_LEN as u64 {
547+
return Err(LdkServerError::new(
548+
InternalError,
549+
format!(
550+
"gRPC unary response exceeds maximum size of {} bytes",
551+
MAX_GRPC_UNARY_RESPONSE_LEN
552+
),
553+
));
554+
}
555+
Ok(())
556+
}
557+
511558
/// Map a gRPC status code to an LdkServerError.
512559
fn grpc_code_to_error(code: u32, message: String) -> LdkServerError {
513560
match code {
@@ -568,19 +615,43 @@ impl<M: Message + Default> GrpcStream<M> {
568615
pub async fn next_message(&mut self) -> Option<Result<M, LdkServerError>> {
569616
loop {
570617
// Try to decode a complete gRPC frame from the buffer
571-
if self.buf.len() >= 5 {
618+
if self.buf.len() >= GRPC_FRAME_HEADER_LEN {
619+
if self.buf[0] != 0 {
620+
return Some(Err(LdkServerError::new(
621+
InternalError,
622+
"gRPC stream compression is not supported",
623+
)));
624+
}
572625
let msg_len =
573626
u32::from_be_bytes([self.buf[1], self.buf[2], self.buf[3], self.buf[4]])
574627
as usize;
575-
if self.buf.len() >= 5 + msg_len {
576-
let proto_bytes = &self.buf[5..5 + msg_len];
628+
if msg_len > MAX_GRPC_STREAM_MESSAGE_LEN {
629+
return Some(Err(LdkServerError::new(
630+
InternalError,
631+
format!(
632+
"gRPC stream message exceeds maximum size of {} bytes",
633+
MAX_GRPC_STREAM_MESSAGE_LEN
634+
),
635+
)));
636+
}
637+
let frame_len = match GRPC_FRAME_HEADER_LEN.checked_add(msg_len) {
638+
Some(frame_len) => frame_len,
639+
None => {
640+
return Some(Err(LdkServerError::new(
641+
InternalError,
642+
"gRPC stream frame length overflow",
643+
)));
644+
},
645+
};
646+
if self.buf.len() >= frame_len {
647+
let proto_bytes = &self.buf[GRPC_FRAME_HEADER_LEN..frame_len];
577648
let result = M::decode(proto_bytes).map_err(|e| {
578649
LdkServerError::new(
579650
InternalError,
580651
format!("Failed to decode gRPC stream message: {}", e),
581652
)
582653
});
583-
self.buf.drain(..5 + msg_len);
654+
self.buf.drain(..frame_len);
584655
return Some(result);
585656
}
586657
}
@@ -691,6 +762,25 @@ mod tests {
691762
assert_eq!(err.message, "gRPC stream became unavailable: server shutting down");
692763
}
693764

765+
#[test]
766+
fn test_grpc_unary_response_len_allows_limit() {
767+
assert!(check_grpc_unary_response_len(MAX_GRPC_UNARY_RESPONSE_LEN as u64).is_ok());
768+
}
769+
770+
#[test]
771+
fn test_grpc_unary_response_len_rejects_oversized() {
772+
let err =
773+
check_grpc_unary_response_len(MAX_GRPC_UNARY_RESPONSE_LEN as u64 + 1).unwrap_err();
774+
assert_eq!(err.error_code, InternalError);
775+
assert_eq!(
776+
err.message,
777+
format!(
778+
"gRPC unary response exceeds maximum size of {} bytes",
779+
MAX_GRPC_UNARY_RESPONSE_LEN
780+
)
781+
);
782+
}
783+
694784
#[tokio::test]
695785
async fn test_event_stream_surfaces_terminal_grpc_status() {
696786
let (mut sender, body) = Body::channel();
@@ -713,6 +803,48 @@ mod tests {
713803
assert!(stream.next_message().await.is_none());
714804
}
715805

806+
#[tokio::test]
807+
async fn test_event_stream_rejects_oversized_frame_header() {
808+
let (mut sender, body) = Body::channel();
809+
sender.send_data(vec![0u8, 0xff, 0xff, 0xff, 0xff].into()).await.unwrap();
810+
drop(sender);
811+
812+
let mut stream: EventStream = GrpcStream {
813+
body,
814+
buf: Vec::new(),
815+
trailers_checked: false,
816+
_marker: std::marker::PhantomData,
817+
};
818+
819+
let result = stream.next_message().await.unwrap().unwrap_err();
820+
assert_eq!(result.error_code, InternalError);
821+
assert_eq!(
822+
result.message,
823+
format!(
824+
"gRPC stream message exceeds maximum size of {} bytes",
825+
MAX_GRPC_STREAM_MESSAGE_LEN
826+
)
827+
);
828+
}
829+
830+
#[tokio::test]
831+
async fn test_event_stream_rejects_compressed_frame() {
832+
let (mut sender, body) = Body::channel();
833+
sender.send_data(vec![1u8, 0, 0, 0, 0].into()).await.unwrap();
834+
drop(sender);
835+
836+
let mut stream: EventStream = GrpcStream {
837+
body,
838+
buf: Vec::new(),
839+
trailers_checked: false,
840+
_marker: std::marker::PhantomData,
841+
};
842+
843+
let result = stream.next_message().await.unwrap().unwrap_err();
844+
assert_eq!(result.error_code, InternalError);
845+
assert_eq!(result.message, "gRPC stream compression is not supported");
846+
}
847+
716848
#[test]
717849
fn test_grpc_code_to_error_all_known_codes() {
718850
let cases = [

ldk-server-grpc/src/grpc.rs

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ pub fn grpc_error_response(status: GrpcStatus) -> http::Response<GrpcBody> {
191191
.status(200)
192192
.header("content-type", "application/grpc+proto")
193193
.header("grpc-accept-encoding", "identity")
194+
.header("content-length", "0")
194195
.header("grpc-status", status.code.to_string());
195196
if !status.message.is_empty() {
196197
let encoded = percent_encode(&status.message);
@@ -203,12 +204,15 @@ pub fn grpc_error_response(status: GrpcStatus) -> http::Response<GrpcBody> {
203204

204205
/// Build an HTTP 200 response with gRPC content-type and the given body.
205206
pub fn grpc_response(body: GrpcBody) -> http::Response<GrpcBody> {
206-
http::Response::builder()
207+
let mut builder = http::Response::builder()
207208
.status(200)
208209
.header("content-type", "application/grpc+proto")
209-
.header("grpc-accept-encoding", "identity")
210-
.body(body)
211-
.unwrap()
210+
.header("grpc-accept-encoding", "identity");
211+
if let GrpcBody::Unary { data, .. } = &body {
212+
let len = data.as_ref().map_or(0, Bytes::len);
213+
builder = builder.header("content-length", len.to_string());
214+
}
215+
builder.body(body).unwrap()
212216
}
213217

214218
/// Validate that the request looks like a gRPC call.
@@ -322,6 +326,30 @@ mod tests {
322326
assert!(decoded.is_empty());
323327
}
324328

329+
#[test]
330+
fn test_grpc_response_sets_unary_content_length() {
331+
let data = encode_grpc_frame(b"hello");
332+
let expected_len = data.len().to_string();
333+
let response = grpc_response(GrpcBody::Unary { data: Some(data), trailers_sent: false });
334+
335+
assert_eq!(response.headers().get("content-length").unwrap(), expected_len.as_str());
336+
}
337+
338+
#[test]
339+
fn test_grpc_response_omits_stream_content_length() {
340+
let (_tx, rx) = tokio::sync::mpsc::channel(1);
341+
let response = grpc_response(GrpcBody::Stream { rx, done: false });
342+
343+
assert!(response.headers().get("content-length").is_none());
344+
}
345+
346+
#[test]
347+
fn test_grpc_error_response_sets_zero_content_length() {
348+
let response = grpc_error_response(GrpcStatus::new(GRPC_STATUS_INVALID_ARGUMENT, "bad"));
349+
350+
assert_eq!(response.headers().get("content-length").unwrap(), "0");
351+
}
352+
325353
#[test]
326354
fn test_decode_too_short() {
327355
assert!(decode_grpc_body(&[0, 0, 0]).is_err());

0 commit comments

Comments
 (0)