Connection pooling for gateway stream connections#1073

Open
s373nZ wants to merge 3 commits into master from stream-connection-pool

Conversation

@s373nZ
Contributor

@s373nZ s373nZ commented May 20, 2026

Pools the gateway's streaming RPCs across multiple HTTP/2 connections to multiply the effective MAX_CONCURRENT_STREAMS capacity. Each connection in the pool carries an independent stream budget, and splitConn round-robins NewStream calls across them.

Configurable via ARKD_STREAM_CONN_POOL_SIZE (default 4). A pool size of 1 preserves the previous single-connection behavior.


Good work has already been done on streaming scalability by splitting unary RPCs from streaming RPCs into separate connections. However, they are both still limited to the same MAX_CONCURRENT_STREAMS value per connection.

This PR implements a basic round-robin connection pool for stream connections, allowing us to scale streaming capacity separately from the unary connection.
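
As a rough illustration of the routing described above, the following is a minimal sketch of a lock-free round-robin picker. It is not the PR's code: `roundRobin` and `pick` are hypothetical names, and plain strings stand in for gRPC connections so the example runs with the standard library only.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin hands out pool members in rotation. It is a hypothetical
// stand-in for the PR's splitConn, not the actual implementation.
type roundRobin[T any] struct {
	pool []T
	next atomic.Uint64
}

// pick returns the next member. Add(1)-1 yields 0, 1, 2, ... without a lock,
// and the modulo maps that counter onto a pool slot.
func (r *roundRobin[T]) pick() T {
	idx := r.next.Add(1) - 1
	return r.pool[idx%uint64(len(r.pool))]
}

func main() {
	rr := &roundRobin[string]{pool: []string{"conn0", "conn1", "conn2", "conn3"}}
	for i := 0; i < 6; i++ {
		fmt.Println(rr.pick()) // conn0, conn1, conn2, conn3, conn0, conn1
	}
}
```

With a pool of one, every call maps to slot 0, which matches the single-connection behavior mentioned above.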

Alternative Approaches

  1. An enhanced version of this PR can be constructed to replace the round-robin routing strategy with a "least loaded picker". A count of streams per connection is maintained and new streams are routed to the connection which has the most capacity. This approach mitigates risks of long-lived streams unevenly saturating a certain connection's capacity and resulting in stream queues which leave requests hanging silently. I believe this is a reasonable improvement and plan to open a separate PR for consideration.

  2. Research has suggested that the current bottleneck necessitating connection pooling is a limitation of the grpc-go architecture when a client loops back to the same server. One solution is to decouple the gateway from the application server and route requests round-robin using the dns:/// resolver scheme. This is only relevant if the architecture moves to a separate gateway process, and it adds infrastructure complexity, but it is a viable longer-term consideration.

  3. Research has suggested that replacing our current gateway implementation with https://github.com/connectrpc/connect-go is both feasible and a reasonable alternative architecture to consider. The architecture of this solution eliminates these connection scalability concerns entirely. I have a local branch with the migration mostly prototyped, but I am still working through some backward-compatibility nuances from the E2E tests. This solution is a bigger change with bigger risk, but it is also worth considering while we're still in an alpha stage.
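
To make alternative 1 more concrete, here is a minimal sketch of a least-loaded picker, assuming a per-connection count of open streams guarded by a mutex. The names `leastLoaded`, `newLeastLoaded`, and `acquire` are hypothetical, and the planned follow-up PR may look quite different.

```go
package main

import (
	"fmt"
	"sync"
)

// leastLoaded tracks open streams per connection and routes each new stream
// to the connection with the fewest. All names here are hypothetical.
type leastLoaded struct {
	mu     sync.Mutex
	active []int // active[i] is the number of open streams on connection i
}

func newLeastLoaded(n int) *leastLoaded {
	return &leastLoaded{active: make([]int, n)}
}

// acquire picks the least-loaded connection index (lowest index wins ties)
// and returns a release function to call when the stream ends.
func (l *leastLoaded) acquire() (int, func()) {
	l.mu.Lock()
	defer l.mu.Unlock()
	best := 0
	for i, n := range l.active {
		if n < l.active[best] {
			best = i
		}
	}
	l.active[best]++
	var once sync.Once // makes release safe to call more than once
	return best, func() {
		once.Do(func() {
			l.mu.Lock()
			l.active[best]--
			l.mu.Unlock()
		})
	}
}

func main() {
	p := newLeastLoaded(3)
	a, _ := p.acquire()
	b, releaseB := p.acquire()
	c, _ := p.acquire()
	releaseB() // connection 1 becomes idle again
	d, _ := p.acquire()
	fmt.Println(a, b, c, d) // 0 1 2 1 (plain round-robin would pick 0 last)
}
```

The trade-off against round-robin is a lock (or more elaborate atomics) on every stream open and close, in exchange for routing around connections that long-lived streams have filled.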

Summary by CodeRabbit

  • New Features
    • Added STREAM_CONN_POOL_SIZE environment variable configuration parameter to control the size of gRPC streaming connection pools, enabling operators to tune connection resource allocation for their workload.

@s373nZ s373nZ self-assigned this May 20, 2026
@s373nZ s373nZ added the go Pull requests that update go code label May 20, 2026
@coderabbitai
Contributor

coderabbitai Bot commented May 20, 2026

Walkthrough

The PR introduces configurable gRPC stream connection pooling. A new StreamConnPoolSize configuration parameter is added to the config system with environment variable binding and bounded defaults. The gRPC service refactors its HTTP gateway connection handling to initialize a separate unary connection and a pool of streaming connections, then routes unary RPCs through the unary connection while distributing streaming RPCs round-robin across the pooled connections. Tests verify correct routing behavior.

Changes

gRPC stream connection pooling

| Layer / File(s) | Summary |
| --- | --- |
| Configuration provisioning and validation (internal/config/config.go, internal/interface/grpc/config.go) | StreamConnPoolSize uint32 field added to config struct with STREAM_CONN_POOL_SIZE environment variable binding, default and upper bound constants defined, Viper registration in LoadConfig(), and clamped value assignment (minimum 1, maximum maxStreamConnPoolSize). |
| gRPC service pool initialization and lifecycle (internal/interface/grpc/service.go) | Service struct replaces single streaming connection with streamConns pool. Startup creates unary connection plus sized streaming pool with recovery cleanup, shutdown iterates all pooled connections to close them. splitConn struct now tracks streamPool and atomic round-robin index; NewStream selects next connection modulo pool size instead of using single connection. Admin gateway failure cleanup updated to close entire pool. |
| Integration wiring and pool routing tests (cmd/arkd/main.go, internal/interface/grpc/service_test.go) | Configuration StreamConnPoolSize wired into service initialization. Test file introduces mockConn double implementing grpc.ClientConnInterface with atomic invoke/stream counters. Tests verify splitConn.Invoke routes only to unary connection, splitConn.NewStream distributes round-robin across pool with even distribution under concurrency, and pool size 1 routes all streams to single connection. |

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant splitConn
  participant unaryConn
  participant streamPool as Stream Pool<br/>(round-robin)
  Client->>splitConn: Invoke (unary RPC)
  splitConn->>unaryConn: Invoke
  unaryConn-->>splitConn: response
  splitConn-->>Client: response
  Client->>splitConn: NewStream (streaming RPC)
  splitConn->>streamPool: next connection<br/>(atomic index % poolSize)
  streamPool->>streamPool: select pooled conn[i]
  streamPool-->>splitConn: pooled connection
  splitConn->>streamPool: NewStream on conn[i]
  streamPool-->>Client: stream

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • arkade-os/arkd#979: Both PRs modify internal/interface/grpc/service.go to refactor splitConn gRPC routing—this PR extends the unary-vs-stream split foundation with a configurable pool mechanism.

Suggested reviewers

  • altafan
  • arkanaai
  • sekulicd
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title 'Connection pooling for gateway stream connections' accurately and concisely summarizes the main change: introducing a connection pool for the gateway's streaming RPCs. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

Contributor

@arkanaai arkanaai Bot left a comment


Review: Connection pooling for gateway stream connections

Verdict: APPROVE — Clean, well-scoped infrastructure change. Not protocol-critical (no VTXO/signing/forfeit/round/exit paths touched).

What it does

Replaces the single streamConn with a pool of N *grpc.ClientConn, round-robined via atomic.Uint64 in splitConn.NewStream. Each connection gets its own HTTP/2 MAX_CONCURRENT_STREAMS budget, so a pool of 4 gives 4× the stream capacity. Config: ARKD_STREAM_CONN_POOL_SIZE, default 4.

✅ Correctness

  • Round-robin logic (service.go:689-691): atomic.Add(1)-1 then %len(pool) is correct and lock-free. Wraparound at math.MaxUint64 is safe — modulo still distributes evenly.
  • Pool initialization (service.go:383-403): Proper cleanup of already-created connections on mid-loop error. Good.
  • Shutdown (service.go:210-213, 483-488): Iterates all pool connections. Correct.
  • Double guard on pool size: max(1, viper.GetUint32(...)) in config.go:462 AND if poolSize == 0 { poolSize = 1 } in service.go:384. Belt and suspenders — fine.
  • streamPool slice is read-only after construction — no mutation after newServer returns, so concurrent reads in NewStream are safe without a mutex.
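
The pool-initialization point above (close what was already opened when a later dial fails) can be sketched as follows. This is an illustration under assumptions, not the code in service.go: `openPool`, `failAt`, and `fakeConn` are hypothetical, and `io.Closer` stands in for `*grpc.ClientConn`.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// openPool opens size connections with dial. If any dial fails, it closes
// the connections opened so far and returns the error, so callers never see
// a partial pool. The function and its parameters are hypothetical.
func openPool(size int, dial func(i int) (io.Closer, error)) ([]io.Closer, error) {
	if size < 1 {
		size = 1 // defensive fallback, mirroring the zero-check in the review
	}
	pool := make([]io.Closer, 0, size)
	for i := 0; i < size; i++ {
		conn, err := dial(i)
		if err != nil {
			for _, c := range pool {
				_ = c.Close() // best-effort cleanup of earlier connections
			}
			return nil, fmt.Errorf("open stream connection %d: %w", i, err)
		}
		pool = append(pool, conn)
	}
	return pool, nil
}

// fakeConn counts Close calls through a shared counter.
type fakeConn struct{ closed *int }

func (f fakeConn) Close() error { *f.closed++; return nil }

// failAt returns a dial function that fails on attempt n and otherwise
// returns a fakeConn that records Close calls in *closed.
func failAt(n int, closed *int) func(int) (io.Closer, error) {
	return func(i int) (io.Closer, error) {
		if i == n {
			return nil, errors.New("dial failed")
		}
		return fakeConn{closed: closed}, nil
	}
}

func main() {
	closed := 0
	_, err := openPool(4, failAt(2, &closed))
	fmt.Println(err, "closed:", closed) // open stream connection 2: dial failed closed: 2
}
```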

✅ Test coverage

  • service_test.go: Tests unary isolation, round-robin distribution, single-pool fallback, and concurrent safety with 100 goroutines × 100 calls. Distribution tolerance of ±20% is reasonable for round-robin under contention.

✅ Cross-repo impact

  • splitConn is unexported, internal to grpcservice package. No downstream consumers.
  • ARKD_STREAM_CONN_POOL_SIZE is a new additive config — no breaking change. Default of 4 changes behavior vs. implicit pool-of-1 before, but this is the stated intent.

⚠️ Nits (non-blocking)

  1. No upper bound on pool size (config.go:462): A misconfigured ARKD_STREAM_CONN_POOL_SIZE=10000 would open 10k gRPC connections to localhost. Consider capping at something sane (e.g., 64) with a warning log, or at minimum validating in Config.Validate().

  2. splitConn has no guard against empty streamPool (service.go:689): len(c.streamPool) == 0 would panic with divide-by-zero. Currently impossible due to the double guard, but a defensive if len(c.streamPool) == 0 panic with a clear message would make future refactors safer.

  3. Log the pool size at startup: Would help operators debug stream capacity issues. A single log.Infof("stream connection pool size: %d", poolSize) after the loop would be useful.

Good work. The round-robin approach is the right first step — least-loaded picker can come later as mentioned in the PR description.

@s373nZ
Contributor Author

s373nZ commented May 20, 2026

Happy to address Arkana's #2 above, but it necessitates abstracting the connection construction only for a panic and is already protected by the default and max settings.

Contributor

@arkanaai arkanaai Bot left a comment


Follow-up review (new commits e4321eab, ba38bdea)

Both commits address nits from my previous review:

  1. Pool size cap (config.go:294-295, 465-467): maxStreamConnPoolSize = 64 with min(max, max(1, input)) — correctly bounds both ends. Good.
  2. Startup log (service.go): log.Infof("stream connection pool size: %d", poolSize) — placed right after pool construction. Good.
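
The clamping expression cited in item 1 can be sketched as below. The cap of 64 and the default of 4 come from this thread; the names `clampPoolSize` and `defaultStreamConnPoolSize` are assumptions for illustration, and the built-in `min`/`max` require Go 1.21 or later.

```go
package main

import "fmt"

const (
	defaultStreamConnPoolSize uint32 = 4  // default stated in the PR description (name assumed)
	maxStreamConnPoolSize     uint32 = 64 // cap cited in the follow-up review
)

// clampPoolSize bounds a configured value to [1, maxStreamConnPoolSize].
// The helper name is hypothetical; the expression mirrors the review.
func clampPoolSize(v uint32) uint32 {
	return min(maxStreamConnPoolSize, max(1, v))
}

func main() {
	fmt.Println(clampPoolSize(0))                         // 1
	fmt.Println(clampPoolSize(defaultStreamConnPoolSize)) // 4
	fmt.Println(clampPoolSize(10000))                     // 64
}
```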

No issues. Previous approval stands.

@s373nZ
Contributor Author

s373nZ commented May 20, 2026

Still exploring performance tests, but the idea is to have this approach available and review-able alongside them.

@s373nZ s373nZ marked this pull request as ready for review May 20, 2026 15:17
Contributor

@arkanaai arkanaai Bot left a comment


Review: Connection pooling for gateway stream connections

Verdict: Approve

Not protocol-critical — this is gateway transport layer only. No VTXO, signing, forfeit, round lifecycle, or exit path code is touched.

What this does

Replaces the single streamConn *grpc.ClientConn with a []*grpc.ClientConn pool, round-robining NewStream calls via atomic.Uint64. Each HTTP/2 connection carries an independent MAX_CONCURRENT_STREAMS budget, so effective stream capacity = MaxConcurrentStreams × StreamConnPoolSize.

Correctness ✅

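  • Round-robin atomics (service.go:689-691): streamIndex.Add(1) - 1 with % len(pool) is correct. The uint64 wrap at math.MaxUint64 is a non-issue: it happens once per ~18 quintillion calls and at worst breaks the rotation order a single time. For pool sizes that divide 2^64, such as the default of 4, the rotation continues seamlessly.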
  • Config clamping (config.go:465-467): min(maxStreamConnPoolSize, max(1, ...)) correctly bounds to [1, 64].
  • Defensive zero-check (service.go:386-388): Redundant with config clamping but harmless — good belt-and-suspenders.
  • Error cleanup in newServer (service.go:393-401): Properly closes all previously-opened pool connections on partial failure. Good.
  • stop() cleanup (service.go:211-214): Correctly iterates the slice. Good.
  • Admin conn error path (service.go:486-491): Updated to loop over streamConns. Correct.
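
The counter-wraparound point in the first bullet can be checked with a small standalone sketch. `indicesAcrossWrap` is a hypothetical helper that computes the slots chosen for the last two counter values before uint64 overflow and the first two after it.

```go
package main

import (
	"fmt"
	"math"
)

// indicesAcrossWrap returns the pool slots chosen for the last two counter
// values before uint64 overflow and the first two after it.
func indicesAcrossWrap(poolSize uint64) [4]uint64 {
	start := uint64(math.MaxUint64 - 1)
	var out [4]uint64
	for i := range out {
		// start+2 and start+3 overflow and wrap around to 0 and 1.
		out[i] = (start + uint64(i)) % poolSize
	}
	return out
}

func main() {
	fmt.Println(indicesAcrossWrap(4)) // [2 3 0 1]: rotation is unbroken
	fmt.Println(indicesAcrossWrap(3)) // [2 0 0 1]: slot 0 is picked twice in a row, once
}
```

For power-of-two pool sizes the wrap is invisible; for other sizes the rotation restarts once, which is negligible at this call volume.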

Concurrency ✅

  • atomic.Uint64 for the round-robin counter — no mutex needed, correct.
  • streamPool slice is written once during init, read-only after — no race.
  • Tests cover concurrent access with 100 goroutines × 100 calls and verify even distribution within 20% tolerance. Solid.
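
A simplified version of the concurrent check described above might look like the following. It is a sketch, not the PR's service_test.go: `countPicks` is a hypothetical helper, and it counts slot selections directly instead of going through a mock connection.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countPicks runs goroutines*calls round-robin picks concurrently and
// returns how many landed on each pool slot.
func countPicks(poolSize, goroutines, calls int) []int64 {
	var next atomic.Uint64
	hits := make([]atomic.Int64, poolSize)
	var wg sync.WaitGroup
	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < calls; i++ {
				idx := next.Add(1) - 1 // each counter value is handed out exactly once
				hits[idx%uint64(poolSize)].Add(1)
			}
		}()
	}
	wg.Wait()
	out := make([]int64, poolSize)
	for i := range hits {
		out[i] = hits[i].Load()
	}
	return out
}

func main() {
	fmt.Println(countPicks(4, 100, 100)) // [2500 2500 2500 2500]
}
```

Because the atomic counter hands out each value exactly once, the per-slot totals in this sketch are exactly even whenever the total call count is a multiple of the pool size, regardless of goroutine interleaving.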

Tests ✅

  • TestSplitConnInvoke: Verifies unary isolation.
  • TestSplitConnNewStream: Round-robin, single-pool, and concurrent subtests. Good coverage for the unit.

Minor observations (non-blocking)

  1. Operator log could be more informative (service.go:406): Consider logging effective capacity too:

    log.Infof("stream connection pool size: %d (effective capacity: %d streams)", poolSize, poolSize * s.config.MaxConcurrentStreams)
    
  2. Round-robin vs least-loaded: As you noted in the PR description, long-lived streams can cause uneven load. The planned least-loaded follow-up is the right call. Worth tracking as a TODO.

  3. No cross-repo impact: splitConn is internal to the grpc service package. No public API, proto, or interface changes. SDKs unaffected. Confirmed via grep across all repos.

Clean PR. Well-scoped. Good error handling. Good tests.

Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@internal/interface/grpc/service.go`:
- Around line 693-698: The NewStream method on splitConn can panic when
streamPool is empty because it does idx % uint64(len(c.streamPool)); add a
defensive check at the start of splitConn.NewStream to handle len(c.streamPool)
== 0 (referencing splitConn, NewStream, streamPool, streamIndex) and return a
clear non-nil error (e.g., wrap or return a grpc-friendly error) instead of
performing the modulus; keep the existing index increment behavior but only use
it after confirming the pool length is > 0.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: afbaa972-5d6a-4746-b518-62eb2dbd4709

📥 Commits

Reviewing files that changed from the base of the PR and between 2c9612a and ba38bde.

📒 Files selected for processing (5)
  • cmd/arkd/main.go
  • internal/config/config.go
  • internal/interface/grpc/config.go
  • internal/interface/grpc/service.go
  • internal/interface/grpc/service_test.go

Comment on lines 693 to +698
 func (c *splitConn) NewStream(
 	ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption,
 ) (grpc.ClientStream, error) {
-	return c.stream.NewStream(ctx, desc, method, opts...)
+	idx := c.streamIndex.Add(1) - 1
+	conn := c.streamPool[idx%uint64(len(c.streamPool))]
+	return conn.NewStream(ctx, desc, method, opts...)
Contributor

@coderabbitai coderabbitai Bot May 20, 2026


⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Guard against empty stream pools to prevent panic.

If streamPool is empty, idx % uint64(len(c.streamPool)) panics. A defensive check keeps this path safe under future refactors/tests.

Proposed fix
 func (c *splitConn) NewStream(
 	ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption,
 ) (grpc.ClientStream, error) {
+	if len(c.streamPool) == 0 {
+		return nil, fmt.Errorf("stream pool is empty")
+	}
 	idx := c.streamIndex.Add(1) - 1
 	conn := c.streamPool[idx%uint64(len(c.streamPool))]
 	return conn.NewStream(ctx, desc, method, opts...)
 }
Contributor Author


In order for this to break:

  1. The min / max on the config value must be missed
  2. The condition setting poolSize = 1 if it is already 0 on line 383 must fail
  3. Subsequent stream pool initialization must still not fail

Also, the suggested fix adds a conditional evaluation to every new stream creation. We could fix this better by abstracting a constructor for the splitConn type which validates for an empty pool and panics at startup, but we're already doing this indirectly in three different places.

Contributor


@s373nZ, you're right on all counts. The invariant is upheld by three independent layers:

  1. Config clamping to [1, maxStreamConnPoolSize]
  2. The explicit poolSize = 1 fallback at line 383
  3. Pool construction aborting early (and returning an error) if any grpc.NewClient call fails — so a partial or empty slice never reaches splitConn

For streamPool to be empty at NewStream time, all three would need to fail silently and simultaneously, which is not a realistic scenario with the current construction path.

Your constructor-panic idea is the correct tradeoff if belt-and-suspenders validation is ever desired: validate once at startup, pay zero cost at runtime, and make misconfiguration immediately visible. A simple newSplitConn that panics on len(streamPool) == 0 would be a clean addition without the per-call branch overhead. That said, given the existing guards, it's not necessary here either.
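
For reference, the constructor idea discussed above could be sketched roughly like this. It is hypothetical and not part of the PR: `conn` stands in for `*grpc.ClientConn` so the example runs without gRPC, and `nextStreamConn` and `panics` are illustrative helpers.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// conn stands in for *grpc.ClientConn so the sketch runs without gRPC.
type conn struct{ name string }

type splitConn struct {
	unary       *conn
	streamPool  []*conn
	streamIndex atomic.Uint64
}

// newSplitConn validates the pool once at construction, so the per-stream
// path needs no length check. Hypothetical; the PR builds splitConn inline.
func newSplitConn(unary *conn, pool []*conn) *splitConn {
	if len(pool) == 0 {
		panic("splitConn: stream pool must not be empty")
	}
	return &splitConn{unary: unary, streamPool: pool}
}

// nextStreamConn selects the next pooled connection round-robin.
func (c *splitConn) nextStreamConn() *conn {
	idx := c.streamIndex.Add(1) - 1
	return c.streamPool[idx%uint64(len(c.streamPool))]
}

// panics reports whether f panicked; used only to demonstrate the check.
func panics(f func()) (p bool) {
	defer func() { p = recover() != nil }()
	f()
	return false
}

func main() {
	sc := newSplitConn(&conn{name: "unary"}, []*conn{{name: "s0"}, {name: "s1"}})
	fmt.Println(sc.nextStreamConn().name, sc.nextStreamConn().name) // s0 s1
	fmt.Println(panics(func() { newSplitConn(nil, nil) }))          // true
}
```

The misconfiguration surfaces at startup, and the hot path keeps the same three statements as the current NewStream.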

I'll withdraw the suggestion.

(¬_¬ )ノ🐇
