Skip to content

Commit 19df207

Browse files
authored
Merge pull request #8 from AxeForging/feature/grpc-ws-checks
Add gRPC and WebSocket readiness checks
2 parents 98e8adf + ec29a17 commit 19df207

9 files changed

Lines changed: 419 additions & 0 deletions

File tree

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ pipekit assert env-exists DEPLOY_TOKEN CLUSTER_NAME IMAGE_TAG
4949

5050
# Wait for a service to be ready
5151
pipekit wait url http://localhost:8080/healthz --timeout 150s
52+
pipekit wait grpc localhost:50051 --service my.package.Worker --timeout 60s
53+
pipekit wait ws ws://localhost:8080/events --timeout 60s
5254

5355
# Retry a flaky command with exponential backoff
5456
pipekit retry run --attempts 5 --delay 5s --backoff -- helm upgrade --install myapp ./chart

actions/assert.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,52 @@ func AssertCommand() cli.Command {
131131
return nil
132132
},
133133
},
134+
{
135+
Name: "grpc",
136+
Usage: "assert a gRPC health endpoint reports SERVING",
137+
ArgsUsage: "HOST:PORT",
138+
Flags: []cli.Flag{
139+
cli.StringFlag{Name: "service", Usage: "gRPC health service name"},
140+
cli.StringFlag{Name: "timeout", Value: "10s", Usage: "request timeout"},
141+
cli.BoolFlag{Name: "tls", Usage: "use TLS for the gRPC connection"},
142+
},
143+
Action: func(c *cli.Context) error {
144+
address := c.Args().First()
145+
if address == "" {
146+
return cli.NewExitError("address (host:port) required", 1)
147+
}
148+
timeout, err := time.ParseDuration(c.String("timeout"))
149+
if err != nil {
150+
return cli.NewExitError("invalid timeout: "+err.Error(), 1)
151+
}
152+
if err := services.AssertGRPCHealth(address, c.String("service"), timeout, c.Bool("tls")); err != nil {
153+
return cli.NewExitError(err.Error(), 1)
154+
}
155+
return nil
156+
},
157+
},
158+
{
159+
Name: "ws",
160+
Usage: "assert a WebSocket endpoint accepts an upgrade",
161+
ArgsUsage: "WS_URL",
162+
Flags: []cli.Flag{
163+
cli.StringFlag{Name: "timeout", Value: "10s", Usage: "request timeout"},
164+
},
165+
Action: func(c *cli.Context) error {
166+
urlStr := c.Args().First()
167+
if urlStr == "" {
168+
return cli.NewExitError("WebSocket URL required", 1)
169+
}
170+
timeout, err := time.ParseDuration(c.String("timeout"))
171+
if err != nil {
172+
return cli.NewExitError("invalid timeout: "+err.Error(), 1)
173+
}
174+
if err := services.AssertWebSocket(urlStr, timeout); err != nil {
175+
return cli.NewExitError(err.Error(), 1)
176+
}
177+
return nil
178+
},
179+
},
134180
{
135181
Name: "path",
136182
Usage: "assert one or more paths exist (file or directory)",

actions/wait.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,61 @@ func WaitCommand() cli.Command {
8787
return nil
8888
},
8989
},
90+
{
91+
Name: "grpc",
92+
Usage: "poll a gRPC health endpoint until it reports SERVING",
93+
ArgsUsage: "HOST:PORT",
94+
Flags: append(commonFlags,
95+
cli.StringFlag{Name: "service", Usage: "gRPC health service name"},
96+
cli.BoolFlag{Name: "tls", Usage: "use TLS for the gRPC connection"},
97+
),
98+
Action: func(c *cli.Context) error {
99+
address := c.Args().First()
100+
if address == "" {
101+
return cli.NewExitError("address (host:port) required", 1)
102+
}
103+
timeout, err := time.ParseDuration(c.String("timeout"))
104+
if err != nil {
105+
return cli.NewExitError("invalid timeout: "+err.Error(), 1)
106+
}
107+
interval, err := time.ParseDuration(c.String("interval"))
108+
if err != nil {
109+
return cli.NewExitError("invalid interval: "+err.Error(), 1)
110+
}
111+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
112+
defer cancel()
113+
if err := services.WaitForGRPCHealth(ctx, address, c.String("service"), interval, c.Bool("backoff"), c.Bool("quiet"), c.Bool("tls")); err != nil {
114+
return cli.NewExitError(err.Error(), 1)
115+
}
116+
return nil
117+
},
118+
},
119+
{
120+
Name: "ws",
121+
Usage: "poll a WebSocket endpoint until it accepts an upgrade",
122+
ArgsUsage: "WS_URL",
123+
Flags: commonFlags,
124+
Action: func(c *cli.Context) error {
125+
urlStr := c.Args().First()
126+
if urlStr == "" {
127+
return cli.NewExitError("WebSocket URL required", 1)
128+
}
129+
timeout, err := time.ParseDuration(c.String("timeout"))
130+
if err != nil {
131+
return cli.NewExitError("invalid timeout: "+err.Error(), 1)
132+
}
133+
interval, err := time.ParseDuration(c.String("interval"))
134+
if err != nil {
135+
return cli.NewExitError("invalid interval: "+err.Error(), 1)
136+
}
137+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
138+
defer cancel()
139+
if err := services.WaitForWebSocket(ctx, urlStr, interval, c.Bool("backoff"), c.Bool("quiet")); err != nil {
140+
return cli.NewExitError(err.Error(), 1)
141+
}
142+
return nil
143+
},
144+
},
90145
{
91146
Name: "command",
92147
Usage: "retry a shell command until it exits 0",

docs/COMMANDS.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,12 @@ pipekit assert compare 2.0.0 gt 1.5.0
308308
# URL returns one of the expected statuses
309309
pipekit assert url https://api.example.com/health --expected-status 200,204
310310

311+
# gRPC health endpoint reports SERVING
312+
pipekit assert grpc localhost:50051 --service my.package.Worker
313+
314+
# WebSocket endpoint accepts an upgrade
315+
pipekit assert ws ws://localhost:8080/events
316+
311317
# Path exists (file or directory)
312318
pipekit assert path /etc/myapp/config.yaml /var/lib/myapp
313319

@@ -404,6 +410,12 @@ pipekit wait url http://localhost:8080/healthz --expected-body "healthy"
404410
# TCP port
405411
pipekit wait tcp localhost:5432 --timeout 60s
406412

413+
# gRPC health endpoint
414+
pipekit wait grpc localhost:50051 --service my.package.Worker --timeout 60s
415+
416+
# WebSocket endpoint
417+
pipekit wait ws ws://localhost:8080/events --timeout 60s
418+
407419
# Arbitrary command (exit 0 = ready)
408420
pipekit wait command "pg_isready -h localhost" --timeout 30s --backoff
409421

@@ -419,6 +431,8 @@ pipekit wait url http://localhost:8080/healthz --quiet
419431
| `--quiet` | Suppress per-attempt output | `false` |
420432
| `--expected-status` | Acceptable HTTP codes (csv) | `200` |
421433
| `--expected-body` | Substring to look for in response body ||
434+
| `--service` | gRPC health service name ||
435+
| `--tls` | Use TLS for gRPC health checks | `false` |
422436
423437
</details>
424438

go.mod

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ require (
88
github.com/Masterminds/semver/v3 v3.4.0
99
github.com/bmatcuk/doublestar/v4 v4.10.0
1010
github.com/google/uuid v1.6.0
11+
github.com/gorilla/websocket v1.5.3
1112
github.com/itchyny/gojq v0.12.18
1213
github.com/pelletier/go-toml/v2 v2.3.1
1314
github.com/rs/zerolog v1.34.0
1415
github.com/urfave/cli v1.22.17
16+
google.golang.org/grpc v1.72.2
1517
gopkg.in/yaml.v3 v3.0.1
1618
)
1719

@@ -21,5 +23,9 @@ require (
2123
github.com/mattn/go-colorable v0.1.13 // indirect
2224
github.com/mattn/go-isatty v0.0.20 // indirect
2325
github.com/russross/blackfriday/v2 v2.1.0 // indirect
26+
golang.org/x/net v0.39.0 // indirect
2427
golang.org/x/sys v0.38.0 // indirect
28+
golang.org/x/text v0.24.0 // indirect
29+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34 // indirect
30+
google.golang.org/protobuf v1.36.6 // indirect
2531
)

go.sum

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,19 @@ github.com/cpuguy83/go-md2man/v2 v2.0.7/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6N
99
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1010
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
1111
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
12+
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
13+
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
14+
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
15+
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
1216
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
17+
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
18+
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
19+
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
20+
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
1321
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
1422
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
23+
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
24+
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
1525
github.com/itchyny/gojq v0.12.18 h1:gFGHyt/MLbG9n6dqnvlliiya2TaMMh6FFaR2b1H6Drc=
1626
github.com/itchyny/gojq v0.12.18/go.mod h1:4hPoZ/3lN9fDL1D+aK7DY1f39XZpY9+1Xpjz8atrEkg=
1727
github.com/itchyny/timefmt-go v0.1.7 h1:xyftit9Tbw+Dc/huSSPJaEmX1TVL8lw5vxjJLK4GMMA=
@@ -43,11 +53,33 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
4353
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
4454
github.com/urfave/cli v1.22.17 h1:SYzXoiPfQjHBbkYxbew5prZHS1TOLT3ierW8SYLqtVQ=
4555
github.com/urfave/cli v1.22.17/go.mod h1:b0ht0aqgH/6pBYzzxURyrM4xXNgsoT/n2ZzwQiEhNVo=
56+
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
57+
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
58+
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
59+
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
60+
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
61+
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
62+
go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A=
63+
go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU=
64+
go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk=
65+
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
66+
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
67+
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
68+
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
69+
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
4670
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
4771
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
4872
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
4973
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
5074
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
75+
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
76+
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
77+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34 h1:h6p3mQqrmT1XkHVTfzLdNz1u7IhINeZkz67/xTbOuWs=
78+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
79+
google.golang.org/grpc v1.72.2 h1:TdbGzwb82ty4OusHWepvFWGLgIbNo1/SUynEN0ssqv8=
80+
google.golang.org/grpc v1.72.2/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
81+
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
82+
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
5183
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
5284
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
5385
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=

services/probe_service.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package services
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"time"
8+
9+
"github.com/gorilla/websocket"
10+
"google.golang.org/grpc"
11+
"google.golang.org/grpc/credentials"
12+
"google.golang.org/grpc/credentials/insecure"
13+
healthpb "google.golang.org/grpc/health/grpc_health_v1"
14+
)
15+
16+
// AssertGRPCHealth checks the standard gRPC health service once.
17+
func AssertGRPCHealth(address string, serviceName string, timeout time.Duration, useTLS bool) error {
18+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
19+
defer cancel()
20+
21+
opts := []grpc.DialOption{grpc.WithBlock()}
22+
if useTLS {
23+
opts = append(opts, grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")))
24+
} else {
25+
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
26+
}
27+
28+
conn, err := grpc.DialContext(ctx, address, opts...)
29+
if err != nil {
30+
return fmt.Errorf("connecting to gRPC %s: %w", address, err)
31+
}
32+
defer conn.Close()
33+
34+
resp, err := healthpb.NewHealthClient(conn).Check(ctx, &healthpb.HealthCheckRequest{Service: serviceName})
35+
if err != nil {
36+
return fmt.Errorf("checking gRPC health %s: %w", address, err)
37+
}
38+
if resp.GetStatus() != healthpb.HealthCheckResponse_SERVING {
39+
return fmt.Errorf("gRPC %s health status %s, expected SERVING", address, resp.GetStatus())
40+
}
41+
return nil
42+
}
43+
44+
// AssertWebSocket checks that a WebSocket endpoint accepts an upgrade once.
45+
func AssertWebSocket(urlStr string, timeout time.Duration) error {
46+
dialer := websocket.Dialer{HandshakeTimeout: timeout}
47+
conn, resp, err := dialer.Dial(urlStr, nil)
48+
if err != nil {
49+
if resp != nil {
50+
return fmt.Errorf("connecting to WebSocket %s: status %d: %w", urlStr, resp.StatusCode, err)
51+
}
52+
return fmt.Errorf("connecting to WebSocket %s: %w", urlStr, err)
53+
}
54+
defer conn.Close()
55+
56+
if resp == nil || resp.StatusCode != http.StatusSwitchingProtocols {
57+
status := 0
58+
if resp != nil {
59+
status = resp.StatusCode
60+
}
61+
return fmt.Errorf("WebSocket %s returned status %d, expected 101", urlStr, status)
62+
}
63+
return nil
64+
}

0 commit comments

Comments
 (0)