Skip to content

Commit a8317a5

Browse files
authored
fix(proxy): SSH close race that drops agent's exec reply (#41)
Track in-flight agent-to-upstream requests with a mutex+cond inflightBarrier so the agent's exec reply is fully written before sluice closes srcChan. Fixes the EOF on session.Output regression that landed on e2e-linux push runs since PR #37.
1 parent 55092f4 commit a8317a5

2 files changed

Lines changed: 327 additions & 5 deletions

File tree

internal/proxy/ssh.go

Lines changed: 137 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"net"
1212
"os"
1313
"path/filepath"
14+
"sync"
1415

1516
"github.com/nemirovsky/sluice/internal/vault"
1617
"golang.org/x/crypto/ssh"
@@ -263,8 +264,33 @@ func sshHandleChannel(newChan ssh.NewChannel, dst ssh.Conn) {
263264
// from upstream finish, each signaling on this channel.
264265
upstreamDone := make(chan struct{}, 3)
265266

266-
// Forward per-channel requests bidirectionally.
267-
go sshForwardChannelRequests(srcReqs, dstChan)
267+
// Track agent-to-upstream requests that are mid-flight. Each request
268+
// the agent sends has to be forwarded to upstream, awaited for a
269+
// reply (when WantReply is true), and replied to on the agent side
270+
// before sluice may close srcChan. Without this barrier, a fast
271+
// upstream that replies + writes data + sends exit-status + closes
272+
// in one burst lets sluice drain all three upstream-to-agent
273+
// goroutines and close srcChan while this forwarder is still
274+
// mid-reply for the original exec request. The agent then observes
275+
// SSH_MSG_CHANNEL_CLOSE before its SendRequest("exec", true, ...)
276+
// receives a SUCCESS/FAILURE on ch.msg, and the gossh client
277+
// surfaces the closed ch.msg as io.EOF.
278+
//
279+
// sync.WaitGroup is the wrong primitive here because Add and Wait
280+
// are not safe to call concurrently when the counter is at zero
281+
// (Go runtime panics with "sync: WaitGroup misuse"). The forwarder
282+
// goroutine ranges over srcReqs and could enter a new iteration at
283+
// any moment, racing the main goroutine's drain. We use a mutex +
284+
// cond + draining flag instead: once draining is set, the forwarder
285+
// rejects further requests so Wait() can converge.
286+
barrier := &inflightBarrier{}
287+
barrier.cond = sync.NewCond(&barrier.mu)
288+
289+
// Forward per-channel requests bidirectionally. The agent-to-upstream
290+
// loop reports each request via barrier so sluice's pre-close
291+
// drain knows when none are pending. The upstream-to-agent loop
292+
// signals upstreamDone when dstReqs closes.
293+
go sshForwardAgentRequests(srcReqs, dstChan, barrier)
268294
go func() {
269295
sshForwardChannelRequests(dstReqs, srcChan)
270296
upstreamDone <- struct{}{}
@@ -304,10 +330,24 @@ func sshHandleChannel(newChan ssh.NewChannel, dst ssh.Conn) {
304330
<-upstreamDone
305331
<-upstreamDone
306332

333+
// Also drain any agent-to-upstream request that is mid-flight. A
334+
// pending WantReply=true request is waiting on dst.SendRequest to
335+
// return, after which it still has to call req.Reply on the agent
336+
// side. Closing srcChan before that reply is written would let the
337+
// agent see channel-close before the SUCCESS/FAILURE message on
338+
// ch.msg, which gossh surfaces as io.EOF from
339+
// session.SendRequest("exec", true, ...).
340+
//
341+
// Drain sets a draining flag (so the forwarder rejects any further
342+
// request without bumping the counter) and waits on the cond for
343+
// the current iteration, if any, to finish.
344+
barrier.drain()
345+
307346
// Now that exit-status has been forwarded (the dstReqs goroutine
308-
// has finished), signal stdout EOF to the agent and close the
309-
// channel. The agent's session.Wait() now sees the documented
310-
// order: data, exit-status, EOF, close.
347+
// has finished) and every pending agent-side reply has been
348+
// written, signal stdout EOF to the agent and close the channel.
349+
// The agent's session.Wait() now sees the documented order:
350+
// data, exit-status, EOF, close.
311351
_ = srcChan.CloseWrite()
312352
_ = srcChan.Close()
313353
_ = dstChan.Close()
@@ -328,3 +368,95 @@ func sshForwardChannelRequests(reqs <-chan *ssh.Request, dst ssh.Channel) {
328368
}
329369
}
330370
}
371+
372+
// inflightBarrier serializes the agent-to-upstream request forwarder
373+
// with sshHandleChannel's pre-close drain. The forwarder calls enter()
374+
// before forwarding a request to upstream and leave() after replying to
375+
// the agent. sshHandleChannel calls drain() once the upstream side has
376+
// fully closed: drain sets the draining flag (so any further enter()
377+
// returns false and the forwarder rejects the request without waiting
378+
// on a closed upstream) and blocks until count reaches zero.
379+
//
380+
// The mutex+cond pattern avoids the Add/Wait race that a sync.WaitGroup
381+
// would have: with a WaitGroup the forwarder's loop could call Add(1)
382+
// at the same instant sshHandleChannel called Wait() with the counter
383+
// at zero, and the Go runtime panics on that interleaving.
384+
type inflightBarrier struct {
385+
mu sync.Mutex
386+
cond *sync.Cond
387+
count int
388+
draining bool
389+
}
390+
391+
// enter reports the start of a request handler. Returns false if drain
392+
// has already begun, in which case the caller must NOT proceed to
393+
// forward the request to a possibly-closed upstream.
394+
func (b *inflightBarrier) enter() bool {
395+
b.mu.Lock()
396+
defer b.mu.Unlock()
397+
if b.draining {
398+
return false
399+
}
400+
b.count++
401+
return true
402+
}
403+
404+
// leave matches a successful enter. When the counter reaches zero
405+
// during draining, the waiter is signaled.
406+
func (b *inflightBarrier) leave() {
407+
b.mu.Lock()
408+
b.count--
409+
if b.count == 0 && b.draining {
410+
b.cond.Broadcast()
411+
}
412+
b.mu.Unlock()
413+
}
414+
415+
// drain sets the draining flag (locking out new enters) and blocks
416+
// until any currently in-flight handlers call leave.
417+
func (b *inflightBarrier) drain() {
418+
b.mu.Lock()
419+
b.draining = true
420+
for b.count > 0 {
421+
b.cond.Wait()
422+
}
423+
b.mu.Unlock()
424+
}
425+
426+
// sshForwardAgentRequests is the agent-to-upstream variant of
427+
// sshForwardChannelRequests. It coordinates with sshHandleChannel's
428+
// pre-close drain via inflightBarrier so the reply on the agent
429+
// direction (req.Reply on srcChan) is fully written before sluice
430+
// closes srcChan. Otherwise an agent that called
431+
// session.SendRequest("exec", WantReply=true, ...) can observe
432+
// SSH_MSG_CHANNEL_CLOSE before its ch.msg receives the
433+
// CHANNEL_REQUEST_SUCCESS reply — gossh surfaces a closed ch.msg as
434+
// io.EOF, and `session.Output("cmd")` fails with EOF even though the
435+
// upstream replied successfully.
436+
//
437+
// When drain has already begun, the request is rejected without being
438+
// forwarded to upstream: the upstream channel is closing, so any reply
439+
// from upstream would never arrive. Replying false to the agent on a
440+
// WantReply request unblocks any caller waiting on ch.msg.
441+
func sshForwardAgentRequests(reqs <-chan *ssh.Request, dst ssh.Channel, barrier *inflightBarrier) {
442+
for req := range reqs {
443+
if !barrier.enter() {
444+
if req.WantReply {
445+
_ = req.Reply(false, nil)
446+
}
447+
continue
448+
}
449+
ok, err := dst.SendRequest(req.Type, req.WantReply, req.Payload)
450+
if err != nil {
451+
if req.WantReply {
452+
_ = req.Reply(false, nil)
453+
}
454+
barrier.leave()
455+
continue
456+
}
457+
if req.WantReply {
458+
_ = req.Reply(ok, nil)
459+
}
460+
barrier.leave()
461+
}
462+
}

internal/proxy/ssh_test.go

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,196 @@ func TestResolveHostKeyCallbackNoKnownHosts(t *testing.T) {
406406
}
407407
}
408408

409+
// TestSSHJumpHost_BurstCloseDoesNotDropExecReply is a focused regression
410+
// test for the race where an upstream that replies + writes data + sends
411+
// exit-status + closes in one burst (no sleep between exit-status and
412+
// close) caused sluice to close srcChan before the agent-to-upstream
413+
// forwarder finished writing the SUCCESS reply for the agent's
414+
// session.SendRequest("exec", true, ...). gossh closes ch.msg on
415+
// SSH_MSG_CHANNEL_CLOSE, so the blocked SendRequest returns io.EOF and
416+
// session.Output(...) fails with "exec command via SSH: EOF".
417+
//
418+
// startTestSSHServer (used by other tests) papers over the race with a
419+
// 50ms sleep before returning from the channel handler. This test
420+
// spins up its own burst-close server with no such sleep, so the race
421+
// is deterministically triggered without the inflightBarrier fix.
422+
func TestSSHJumpHost_BurstCloseDoesNotDropExecReply(t *testing.T) {
423+
pubKey, privPEM := generateTestSSHKey(t)
424+
dir := t.TempDir()
425+
store, err := vault.NewStore(dir)
426+
if err != nil {
427+
t.Fatal(err)
428+
}
429+
if _, err := store.Add("ssh_key", string(privPEM)); err != nil {
430+
t.Fatal(err)
431+
}
432+
433+
sshServer := startBurstCloseSSHServer(t, pubKey)
434+
defer func() { _ = sshServer.Close() }()
435+
436+
proxyHostKey, err := GenerateSSHHostKey()
437+
if err != nil {
438+
t.Fatal(err)
439+
}
440+
441+
binding := vault.Binding{
442+
Credential: "ssh_key",
443+
Template: "testuser",
444+
Protocols: []string{"ssh"},
445+
}
446+
447+
jumpHost := NewSSHJumpHost(store, proxyHostKey)
448+
jumpHost.HostKeyCallback = ssh.InsecureIgnoreHostKey()
449+
450+
// Run the test many times in a single process to maximize the
451+
// chance the close race fires if the fix regresses. Each iteration
452+
// runs through a fresh proxy connection + fresh agent session.
453+
const iterations = 50
454+
for i := 0; i < iterations; i++ {
455+
agentConn, proxyConn := tcpConnPair(t)
456+
457+
ready := make(chan error, 1)
458+
errCh := make(chan error, 1)
459+
go func() {
460+
errCh <- jumpHost.HandleConnection(proxyConn, []string{sshServer.Addr().String()}, sshServer.Addr().String(), binding, ready)
461+
}()
462+
463+
if setupErr := <-ready; setupErr != nil {
464+
t.Fatalf("iter %d: handler setup: %v", i, setupErr)
465+
}
466+
467+
agentSSH, agentChans, agentReqs, err := ssh.NewClientConn(agentConn, "proxy", &ssh.ClientConfig{
468+
User: "ignored",
469+
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
470+
})
471+
if err != nil {
472+
t.Fatalf("iter %d: agent SSH handshake: %v", i, err)
473+
}
474+
475+
client := ssh.NewClient(agentSSH, agentChans, agentReqs)
476+
session, err := client.NewSession()
477+
if err != nil {
478+
t.Fatalf("iter %d: open session: %v", i, err)
479+
}
480+
481+
output, err := session.Output("whoami")
482+
if err != nil {
483+
t.Fatalf("iter %d: exec: %v (this is the EOF symptom of the close race)", i, err)
484+
}
485+
if string(output) != "ssh-injection-ok\n" {
486+
t.Errorf("iter %d: expected 'ssh-injection-ok', got %q", i, string(output))
487+
}
488+
_ = session.Close()
489+
_ = client.Close()
490+
_ = agentSSH.Close()
491+
_ = agentConn.Close()
492+
493+
// Wait for HandleConnection to return so a leaked handler
494+
// goroutine (or a connection that fails to teardown after
495+
// close) surfaces as a test timeout rather than as silent
496+
// resource exhaustion on the next iteration. HandleConnection
497+
// returns nil on graceful agent disconnect; a non-nil error
498+
// here would mean the teardown path produced an unexpected
499+
// failure that a future regression could mask.
500+
select {
501+
case handlerErr := <-errCh:
502+
if handlerErr != nil {
503+
t.Fatalf("iter %d: HandleConnection returned error: %v", i, handlerErr)
504+
}
505+
case <-time.After(5 * time.Second):
506+
t.Fatalf("iter %d: HandleConnection did not return within 5s after close", i)
507+
}
508+
}
509+
}
510+
511+
// startBurstCloseSSHServer is a test SSH server that, on the first exec
512+
// request, replies + writes output + sends exit-status + closes the
513+
// channel with no delay between exit-status and Close. The lack of any
514+
// sleep is intentional: it deterministically triggers the close race
515+
// in sluice's SSH jump host when the inflightBarrier fix is absent.
516+
func startBurstCloseSSHServer(t *testing.T, authorizedKey ssh.PublicKey) net.Listener {
517+
t.Helper()
518+
519+
key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
520+
if err != nil {
521+
t.Fatal(err)
522+
}
523+
hostSigner, err := ssh.NewSignerFromKey(key)
524+
if err != nil {
525+
t.Fatal(err)
526+
}
527+
528+
config := &ssh.ServerConfig{
529+
PublicKeyCallback: func(_ ssh.ConnMetadata, pubKey ssh.PublicKey) (*ssh.Permissions, error) {
530+
if bytes.Equal(pubKey.Marshal(), authorizedKey.Marshal()) {
531+
return &ssh.Permissions{}, nil
532+
}
533+
return nil, fmt.Errorf("unknown public key")
534+
},
535+
}
536+
config.AddHostKey(hostSigner)
537+
538+
ln, err := net.Listen("tcp", "127.0.0.1:0")
539+
if err != nil {
540+
t.Fatal(err)
541+
}
542+
543+
go func() {
544+
for {
545+
conn, err := ln.Accept()
546+
if err != nil {
547+
return
548+
}
549+
go func(c net.Conn) {
550+
sshConn, chans, reqs, err := ssh.NewServerConn(c, config)
551+
if err != nil {
552+
_ = c.Close()
553+
return
554+
}
555+
defer func() { _ = sshConn.Close() }()
556+
go ssh.DiscardRequests(reqs)
557+
for newChan := range chans {
558+
if newChan.ChannelType() != "session" {
559+
_ = newChan.Reject(ssh.UnknownChannelType, "unsupported")
560+
continue
561+
}
562+
ch, chReqs, err := newChan.Accept()
563+
if err != nil {
564+
continue
565+
}
566+
go func(ch ssh.Channel, reqs <-chan *ssh.Request) {
567+
// Defer close so a request loop that exits without
568+
// hitting the exec path (early agent close,
569+
// non-exec request only) still releases the
570+
// server-side channel.
571+
defer func() { _ = ch.Close() }()
572+
for req := range reqs {
573+
if req.Type != "exec" {
574+
if req.WantReply {
575+
_ = req.Reply(false, nil)
576+
}
577+
continue
578+
}
579+
if req.WantReply {
580+
_ = req.Reply(true, nil)
581+
}
582+
_, _ = ch.Write([]byte("ssh-injection-ok\n"))
583+
_, _ = ch.SendRequest("exit-status", false, ssh.Marshal(struct{ Status uint32 }{0}))
584+
_ = ch.CloseWrite()
585+
// NO time.Sleep here. This is the whole point of
586+
// the test: close immediately after exit-status
587+
// to maximally tighten the race window in
588+
// sluice's sshHandleChannel.
589+
return
590+
}
591+
}(ch, chReqs)
592+
}
593+
}(conn)
594+
}
595+
}()
596+
return ln
597+
}
598+
409599
// TestGenerateSSHHostKey tests SSH host key generation.
410600
func TestGenerateSSHHostKey(t *testing.T) {
411601
signer, err := GenerateSSHHostKey()

0 commit comments

Comments
 (0)