From 0af8c10ce77f231454de939dbf15df45dbf83370 Mon Sep 17 00:00:00 2001 From: Carroll Vance Date: Thu, 18 Dec 2025 08:38:35 -0600 Subject: [PATCH 1/3] Fix response streaming hang (close channel after grpc-status trailer) --- src/Curl.jl | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/Curl.jl b/src/Curl.jl index db31862..30f59fc 100644 --- a/src/Curl.jl +++ b/src/Curl.jl @@ -59,7 +59,7 @@ function write_callback( ) # If we are response streaming unblock the task waiting on response_c - !isnothing(req.response_c) && close(req.response_c) + close(req.response_c) return typemax(Csize_t) end @@ -137,6 +137,7 @@ function header_callback( capture = m_grpc_status.captures[1] if capture !== nothing req.grpc_status = parse(UInt64, capture) + close(req.response_c) end elseif (m_grpc_message = match(regex_grpc_message, header)) isa RegexMatch capture = m_grpc_message.captures[1] @@ -259,8 +260,8 @@ mutable struct gRPCRequest # curl_easy_setopt(easy_handle, CURLOPT_VERBOSE, UInt32(1)) curl_easy_setopt(easy_handle, CURLOPT_URL, url) - curl_easy_setopt(easy_handle, CURLOPT_TIMEOUT, Clong(ceil(deadline))) - curl_easy_setopt(easy_handle, CURLOPT_CONNECTTIMEOUT, Clong(ceil(deadline))) + curl_easy_setopt(easy_handle, CURLOPT_TIMEOUT_MS, Clong(ceil(1000*deadline))) + curl_easy_setopt(easy_handle, CURLOPT_CONNECTTIMEOUT_MS, Clong(ceil(1000*deadline))) curl_easy_setopt(easy_handle, CURLOPT_PIPEWAIT, Clong(1)) curl_easy_setopt(easy_handle, CURLOPT_POST, Clong(1)) curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, "POST") @@ -408,7 +409,7 @@ function handle_write( ) # If we are response streaming unblock the task waiting on response_c - !isnothing(req.response_c) && close(req.response_c) + close(req.response_c) notify(req.ready) return n, nothing elseif req.response_length > req.max_recieve_message_length @@ -420,7 +421,7 @@ function handle_write( ), ) # If we are response streaming unblock the task waiting on response_c - !isnothing(req.response_c) && close(req.response_c) + close(req.response_c) notify(req.ready) return n, nothing end From 153dcd5d201bf402a2418cabd27c6f8ff0d253d9 Mon Sep 17 00:00:00 2001 From: Carroll Vance Date: Thu, 18 Dec 2025 08:38:56 -0600 Subject: [PATCH 2/3] Add test for response streaming hang + improve existing deadline exceeded test --- test/runtests.jl | 60 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 40 insertions(+), 20 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index 8c64644..c77608b 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -230,6 +230,46 @@ include("gen/test/test_pb.jl") grpc_async_await(req) end + @testset "Response Streaming hang after END_STREAM" begin + N = 10 + + client = TestService_TestServerStreamRPC_Client(_TEST_HOST, _TEST_PORT) + + response_c = Channel{TestResponse}(N) + + req = grpc_async_request(client, TestRequest(N, zeros(UInt64, 1)), response_c) + + i = 1 + try + while i <= N + 1 + response = take!(response_c) + i += 1 + end + @test false + catch ex + @test isa(ex, InvalidStateException) + @test i == N + 1 + end + grpc_async_await(req) + end + + @testset "Deadline Exceeded" begin + client = TestService_TestClientStreamRPC_Client(_TEST_HOST, _TEST_PORT; deadline=0.001) + request_c = Channel{TestRequest}(1) + + request = grpc_async_request(client, request_c) + sleep(1.0) + + try + grpc_async_await(request) + @test false + catch ex + # Verify the deadline was exceeded + @test isa(ex, gRPCServiceCallException) + @test ex.grpc_status == GRPC_DEADLINE_EXCEEDED + end + end + @testset "Response Streaming - Small Messages" begin N = 1000 client = TestService_TestServerStreamRPC_Client(_TEST_HOST, _TEST_PORT) @@ -395,26 +435,6 @@ include("gen/test/test_pb.jl") # end - @testset "Deadline - Very short timeout" begin - # Test with an extremely short deadline that might timeout - # Note: This test is timing-sensitive and might be flaky - client = TestService_TestRPC_Client(_TEST_HOST, _TEST_PORT; deadline = 0.000000001) - - # Try to make a request - it might timeout depending on server response time - try - response = grpc_sync_request(client, TestRequest(1, zeros(UInt64, 1))) - if Sys.iswindows() - @test_broken false - else - @test false - end - catch ex - # If it times out, verify it's an exception (CURL timeout or gRPC error) - @test isa(ex, gRPCServiceCallException) - @test ex.grpc_status == GRPC_DEADLINE_EXCEEDED - end - end - @testset "Max Message Size" begin # Create a client with much more restictive max message lengths client = TestService_TestRPC_Client( From a5f3c7bcfaefa45b3f92732ac6371da72cbcb249 Mon Sep 17 00:00:00 2001 From: Carroll Vance Date: Thu, 18 Dec 2025 08:46:48 -0600 Subject: [PATCH 3/3] More principled fix for streaming hang: close streaming channels in cleanup_request (called after CURLMSG_DONE) --- src/Curl.jl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Curl.jl b/src/Curl.jl index 30f59fc..9033ed0 100644 --- a/src/Curl.jl +++ b/src/Curl.jl @@ -137,7 +137,6 @@ function header_callback( capture = m_grpc_status.captures[1] if capture !== nothing req.grpc_status = parse(UInt64, capture) - close(req.response_c) end elseif (m_grpc_message = match(regex_grpc_message, header)) isa RegexMatch capture = m_grpc_message.captures[1] @@ -806,6 +805,9 @@ function cleanup_request(grpc::gRPCCURL, req::gRPCRequest) curl_slist_free_all(req.headers) # Allow this to be GC now that there is no risk of use in C callback unpreserve_handle(req) + # Close streaming channels + close(req.response_c) + close(req.request_c) # Increment the request semaphore to allow more requests through max_reqs_inc(grpc, req) # Unblock anything waiting on the request