Connection pooling for gateway stream connections #1073
Pool the gateway's streaming RPCs across multiple HTTP/2 connections to multiply the effective MAX_CONCURRENT_STREAMS capacity. Each connection in the pool carries an independent stream budget, and splitConn round-robins NewStream calls across them. Configurable via ARKD_STREAM_CONN_POOL_SIZE (default 4). A pool size of 1 preserves the previous single-connection behavior.
Walkthrough
The PR introduces configurable gRPC stream connection pooling.
Sequence Diagram(s)
sequenceDiagram
participant Client
participant splitConn
participant unaryConn
participant streamPool as Stream Pool<br/>(round-robin)
Client->>splitConn: Invoke (unary RPC)
splitConn->>unaryConn: Invoke
unaryConn-->>splitConn: response
splitConn-->>Client: response
Client->>splitConn: NewStream (streaming RPC)
splitConn->>streamPool: next connection<br/>(atomic index % poolSize)
streamPool->>streamPool: select pooled conn[i]
streamPool-->>splitConn: pooled connection
splitConn->>streamPool: NewStream on conn[i]
streamPool-->>Client: stream
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Review: Connection pooling for gateway stream connections
Verdict: APPROVE — Clean, well-scoped infrastructure change. Not protocol-critical (no VTXO/signing/forfeit/round/exit paths touched).
What it does
Replaces the single streamConn with a pool of N *grpc.ClientConn, round-robined via atomic.Uint64 in splitConn.NewStream. Each connection gets its own HTTP/2 MAX_CONCURRENT_STREAMS budget, so a pool of 4 gives 4× the stream capacity. Config: ARKD_STREAM_CONN_POOL_SIZE, default 4.
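As a rough illustration of the selection step described above, the sketch below rotates through a pool with an atomic counter. It is not the PR's code: `roundRobin` and `next` are hypothetical names, and the pool holds strings instead of `*grpc.ClientConn` so the example runs without a gRPC dependency.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin is a hypothetical stand-in for the pool held by splitConn.
// It is generic so the sketch runs without a gRPC dependency.
type roundRobin[T any] struct {
	items []T
	index atomic.Uint64
}

// next returns the next item in rotation. Add returns the incremented value,
// so subtracting 1 yields a zero-based ticket for this call.
func (r *roundRobin[T]) next() T {
	ticket := r.index.Add(1) - 1
	return r.items[ticket%uint64(len(r.items))]
}

func main() {
	pool := &roundRobin[string]{items: []string{"conn0", "conn1", "conn2", "conn3"}}
	for i := 0; i < 6; i++ {
		fmt.Println(pool.next())
	}
}
```

Because every caller takes a unique ticket from the atomic counter, no lock is needed around the slice as long as the slice itself is never mutated after construction.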
✅ Correctness
- Round-robin logic (`service.go:689-691`): `atomic.Add(1)-1` then `%len(pool)` is correct and lock-free. Wraparound at `math.MaxUint64` is safe; modulo still distributes evenly.
- Pool initialization (`service.go:383-403`): Proper cleanup of already-created connections on mid-loop error. Good.
- Shutdown (`service.go:210-213,483-488`): Iterates all pool connections. Correct.
- Double guard on pool size: `max(1, viper.GetUint32(...))` in `config.go:462` AND `if poolSize == 0 { poolSize = 1 }` in `service.go:384`. Belt and suspenders, fine.
- `streamPool` slice is read-only after construction: no mutation after `newServer` returns, so concurrent reads in `NewStream` are safe without a mutex.
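The cleanup-on-partial-failure pattern praised in the pool initialization point can be sketched as follows. This is a simplified illustration under stated assumptions, not the code in `newServer`: `buildPool`, `dialFunc`, `fakeConn`, and `fakeDialer` are hypothetical names, and `io.Closer` stands in for `*grpc.ClientConn`.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// dialFunc is a hypothetical constructor for one pooled connection.
type dialFunc func(i int) (io.Closer, error)

// buildPool opens size connections. If any dial fails, it closes the
// connections opened so far and returns the error, so no partial pool leaks.
func buildPool(size int, dial dialFunc) ([]io.Closer, error) {
	pool := make([]io.Closer, 0, size)
	for i := 0; i < size; i++ {
		conn, err := dial(i)
		if err != nil {
			for _, opened := range pool {
				_ = opened.Close() // best-effort cleanup
			}
			return nil, fmt.Errorf("dial stream connection %d: %w", i, err)
		}
		pool = append(pool, conn)
	}
	return pool, nil
}

// fakeConn records whether Close was called.
type fakeConn struct{ closed bool }

func (f *fakeConn) Close() error { f.closed = true; return nil }

// fakeDialer returns a dialFunc that fails at index failAt (never, if
// negative) and a pointer to the connections it has handed out.
func fakeDialer(failAt int) (dialFunc, *[]*fakeConn) {
	opened := &[]*fakeConn{}
	return func(i int) (io.Closer, error) {
		if i == failAt {
			return nil, errors.New("simulated dial failure")
		}
		c := &fakeConn{}
		*opened = append(*opened, c)
		return c, nil
	}, opened
}

func main() {
	dial, opened := fakeDialer(2)
	_, err := buildPool(4, dial)
	fmt.Println("error:", err)
	for i, c := range *opened {
		fmt.Printf("conn %d closed: %v\n", i, c.closed)
	}
}
```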
✅ Test coverage
service_test.go: Tests unary isolation, round-robin distribution, single-pool fallback, and concurrent safety with 100 goroutines × 100 calls. Distribution tolerance of ±20% is reasonable for round-robin under contention.
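For readers who want a feel for the concurrent distribution check, here is a minimal sketch of the idea. It does not reproduce `service_test.go`; `countPicks` and `withinTolerance` are hypothetical helpers, and slots are plain integers, not connections.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countPicks runs goroutines*calls round-robin selections over poolSize slots
// and returns how many times each slot was chosen.
func countPicks(poolSize, goroutines, calls int) []int64 {
	var index atomic.Uint64
	counts := make([]atomic.Int64, poolSize)
	var wg sync.WaitGroup
	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for c := 0; c < calls; c++ {
				slot := (index.Add(1) - 1) % uint64(poolSize)
				counts[slot].Add(1)
			}
		}()
	}
	wg.Wait()
	out := make([]int64, poolSize)
	for i := range counts {
		out[i] = counts[i].Load()
	}
	return out
}

// withinTolerance reports whether every count is within frac of the mean.
func withinTolerance(counts []int64, frac float64) bool {
	var total int64
	for _, c := range counts {
		total += c
	}
	mean := float64(total) / float64(len(counts))
	for _, c := range counts {
		diff := float64(c) - mean
		if diff > frac*mean || diff < -frac*mean {
			return false
		}
	}
	return true
}

func main() {
	counts := countPicks(4, 100, 100)
	fmt.Println(counts, withinTolerance(counts, 0.2))
}
```

In this simplified form each call takes a unique ticket, so the per-slot counts come out exactly equal whenever the total is a multiple of the pool size. A tolerance still makes sense in a test that observes selection indirectly, for example through mock connections.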
✅ Cross-repo impact
- `splitConn` is unexported, internal to the `grpcservice` package. No downstream consumers.
- `ARKD_STREAM_CONN_POOL_SIZE` is a new additive config, so no breaking change. Default of 4 changes behavior vs. implicit pool-of-1 before, but this is the stated intent.
⚠️ Nits (non-blocking)
1. No upper bound on pool size (`config.go:462`): A misconfigured `ARKD_STREAM_CONN_POOL_SIZE=10000` would open 10k gRPC connections to localhost. Consider capping at something sane (e.g., 64) with a warning log, or at minimum validating in `Config.Validate()`.
2. `splitConn` has no guard against empty `streamPool` (`service.go:689`): `len(c.streamPool) == 0` would panic with divide-by-zero. Currently impossible due to the double guard, but a defensive `if len(c.streamPool) == 0` panic with a clear message would make future refactors safer.
3. Log the pool size at startup: Would help operators debug stream capacity issues. A single `log.Infof("stream connection pool size: %d", poolSize)` after the loop would be useful.
Good work. The round-robin approach is the right first step — least-loaded picker can come later as mentioned in the PR description.
Happy to address Arkana's #2 above, but it necessitates abstracting the connection construction only for a panic and is already protected by the default and max settings.
Follow-up review (new commits e4321eab, ba38bdea)
Both commits address nits from my previous review:
- ✅ Pool size cap (`config.go:294-295, 465-467`): `maxStreamConnPoolSize = 64` with `min(max, max(1, input))` correctly bounds both ends. Good.
- ✅ Startup log (`service.go`): `log.Infof("stream connection pool size: %d", poolSize)`, placed right after pool construction. Good.
No issues. Previous approval stands.
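The clamping expression quoted above can be checked in isolation. The sketch below assumes the cap of 64 mentioned in this review; `clampPoolSize` is a hypothetical helper name, and the `min` and `max` builtins require Go 1.21 or later.

```go
package main

import "fmt"

// maxStreamConnPoolSize mirrors the cap quoted in the review.
const maxStreamConnPoolSize = 64

// clampPoolSize bounds a configured value to [1, maxStreamConnPoolSize].
// It relies on the min and max builtins added in Go 1.21.
func clampPoolSize(configured uint32) uint32 {
	return min(maxStreamConnPoolSize, max(1, configured))
}

func main() {
	for _, v := range []uint32{0, 1, 4, 64, 10000} {
		fmt.Printf("configured=%d effective=%d\n", v, clampPoolSize(v))
	}
}
```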
Still exploring performance tests, but the idea is to have this approach available and review-able alongside them.
Review: Connection pooling for gateway stream connections
Verdict: Approve ✅
Not protocol-critical — this is gateway transport layer only. No VTXO, signing, forfeit, round lifecycle, or exit path code is touched.
What this does
Replaces the single streamConn *grpc.ClientConn with a []*grpc.ClientConn pool, round-robining NewStream calls via atomic.Uint64. Each HTTP/2 connection carries an independent MAX_CONCURRENT_STREAMS budget, so effective stream capacity = MaxConcurrentStreams × StreamConnPoolSize.
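To make the capacity formula concrete, here is a small worked example. The per-connection limit of 100 is an arbitrary illustrative value, not one taken from the PR, and `effectiveStreamCapacity` is a hypothetical helper.

```go
package main

import "fmt"

// effectiveStreamCapacity multiplies the per-connection stream limit by the
// number of pooled connections. Both inputs are illustrative parameters.
func effectiveStreamCapacity(maxConcurrentStreams, poolSize uint32) uint64 {
	return uint64(maxConcurrentStreams) * uint64(poolSize)
}

func main() {
	// 100 is an arbitrary example limit, not a value taken from the PR.
	for _, size := range []uint32{1, 4, 64} {
		fmt.Printf("pool size %d: %d streams\n", size, effectiveStreamCapacity(100, size))
	}
}
```

The multiplication is widened to `uint64` so that large configured values cannot overflow.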
Correctness ✅
- Round-robin atomics (`service.go:689-691`): `streamIndex.Add(1) - 1` with `% len(pool)` is correct. The uint64 wrap at `math.MaxUint64` is a non-issue (once per ~18 quintillion calls, distribution skew of exactly 1 call).
- Config clamping (`config.go:465-467`): `min(maxStreamConnPoolSize, max(1, ...))` correctly bounds to `[1, 64]`.
- Defensive zero-check (`service.go:386-388`): Redundant with config clamping but harmless. Good belt-and-suspenders.
- Error cleanup in `newServer` (`service.go:393-401`): Properly closes all previously-opened pool connections on partial failure. Good.
- `stop()` cleanup (`service.go:211-214`): Correctly iterates the slice. Good.
- Admin conn error path (`service.go:486-491`): Updated to loop over `streamConns`. Correct.
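The wraparound remark can be verified directly. The sketch below computes which slot is chosen by the last ticket before the counter wraps and by the first ticket after; `slotsAcrossWrap` is a hypothetical helper written for this illustration.

```go
package main

import (
	"fmt"
	"math"
)

// slotsAcrossWrap returns the slots chosen by the last ticket before the
// uint64 counter wraps and by the first ticket after it.
func slotsAcrossWrap(poolSize uint64) (before, after uint64) {
	ticket := uint64(math.MaxUint64)
	before = ticket % poolSize
	ticket++ // wraps to 0; unsigned overflow is well defined in Go
	after = ticket % poolSize
	return before, after
}

func main() {
	for _, size := range []uint64{3, 4, 64} {
		before, after := slotsAcrossWrap(size)
		fmt.Printf("pool size %d: slot %d then slot %d\n", size, before, after)
	}
}
```

For pool sizes that are powers of two (such as the default of 4 or the cap of 64) the rotation continues seamlessly across the wrap. For a size such as 3, slot 0 is chosen twice in a row once, which matches the one-call skew noted above.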
Concurrency ✅
- `atomic.Uint64` for the round-robin counter: no mutex needed, correct.
- `streamPool` slice is written once during init, read-only after: no race.
- Tests cover concurrent access with 100 goroutines × 100 calls and verify even distribution within 20% tolerance. Solid.
Tests ✅
- `TestSplitConnInvoke`: Verifies unary isolation.
- `TestSplitConnNewStream`: Round-robin, single-pool, and concurrent subtests. Good coverage for the unit.
Minor observations (non-blocking)
1. Operator log could be more informative (`service.go:406`): Consider logging effective capacity too: `log.Infof("stream connection pool size: %d (effective capacity: %d streams)", poolSize, poolSize * s.config.MaxConcurrentStreams)`
2. Round-robin vs least-loaded: As you noted in the PR description, long-lived streams can cause uneven load. The planned least-loaded follow-up is the right call. Worth tracking as a TODO.
3. No cross-repo impact: `splitConn` is internal to the grpc service package. No public API, proto, or interface changes. SDKs unaffected. Confirmed via grep across all repos.
Clean PR. Well-scoped. Good error handling. Good tests.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/interface/grpc/service.go`:
- Around line 693-698: The NewStream method on splitConn can panic when
streamPool is empty because it does idx % uint64(len(c.streamPool)); add a
defensive check at the start of splitConn.NewStream to handle len(c.streamPool)
== 0 (referencing splitConn, NewStream, streamPool, streamIndex) and return a
clear non-nil error (e.g., wrap or return a grpc-friendly error) instead of
performing the modulus; keep the existing index increment behavior but only use
it after confirming the pool length is > 0.
📒 Files selected for processing (5)
- `cmd/arkd/main.go`
- `internal/config/config.go`
- `internal/interface/grpc/config.go`
- `internal/interface/grpc/service.go`
- `internal/interface/grpc/service_test.go`
 func (c *splitConn) NewStream(
 	ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption,
 ) (grpc.ClientStream, error) {
-	return c.stream.NewStream(ctx, desc, method, opts...)
+	idx := c.streamIndex.Add(1) - 1
+	conn := c.streamPool[idx%uint64(len(c.streamPool))]
+	return conn.NewStream(ctx, desc, method, opts...)
Guard against empty stream pools to prevent panic.
If streamPool is empty, idx % uint64(len(c.streamPool)) panics. A defensive check keeps this path safe under future refactors/tests.
Proposed fix
 func (c *splitConn) NewStream(
 	ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption,
 ) (grpc.ClientStream, error) {
+	if len(c.streamPool) == 0 {
+		return nil, fmt.Errorf("stream pool is empty")
+	}
 	idx := c.streamIndex.Add(1) - 1
 	conn := c.streamPool[idx%uint64(len(c.streamPool))]
 	return conn.NewStream(ctx, desc, method, opts...)
 }
In order for this to break:
- The min / max on the config value must be missed
- The condition setting `poolSize = 1` if it is already `0` on line `383` must fail
- Subsequent stream pool initialization must still not fail
Also, the suggested fix adds a conditional evaluation to every new stream creation. We could fix this better by abstracting a constructor for the splitConn type which validates for an empty pool and panics at startup, but we're already doing this indirectly in three different places.
@s373nZ, you're right on all counts. The invariant is upheld by three independent layers:
- Config clamping to `[1, maxStreamConnPoolSize]`
- The explicit `poolSize = 1` fallback at line 383
- Pool construction aborting early (and returning an error) if any `grpc.NewClient` call fails, so a partial or empty slice never reaches `splitConn`
For streamPool to be empty at NewStream time, all three would need to fail silently and simultaneously, which is not a realistic scenario with the current construction path.
Your constructor-panic idea is the correct tradeoff if belt-and-suspenders validation is ever desired: validate once at startup, pay zero cost at runtime, and make misconfiguration immediately visible. A simple newSplitConn that panics on len(streamPool) == 0 would be a clean addition without the per-call branch overhead. That said, given the existing guards, it's not necessary here either.
I'll withdraw the suggestion.
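For reference, the constructor-panic idea discussed in this thread could look roughly like the sketch below. It is not part of the PR: `conn`, `splitConnSketch`, `newSplitConnSketch`, and `panics` are hypothetical names, and `conn` is a placeholder for `*grpc.ClientConn`.

```go
package main

import "fmt"

// conn is a hypothetical placeholder for *grpc.ClientConn.
type conn struct{ name string }

// splitConnSketch is a simplified stand-in for the PR's splitConn.
type splitConnSketch struct {
	unary      *conn
	streamPool []*conn
}

// newSplitConnSketch validates the pool once, at construction time, so the
// per-stream hot path needs no emptiness check.
func newSplitConnSketch(unary *conn, pool []*conn) *splitConnSketch {
	if len(pool) == 0 {
		panic("splitConn: stream pool must contain at least one connection")
	}
	return &splitConnSketch{unary: unary, streamPool: pool}
}

// panics reports whether fn panicked.
func panics(fn func()) (did bool) {
	defer func() {
		if recover() != nil {
			did = true
		}
	}()
	fn()
	return false
}

func main() {
	ok := newSplitConnSketch(&conn{name: "unary"}, []*conn{{name: "stream0"}})
	fmt.Println("pool size:", len(ok.streamPool))
	fmt.Println("empty pool panics:", panics(func() {
		newSplitConnSketch(&conn{name: "unary"}, nil)
	}))
}
```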
Good work has already been done on streaming scalability by splitting unary RPCs from streaming RPCs into separate connections. However, they are both still limited to the same `MAX_CONCURRENT_STREAMS` value per connection.

This PR implements a basic round-robin connection pool for stream connections, allowing us to scale streaming capacity separately from the unary connection.
Alternative Approaches
An enhanced version of this PR can be constructed to replace the round-robin routing strategy with a "least loaded picker". A count of streams per connection is maintained and new streams are routed to the connection which has the most capacity. This approach mitigates risks of long-lived streams unevenly saturating a certain connection's capacity and resulting in stream queues which leave requests hanging silently. I believe this is a reasonable improvement and plan to open a separate PR for consideration.
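A least-loaded picker along these lines might be sketched as follows. This is only one possible shape under stated assumptions, not the planned follow-up: `leastLoaded`, `acquire`, and `release` are hypothetical names, connections are represented by their index, and a mutex is used for simplicity where the round-robin version uses a lock-free counter.

```go
package main

import (
	"fmt"
	"sync"
)

// leastLoaded tracks the number of active streams per pooled connection.
type leastLoaded struct {
	mu     sync.Mutex
	active []int
}

func newLeastLoaded(poolSize int) *leastLoaded {
	return &leastLoaded{active: make([]int, poolSize)}
}

// acquire picks the connection index with the fewest active streams
// (lowest index wins ties) and counts the new stream against it.
func (l *leastLoaded) acquire() int {
	l.mu.Lock()
	defer l.mu.Unlock()
	best := 0
	for i, n := range l.active {
		if n < l.active[best] {
			best = i
		}
	}
	l.active[best]++
	return best
}

// release must be called when the stream on connection i ends.
func (l *leastLoaded) release(i int) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.active[i] > 0 {
		l.active[i]--
	}
}

func main() {
	picker := newLeastLoaded(3)
	first := picker.acquire()
	second := picker.acquire()
	third := picker.acquire()
	picker.release(second) // the stream on the second connection ends
	next := picker.acquire()
	fmt.Println(first, second, third, next)
}
```

The main cost compared with round-robin is that every stream must report when it finishes, which in practice means wrapping the returned stream so that `release` runs on completion or error.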
Research has suggested the current bottleneck necessitating connection pooling is a limitation of the `grpc-go` architecture when used looping back to the same server. One solution is to decouple the gateway from the application server and route requests using the `dns:///` scheme in a round-robin fashion. This is only relevant if the architecture moves to a separate gateway process, and it adds infrastructure complexity, but it is a viable longer-term consideration.

Research has suggested that replacing our current gateway implementation with https://github.com/connectrpc/connect-go is both feasible and a reasonable alternative architecture to consider. The architecture of this solution eliminates these connection scalability concerns entirely. I have a local branch which has the migration mostly prototyped, but I am still working through some backward compatibility nuances from the E2E tests. This solution is a bigger change with bigger risk, but it is also worth considering while we're still in an alpha stage.
Summary by CodeRabbit
`STREAM_CONN_POOL_SIZE` environment variable configuration parameter to control the size of gRPC streaming connection pools, enabling operators to tune connection resource allocation for their workload.