Skip to content

Commit 8e1a1b8

Browse files
authored
Merge pull request #413 from SiaFoundation/matt/graceful-shutdown
Matt/graceful shutdown
2 parents d778a26 + 87232af commit 8e1a1b8

5 files changed

Lines changed: 53 additions & 3 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
default: minor
3+
---
4+
5+
# RHP4 graceful shutdown

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ require (
66
github.com/quic-go/quic-go v0.59.0
77
github.com/quic-go/webtransport-go v0.10.0
88
go.etcd.io/bbolt v1.4.3
9-
go.sia.tech/core v0.19.0
9+
go.sia.tech/core v0.19.1-0.20260323200052-1bfa3facc408
1010
go.sia.tech/mux v1.4.0
1111
go.uber.org/zap v1.27.1
1212
golang.org/x/crypto v0.49.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
1414
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
1515
go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo=
1616
go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E=
17-
go.sia.tech/core v0.19.0 h1:mj/lsixiI25hNTq1FzLHs94BCewTABulkqq2pHSHmdo=
18-
go.sia.tech/core v0.19.0/go.mod h1:Gge/hpiE9m1ugPLz8RR1ZMoYZTPWLEdRWviHr/4rVeA=
17+
go.sia.tech/core v0.19.1-0.20260323200052-1bfa3facc408 h1:euiK0PJ9ChJCK/moH3mqDuhnZq8Oaz42QB0PLE+HLb8=
18+
go.sia.tech/core v0.19.1-0.20260323200052-1bfa3facc408/go.mod h1:HR5UCFehifHhlOc9DadUVFlMLJYRBtqwXX4UbOX8E14=
1919
go.sia.tech/mux v1.4.0 h1:LgsLHtn7l+25MwrgaPaUCaS8f2W2/tfvHIdXps04sVo=
2020
go.sia.tech/mux v1.4.0/go.mod h1:iNFi9ifFb2XhuD+LF4t2HBb4Mvgq/zIPKqwXU/NlqHA=
2121
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=

rhp/v4/rpc_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"cmp"
66
"context"
77
"crypto/tls"
8+
"errors"
89
"fmt"
910
"maps"
1011
"math"
@@ -2952,3 +2953,29 @@ func BenchmarkContractUpload(b *testing.B) {
29522953
b.Fatalf("expected %v sectors, got %v", b.N, appendResult.Revision.Filesize/proto4.SectorSize)
29532954
}
29542955
}
2956+
2957+
func TestGracefulShutdown(t *testing.T) {
2958+
n, genesis := testutil.V2Network()
2959+
cm, w := startTestNode(t, n, genesis)
2960+
2961+
ss := testutil.NewEphemeralSectorStore()
2962+
c := testutil.NewEphemeralContractor(cm)
2963+
sr := testutil.NewEphemeralSettingsReporter()
2964+
2965+
hostKey := types.GeneratePrivateKey()
2966+
rs := rhp4.NewServer(hostKey, cm, c, w, sr, ss, rhp4.WithPriceTableValidity(2*time.Minute))
2967+
hostAddr := testutil.ServeSiaMux(t, rs, zap.NewNop())
2968+
2969+
transport, err := siamux.Dial(context.Background(), hostAddr, hostKey.PublicKey())
2970+
if err != nil {
2971+
t.Fatal(err)
2972+
}
2973+
defer transport.Close()
2974+
2975+
rs.Close()
2976+
2977+
_, err = rhp4.RPCSettings(context.Background(), transport)
2978+
if !errors.Is(err, proto4.ErrHostShuttingDown) {
2979+
t.Fatalf("expected ErrHostShuttingDown, got %v", err)
2980+
}
2981+
}

rhp/v4/server.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"go.sia.tech/core/consensus"
1313
rhp4 "go.sia.tech/core/rhp/v4"
1414
"go.sia.tech/core/types"
15+
"go.sia.tech/coreutils/threadgroup"
1516
"go.sia.tech/coreutils/wallet"
1617
"go.sia.tech/mux/v2"
1718
"go.uber.org/zap"
@@ -128,6 +129,8 @@ type (
128129

129130
// A Server handles incoming RHP4 RPC.
130131
Server struct {
132+
tg *threadgroup.ThreadGroup
133+
131134
hostKey types.PrivateKey
132135
priceTableValidity time.Duration
133136
rpcTimeout time.Duration
@@ -1216,6 +1219,11 @@ func (s *Server) HostKey() types.PrivateKey {
12161219
return s.hostKey
12171220
}
12181221

1222+
// Close stops accepting new RPCs and waits for in-flight handlers to complete.
1223+
func (s *Server) Close() {
1224+
s.tg.Stop()
1225+
}
1226+
12191227
// Serve accepts incoming streams on the provided multiplexer and handles them
12201228
func (s *Server) Serve(t TransportMux, log *zap.Logger) error {
12211229
defer t.Close()
@@ -1230,6 +1238,14 @@ func (s *Server) Serve(t TransportMux, log *zap.Logger) error {
12301238
log := log.With(zap.String("streamID", hex.EncodeToString(frand.Bytes(4))))
12311239
log.Debug("accepted stream")
12321240
go func() {
1241+
done, err := s.tg.Add()
1242+
if err != nil {
1243+
log.Debug("rejected stream, server is shutting down")
1244+
rhp4.WriteResponse(stream, rhp4.ErrHostShuttingDown.(*rhp4.RPCError))
1245+
stream.Close()
1246+
return
1247+
}
1248+
defer done()
12331249
defer func() {
12341250
if err := stream.Close(); err != nil {
12351251
log.Debug("failed to close stream", zap.Error(err))
@@ -1255,6 +1271,8 @@ func errorDecodingError(f string, p ...any) error {
12551271
// NewServer creates a new RHP4 server
12561272
func NewServer(pk types.PrivateKey, cm ChainManager, contracts Contractor, wallet Wallet, settings Settings, sectors Sectors, opts ...ServerOption) *Server {
12571273
s := &Server{
1274+
tg: threadgroup.New(),
1275+
12581276
hostKey: pk,
12591277
priceTableValidity: 30 * time.Minute,
12601278
rpcTimeout: 10 * time.Minute,

0 commit comments

Comments
 (0)