Skip to content

Commit 21d6b00

Browse files
committed
feat(sdk): Use server.Shutdown instead of server.Close for graceful shutdown
1 parent 918c7a0 commit 21d6b00

1 file changed

Lines changed: 41 additions & 8 deletions

File tree

internal/ipc/ipc.go

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,27 +8,45 @@ import (
88
"net"
99
"net/http"
1010
"sync"
11+
"time"
1112

1213
"github.com/hashicorp/yamux"
1314
)
1415

15-
func NewPluginIPC(sockConn net.Conn, handler http.Handler, onServerClosed func(error)) (io.Closer, *http.Client, error) {
16+
const (
17+
defaultShutdownTimeout = 2 * time.Second
18+
)
19+
20+
type cfg struct {
21+
shutdownTimeout time.Duration
22+
}
23+
24+
type Option func(*cfg) *cfg
25+
26+
func WithShutdownTimeout(d time.Duration) Option {
27+
return func(in *cfg) *cfg {
28+
in.shutdownTimeout = d
29+
return in
30+
}
31+
}
32+
33+
func NewPluginIPC(sockConn net.Conn, handler http.Handler, onServerClosed func(error), option ...Option) (io.Closer, *http.Client, error) {
1634
// TODO: configure yamux logger
1735
session, err := yamux.Client(sockConn, nil)
1836
if err != nil {
1937
return nil, nil, fmt.Errorf("creating yamux client: %w", err)
2038
}
21-
i, c := newMuxedIPC(session, handler, onServerClosed)
39+
i, c := newMuxedIPC(session, handler, onServerClosed, option...)
2240
return i, c, nil
2341
}
2442

25-
func NewRuntimeIPC(sockConn net.Conn, handler http.Handler, onServerClosed func(error)) (io.Closer, *http.Client, error) {
43+
func NewRuntimeIPC(sockConn net.Conn, handler http.Handler, onServerClosed func(error), option ...Option) (io.Closer, *http.Client, error) {
2644
// TODO: configure yamux logger
2745
session, err := yamux.Server(sockConn, nil)
2846
if err != nil {
2947
return nil, nil, fmt.Errorf("creating yamux server: %w", err)
3048
}
31-
i, c := newMuxedIPC(session, handler, onServerClosed)
49+
i, c := newMuxedIPC(session, handler, onServerClosed, option...)
3250
return i, c, nil
3351
}
3452

@@ -50,18 +68,31 @@ func newIpcServer(l net.Listener, handler http.Handler, onClose func(error) erro
5068
if errors.Is(err, http.ErrServerClosed) { // not an error, client closed the connection
5169
err = nil
5270
}
53-
result.err = errors.Join(err, onClose(err))
71+
result.err = errors.Join(filterEOF(err), onClose(err)) // EOF: only forward to the onClose handler, but filter out internal forwarding
5472
close(result.done)
5573
}()
5674
return result
5775
}
5876

77+
func filterEOF(err error) error {
78+
if errors.Is(err, io.EOF) {
79+
return nil
80+
}
81+
return err
82+
}
83+
5984
type ipcImpl struct {
6085
server *ipcServer
6186
teardown func() error
6287
}
6388

64-
func newMuxedIPC(session *yamux.Session, handler http.Handler, onClose func(error)) (*ipcImpl, *http.Client) {
89+
func newMuxedIPC(session *yamux.Session, handler http.Handler, onClose func(error), option ...Option) (*ipcImpl, *http.Client) {
90+
// Note: Calling session.Close() needs to be done as the very last step as it shuts down all IPC!
91+
92+
cfg := &cfg{shutdownTimeout: defaultShutdownTimeout}
93+
for _, o := range option {
94+
cfg = o(cfg)
95+
}
6596
server := newIpcServer(session, handler, func(err error) error {
6697
if onClose != nil {
6798
onClose(err)
@@ -71,9 +102,11 @@ func newMuxedIPC(session *yamux.Session, handler http.Handler, onClose func(erro
71102
return &ipcImpl{
72103
server: server,
73104
teardown: sync.OnceValue(func() error {
74-
err := errors.Join(server.server.Close(), session.Close())
105+
ctx, cancel := context.WithTimeout(context.Background(), cfg.shutdownTimeout)
106+
defer cancel()
107+
err := server.server.Shutdown(ctx)
75108
<-server.done
76-
return err
109+
return errors.Join(err, session.Close(), server.err)
77110
}),
78111
}, createYamuxedClient(session)
79112
}

0 commit comments

Comments
 (0)