Skip to content

Commit 49070af

Browse files
authored
server/grpc: improve graceful stop behavior (#2892)
* server/grpc: improve graceful stop behavior * server/grpc: add graceful stop example and test * examples/graceful-stop: document verification steps
1 parent 03f4759 commit 49070af

5 files changed

Lines changed: 349 additions & 12 deletions

File tree

examples/graceful-stop/README.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# Graceful Stop Demo
2+
3+
This example demonstrates the intended shutdown behavior after the gRPC graceful-stop patch.
4+
5+
## Run
6+
7+
```bash
8+
go run ./examples/graceful-stop
9+
```
10+
11+
## Expected behavior
12+
13+
- one long-running RPC starts
14+
- shutdown begins while that RPC is still running
15+
- new RPCs stop being accepted shortly after shutdown starts
16+
- the in-flight RPC is allowed to finish
17+
18+
Typical output:
19+
20+
```text
21+
long RPC is running; starting shutdown
22+
new RPC rejected after shutdown began: ...
23+
long RPC completed: slept for 1500ms
24+
done
25+
```
26+
27+
There may be a small race window where the first post-stop RPC is still accepted once before subsequent new RPCs are rejected. The important part is that in-flight RPCs are drained while new RPCs are cut off.
28+
29+
## Automated check
30+
31+
```bash
32+
go test ./server/grpc -run TestGracefulStopRejectsNewRPCsButAllowsInFlightRPCs -v
33+
```
34+
35+
## Environment
36+
37+
- no special environment variables are required
38+
- the demo may print a TLS warning from `go-micro`; it is unrelated to this change

examples/graceful-stop/main.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"net"
8+
"time"
9+
10+
micro "go-micro.dev/v5"
11+
"go-micro.dev/v5/client"
12+
grpcclient "go-micro.dev/v5/client/grpc"
13+
"go-micro.dev/v5/registry"
14+
"go-micro.dev/v5/server"
15+
grpcserver "go-micro.dev/v5/server/grpc"
16+
)
17+
18+
type SleepRequest struct {
19+
DelayMS int `json:"delay_ms"`
20+
}
21+
22+
type SleepResponse struct {
23+
Message string `json:"message"`
24+
}
25+
26+
type Sleeper struct {
27+
started chan struct{}
28+
}
29+
30+
func (s *Sleeper) Sleep(ctx context.Context, req *SleepRequest, rsp *SleepResponse) error {
31+
select {
32+
case s.started <- struct{}{}:
33+
default:
34+
}
35+
36+
timer := time.NewTimer(time.Duration(req.DelayMS) * time.Millisecond)
37+
defer timer.Stop()
38+
39+
select {
40+
case <-ctx.Done():
41+
return ctx.Err()
42+
case <-timer.C:
43+
}
44+
45+
rsp.Message = fmt.Sprintf("slept for %dms", req.DelayMS)
46+
return nil
47+
}
48+
49+
func main() {
50+
listener, err := net.Listen("tcp", "127.0.0.1:0")
51+
if err != nil {
52+
log.Fatal(err)
53+
}
54+
defer listener.Close()
55+
56+
addr := listener.Addr().String()
57+
reg := registry.NewMemoryRegistry()
58+
handler := &Sleeper{started: make(chan struct{}, 1)}
59+
60+
service := micro.New("grace-demo",
61+
micro.HandleSignal(false),
62+
micro.Registry(reg),
63+
micro.Server(grpcserver.NewServer(
64+
server.Registry(reg),
65+
server.Name("grace-demo"),
66+
server.Address(addr),
67+
grpcserver.Listener(listener),
68+
grpcserver.GracefulStopTimeout(3*time.Second),
69+
)),
70+
micro.Client(grpcclient.NewClient(
71+
client.Registry(reg),
72+
client.ContentType("application/grpc+json"),
73+
client.DialTimeout(200*time.Millisecond),
74+
client.RequestTimeout(5*time.Second),
75+
)),
76+
)
77+
78+
if err := service.Handle(handler); err != nil {
79+
log.Fatal(err)
80+
}
81+
if err := service.Start(); err != nil {
82+
log.Fatal(err)
83+
}
84+
85+
log.Printf("service started on %s", addr)
86+
87+
longDone := make(chan error, 1)
88+
go func() {
89+
req := service.Client().NewRequest("grace-demo", "Sleeper.Sleep", &SleepRequest{DelayMS: 1500})
90+
rsp := &SleepResponse{}
91+
longDone <- service.Client().Call(context.Background(), req, rsp, client.WithAddress(addr))
92+
if rsp.Message != "" {
93+
log.Printf("long RPC completed: %s", rsp.Message)
94+
}
95+
}()
96+
97+
<-handler.started
98+
log.Printf("long RPC is running; starting shutdown")
99+
100+
stopDone := make(chan error, 1)
101+
go func() {
102+
stopDone <- service.Stop()
103+
}()
104+
105+
deadline := time.Now().Add(500 * time.Millisecond)
106+
for time.Now().Before(deadline) {
107+
callCtx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
108+
req := service.Client().NewRequest("grace-demo", "Sleeper.Sleep", &SleepRequest{DelayMS: 50})
109+
rsp := &SleepResponse{}
110+
err = service.Client().Call(callCtx, req, rsp, client.WithAddress(addr))
111+
cancel()
112+
if err != nil {
113+
log.Printf("new RPC rejected after shutdown began: %v", err)
114+
break
115+
}
116+
117+
log.Printf("new RPC still accepted during shutdown: %s", rsp.Message)
118+
time.Sleep(10 * time.Millisecond)
119+
}
120+
121+
if err := <-longDone; err != nil {
122+
log.Fatalf("long RPC failed: %v", err)
123+
}
124+
if err := <-stopDone; err != nil {
125+
log.Fatalf("service stop failed: %v", err)
126+
}
127+
128+
log.Printf("done")
129+
}

server/grpc/graceful_stop_test.go

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package grpc
2+
3+
import (
4+
"context"
5+
"net"
6+
"testing"
7+
"time"
8+
9+
micro "go-micro.dev/v5"
10+
"go-micro.dev/v5/client"
11+
grpcclient "go-micro.dev/v5/client/grpc"
12+
"go-micro.dev/v5/registry"
13+
"go-micro.dev/v5/server"
14+
)
15+
16+
type SleepRequest struct {
17+
DelayMS int `json:"delay_ms"`
18+
}
19+
20+
type SleepResponse struct {
21+
Message string `json:"message"`
22+
}
23+
24+
type SleepHandler struct {
25+
started chan struct{}
26+
}
27+
28+
func (h *SleepHandler) Sleep(ctx context.Context, req *SleepRequest, rsp *SleepResponse) error {
29+
select {
30+
case h.started <- struct{}{}:
31+
default:
32+
}
33+
34+
timer := time.NewTimer(time.Duration(req.DelayMS) * time.Millisecond)
35+
defer timer.Stop()
36+
37+
select {
38+
case <-ctx.Done():
39+
return ctx.Err()
40+
case <-timer.C:
41+
}
42+
43+
rsp.Message = "ok"
44+
return nil
45+
}
46+
47+
func TestGracefulStopRejectsNewRPCsButAllowsInFlightRPCs(t *testing.T) {
48+
listener, err := net.Listen("tcp", "127.0.0.1:0")
49+
if err != nil {
50+
t.Fatalf("listen: %v", err)
51+
}
52+
defer listener.Close()
53+
54+
addr := listener.Addr().String()
55+
reg := registry.NewMemoryRegistry()
56+
handler := &SleepHandler{started: make(chan struct{}, 1)}
57+
58+
svc := micro.New("grace-demo",
59+
micro.HandleSignal(false),
60+
micro.Registry(reg),
61+
micro.Server(NewServer(
62+
server.Registry(reg),
63+
server.Name("grace-demo"),
64+
server.Address(addr),
65+
Listener(listener),
66+
GracefulStopTimeout(3*time.Second),
67+
)),
68+
micro.Client(grpcclient.NewClient(
69+
client.Registry(reg),
70+
client.ContentType("application/grpc+json"),
71+
client.DialTimeout(200*time.Millisecond),
72+
client.RequestTimeout(5*time.Second),
73+
)),
74+
)
75+
76+
if err := svc.Handle(handler); err != nil {
77+
t.Fatalf("handle: %v", err)
78+
}
79+
if err := svc.Start(); err != nil {
80+
t.Fatalf("start: %v", err)
81+
}
82+
83+
stopped := false
84+
defer func() {
85+
if !stopped {
86+
_ = svc.Stop()
87+
}
88+
}()
89+
90+
longDone := make(chan error, 1)
91+
go func() {
92+
req := svc.Client().NewRequest("grace-demo", "SleepHandler.Sleep", &SleepRequest{DelayMS: 1000})
93+
rsp := &SleepResponse{}
94+
longDone <- svc.Client().Call(context.Background(), req, rsp, client.WithAddress(addr))
95+
}()
96+
97+
select {
98+
case <-handler.started:
99+
case <-time.After(2 * time.Second):
100+
t.Fatal("timed out waiting for long RPC to start")
101+
}
102+
103+
stopDone := make(chan error, 1)
104+
go func() {
105+
stopDone <- svc.Stop()
106+
}()
107+
108+
freshReq := svc.Client().NewRequest("grace-demo", "SleepHandler.Sleep", &SleepRequest{DelayMS: 10})
109+
freshRsp := &SleepResponse{}
110+
var rejectErr error
111+
112+
deadline := time.Now().Add(300 * time.Millisecond)
113+
for time.Now().Before(deadline) {
114+
callCtx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
115+
err := svc.Client().Call(callCtx, freshReq, freshRsp, client.WithAddress(addr))
116+
cancel()
117+
if err != nil {
118+
rejectErr = err
119+
break
120+
}
121+
time.Sleep(10 * time.Millisecond)
122+
}
123+
124+
if rejectErr == nil {
125+
t.Fatal("expected a new RPC to be rejected shortly after shutdown started")
126+
}
127+
128+
select {
129+
case err := <-longDone:
130+
if err != nil {
131+
t.Fatalf("long RPC failed during graceful stop: %v", err)
132+
}
133+
case <-time.After(2 * time.Second):
134+
t.Fatal("timed out waiting for in-flight RPC to finish")
135+
}
136+
137+
select {
138+
case err := <-stopDone:
139+
if err != nil {
140+
t.Fatalf("stop: %v", err)
141+
}
142+
stopped = true
143+
case <-time.After(2 * time.Second):
144+
t.Fatal("timed out waiting for server stop")
145+
}
146+
}

server/grpc/grpc.go

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ import (
1818
"go-micro.dev/v5/broker"
1919
"go-micro.dev/v5/cmd"
2020
"go-micro.dev/v5/errors"
21-
"go-micro.dev/v5/logger"
22-
meta "go-micro.dev/v5/metadata"
23-
"go-micro.dev/v5/registry"
24-
"go-micro.dev/v5/server"
2521
"go-micro.dev/v5/internal/util/addr"
2622
"go-micro.dev/v5/internal/util/backoff"
2723
mgrpc "go-micro.dev/v5/internal/util/grpc"
2824
mnet "go-micro.dev/v5/internal/util/net"
25+
"go-micro.dev/v5/logger"
26+
meta "go-micro.dev/v5/metadata"
27+
"go-micro.dev/v5/registry"
28+
"go-micro.dev/v5/server"
2929
"golang.org/x/net/netutil"
3030

3131
"google.golang.org/grpc"
@@ -213,6 +213,19 @@ func (g *grpcServer) getGrpcServer() *grpc.Server {
213213
return nil
214214
}
215215

216+
func (g *grpcServer) getGracefulStopTimeout() time.Duration {
217+
if g.opts.Context == nil {
218+
return time.Second
219+
}
220+
221+
timeout, ok := g.opts.Context.Value(gracefulStopTimeoutKey{}).(time.Duration)
222+
if !ok || timeout <= 0 {
223+
return time.Second
224+
}
225+
226+
return timeout
227+
}
228+
216229
func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error {
217230
if g.wg != nil {
218231
g.wg.Add(1)
@@ -970,23 +983,27 @@ func (g *grpcServer) Start() error {
970983
log.Log(logger.ErrorLevel, "Server deregister error: ", err)
971984
}
972985

973-
// wait for waitgroup
974-
if g.wg != nil {
975-
g.wg.Wait()
976-
}
977-
978-
// stop the grpc server
979-
exit := make(chan bool)
986+
gracefulStopTimeout := g.getGracefulStopTimeout()
987+
exit := make(chan struct{})
980988

981989
go func() {
982990
g.srv.GracefulStop()
983991
close(exit)
984992
}()
985993

994+
timer := time.NewTimer(gracefulStopTimeout)
995+
defer timer.Stop()
996+
986997
select {
987998
case <-exit:
988-
case <-time.After(time.Second):
999+
case <-timer.C:
1000+
log.Logf(logger.ErrorLevel, "gRPC Server graceful stop timed out after %s, forcing stop", gracefulStopTimeout)
9891001
g.srv.Stop()
1002+
<-exit
1003+
}
1004+
1005+
if g.wg != nil {
1006+
g.wg.Wait()
9901007
}
9911008

9921009
log.Logf(logger.InfoLevel, "Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())

0 commit comments

Comments
 (0)