Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions src/Curl.jl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ function write_callback(
)

# If we are response streaming unblock the task waiting on response_c
!isnothing(req.response_c) && close(req.response_c)
close(req.response_c)
return typemax(Csize_t)
end

Expand Down Expand Up @@ -259,8 +259,8 @@ mutable struct gRPCRequest
# curl_easy_setopt(easy_handle, CURLOPT_VERBOSE, UInt32(1))

curl_easy_setopt(easy_handle, CURLOPT_URL, url)
curl_easy_setopt(easy_handle, CURLOPT_TIMEOUT, Clong(ceil(deadline)))
curl_easy_setopt(easy_handle, CURLOPT_CONNECTTIMEOUT, Clong(ceil(deadline)))
curl_easy_setopt(easy_handle, CURLOPT_TIMEOUT_MS, Clong(ceil(1000*deadline)))
curl_easy_setopt(easy_handle, CURLOPT_CONNECTTIMEOUT_MS, Clong(ceil(1000*deadline)))
curl_easy_setopt(easy_handle, CURLOPT_PIPEWAIT, Clong(1))
curl_easy_setopt(easy_handle, CURLOPT_POST, Clong(1))
curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, "POST")
Expand Down Expand Up @@ -408,7 +408,7 @@ function handle_write(
)

# If we are response streaming unblock the task waiting on response_c
!isnothing(req.response_c) && close(req.response_c)
close(req.response_c)
notify(req.ready)
return n, nothing
elseif req.response_length > req.max_recieve_message_length
Expand All @@ -420,7 +420,7 @@ function handle_write(
),
)
# If we are response streaming unblock the task waiting on response_c
!isnothing(req.response_c) && close(req.response_c)
close(req.response_c)
notify(req.ready)
return n, nothing
end
Expand Down Expand Up @@ -805,6 +805,9 @@ function cleanup_request(grpc::gRPCCURL, req::gRPCRequest)
curl_slist_free_all(req.headers)
# Allow this to be GC now that there is no risk of use in C callback
unpreserve_handle(req)
# Close streaming channels
close(req.response_c)
close(req.request_c)
# Increment the request semaphore to allow more requests through
max_reqs_inc(grpc, req)
# Unblock anything waiting on the request
Expand Down
60 changes: 40 additions & 20 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,46 @@ include("gen/test/test_pb.jl")
grpc_async_await(req)
end

@testset "Response Streaming hang after END_STREAM" begin
N = 10

client = TestService_TestServerStreamRPC_Client(_TEST_HOST, _TEST_PORT)

response_c = Channel{TestResponse}(N)

req = grpc_async_request(client, TestRequest(N, zeros(UInt64, 1)), response_c)

i = 1
try
while i <= N + 1
response = take!(response_c)
i += 1
end
@test false
catch ex
@test isa(ex, InvalidStateException)
@test i == N + 1
end
grpc_async_await(req)
end

@testset "Deadline Exceeded" begin
client = TestService_TestClientStreamRPC_Client(_TEST_HOST, _TEST_PORT; deadline=0.001)
request_c = Channel{TestRequest}(1)

request = grpc_async_request(client, request_c)
sleep(1.0)

try
grpc_async_await(request)
@test false
catch ex
# Verify the deadline was exceeded
@test isa(ex, gRPCServiceCallException)
@test ex.grpc_status == GRPC_DEADLINE_EXCEEDED
end
end

@testset "Response Streaming - Small Messages" begin
N = 1000
client = TestService_TestServerStreamRPC_Client(_TEST_HOST, _TEST_PORT)
Expand Down Expand Up @@ -395,26 +435,6 @@ include("gen/test/test_pb.jl")

# end

@testset "Deadline - Very short timeout" begin
# Test with an extremely short deadline that might timeout
# Note: This test is timing-sensitive and might be flaky
client = TestService_TestRPC_Client(_TEST_HOST, _TEST_PORT; deadline = 0.000000001)

# Try to make a request - it might timeout depending on server response time
try
response = grpc_sync_request(client, TestRequest(1, zeros(UInt64, 1)))
if Sys.iswindows()
@test_broken false
else
@test false
end
catch ex
# If it times out, verify it's an exception (CURL timeout or gRPC error)
@test isa(ex, gRPCServiceCallException)
@test ex.grpc_status == GRPC_DEADLINE_EXCEEDED
end
end

@testset "Max Message Size" begin
# Create a client with much more restictive max message lengths
client = TestService_TestRPC_Client(
Expand Down
Loading