Skip to content

Commit 24c16e5

Browse files
admin: Support creating incidents and adding serials
1 parent 231c89e commit 24c16e5

26 files changed

Lines changed: 2347 additions & 611 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ test/coverage
5050
test/secrets/badkeyrevoker_dburl
5151
test/secrets/cert_checker_dburl
5252
test/secrets/incidents_dburl
53+
test/secrets/admin_dburl
5354
test/secrets/revoker_dburl
5455
test/secrets/sa_dburl
5556
test/secrets/sa_ro_dburl

cmd/admin/admin.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type admin struct {
2323
rac adminRAClient
2424
sac adminSAClient
2525
saroc sapb.StorageAuthorityReadOnlyClient
26+
saac saAdminClient
2627

2728
clk clock.Clock
2829
log blog.Logger
@@ -43,6 +44,15 @@ type adminSAClient interface {
4344
UnpauseAccount(context.Context, *sapb.RegistrationID, ...grpc.CallOption) (*sapb.Count, error)
4445
}
4546

47+
// saAdminClient defines the StorageAuthorityAdmin methods that the admin tool
48+
// relies on. This is a separate gRPC service so that admin authorization for
49+
// admin-only methods does not also grant the broader StorageAuthority surface.
50+
type saAdminClient interface {
51+
AddSerialsToIncident(context.Context, ...grpc.CallOption) (grpc.ClientStreamingClient[sapb.AddSerialsToIncidentRequest, emptypb.Empty], error)
52+
CreateIncident(context.Context, *sapb.CreateIncidentRequest, ...grpc.CallOption) (*sapb.Incident, error)
53+
UpdateIncident(context.Context, *sapb.UpdateIncidentRequest, ...grpc.CallOption) (*emptypb.Empty, error)
54+
}
55+
4656
// newAdmin constructs a new admin object on the heap and returns a pointer to
4757
// it.
4858
func newAdmin(configFile string, dryRun bool) (*admin, error) {
@@ -84,13 +94,16 @@ func newAdmin(configFile string, dryRun bool) (*admin, error) {
8494
saroc := sapb.NewStorageAuthorityReadOnlyClient(saConn)
8595

8696
var sac adminSAClient = dryRunSAC{log: logger}
97+
var saAdmin saAdminClient = dryRunSAAdmin{log: logger}
8798
if !dryRun {
8899
sac = sapb.NewStorageAuthorityClient(saConn)
100+
saAdmin = sapb.NewStorageAuthorityAdminClient(saConn)
89101
}
90102

91103
return &admin{
92104
rac: rac,
93105
sac: sac,
106+
saac: saAdmin,
94107
saroc: saroc,
95108
clk: clk,
96109
log: logger,

cmd/admin/dryrun.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,41 @@ func (d dryRunSAC) UnpauseAccount(_ context.Context, req *sapb.RegistrationID, _
6262
d.log.Infof("dry-run: Unpause account %d", req.Id)
6363
return &sapb.Count{Count: 1}, nil
6464
}
65+
66+
type dryRunSAAdmin struct {
67+
log blog.Logger
68+
}
69+
70+
var _ saAdminClient = (*dryRunSAAdmin)(nil)
71+
72+
func (d dryRunSAAdmin) CreateIncident(_ context.Context, req *sapb.CreateIncidentRequest, _ ...grpc.CallOption) (*sapb.Incident, error) {
73+
d.log.Infof("dry-run: Create incident %q (url=%q, renewBy=%s)", req.SerialTable, req.Url, req.RenewBy.AsTime())
74+
return &sapb.Incident{SerialTable: req.SerialTable, Url: req.Url, RenewBy: req.RenewBy, Enabled: false}, nil
75+
}
76+
77+
func (d dryRunSAAdmin) UpdateIncident(_ context.Context, req *sapb.UpdateIncidentRequest, _ ...grpc.CallOption) (*emptypb.Empty, error) {
78+
d.log.Infof("dry-run: Update incident %q url=%q renewBy=%v enabled=%v", req.SerialTable, req.Url, req.RenewBy, req.Enabled)
79+
return &emptypb.Empty{}, nil
80+
}
81+
82+
func (d dryRunSAAdmin) AddSerialsToIncident(_ context.Context, _ ...grpc.CallOption) (grpc.ClientStreamingClient[sapb.AddSerialsToIncidentRequest, emptypb.Empty], error) {
83+
return &dryRunAddSerialsStream{log: d.log}, nil
84+
}
85+
86+
type dryRunAddSerialsStream struct {
87+
grpc.ClientStream
88+
log blog.Logger
89+
incident string
90+
count int
91+
}
92+
93+
func (d *dryRunAddSerialsStream) Send(req *sapb.AddSerialsToIncidentRequest) error {
94+
d.incident = req.SerialTable
95+
d.count += len(req.Serial)
96+
return nil
97+
}
98+
99+
func (d *dryRunAddSerialsStream) CloseAndRecv() (*emptypb.Empty, error) {
100+
d.log.Infof("dry-run: Add %d serials to incident %q", d.count, d.incident)
101+
return &emptypb.Empty{}, nil
102+
}

cmd/admin/incident.go

Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"errors"
7+
"flag"
8+
"fmt"
9+
"os"
10+
"strconv"
11+
"strings"
12+
"sync/atomic"
13+
"text/tabwriter"
14+
"time"
15+
16+
"golang.org/x/sync/errgroup"
17+
"google.golang.org/protobuf/types/known/emptypb"
18+
"google.golang.org/protobuf/types/known/timestamppb"
19+
20+
"github.com/letsencrypt/boulder/sa"
21+
sapb "github.com/letsencrypt/boulder/sa/proto"
22+
)
23+
24+
type subcommandCreateIncident struct {
25+
incident string
26+
url string
27+
renewBy string
28+
}
29+
30+
var _ subcommand = (*subcommandCreateIncident)(nil)
31+
32+
func (*subcommandCreateIncident) Desc() string {
33+
return "Create a new incident table and metadata row (starts disabled)."
34+
}
35+
36+
func (s *subcommandCreateIncident) Flags(f *flag.FlagSet) {
37+
f.StringVar(&s.incident, "incident", "", "Incident name (must start with 'incident_'; required)")
38+
f.StringVar(&s.url, "url", "", "URL describing the incident (required)")
39+
f.StringVar(&s.renewBy, "renew-by", "", "RFC3339 timestamp by which affected certs should be renewed (required)")
40+
}
41+
42+
func (s *subcommandCreateIncident) Run(ctx context.Context, a *admin) error {
43+
if s.incident == "" || s.url == "" || s.renewBy == "" {
44+
return errors.New("-incident, -url, and -renew-by are all required")
45+
}
46+
if !sa.ValidIncidentTableRegexp.MatchString(s.incident) {
47+
return fmt.Errorf("invalid incident %q (must match %s)", s.incident, sa.ValidIncidentTableRegexp)
48+
}
49+
renewBy, err := time.Parse(time.RFC3339, s.renewBy)
50+
if err != nil {
51+
return fmt.Errorf("parsing -renew-by as RFC3339: %w", err)
52+
}
53+
54+
inc, err := a.saac.CreateIncident(ctx, &sapb.CreateIncidentRequest{
55+
SerialTable: s.incident,
56+
Url: s.url,
57+
RenewBy: timestamppb.New(renewBy),
58+
})
59+
if err != nil {
60+
return fmt.Errorf("creating incident: %w", err)
61+
}
62+
a.log.Infof("Created incident %q url=%q renewBy=%s enabled=%t",
63+
inc.SerialTable, inc.Url, inc.RenewBy.AsTime(), inc.Enabled)
64+
return nil
65+
}
66+
67+
type subcommandListIncidents struct{}
68+
69+
var _ subcommand = (*subcommandListIncidents)(nil)
70+
71+
func (*subcommandListIncidents) Desc() string {
72+
return "List all incidents and their enabled status."
73+
}
74+
75+
func (*subcommandListIncidents) Flags(_ *flag.FlagSet) {}
76+
77+
func (*subcommandListIncidents) Run(ctx context.Context, a *admin) error {
78+
resp, err := a.saroc.ListIncidents(ctx, &emptypb.Empty{})
79+
if err != nil {
80+
return fmt.Errorf("listing incidents: %w", err)
81+
}
82+
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
83+
fmt.Fprintln(w, "NAME\tENABLED\tRENEW BY\tURL")
84+
for _, inc := range resp.Incidents {
85+
fmt.Fprintf(w, "%s\t%t\t%s\t%s\n",
86+
inc.SerialTable, inc.Enabled,
87+
inc.RenewBy.AsTime().Format(time.RFC3339), inc.Url)
88+
}
89+
return w.Flush()
90+
}
91+
92+
type subcommandUpdateIncident struct {
93+
incident string
94+
url string
95+
renewBy string
96+
enable string
97+
}
98+
99+
var _ subcommand = (*subcommandUpdateIncident)(nil)
100+
101+
func (*subcommandUpdateIncident) Desc() string {
102+
return "Update the url, renew-by, and/or enable fields of an existing incident."
103+
}
104+
105+
func (s *subcommandUpdateIncident) Flags(f *flag.FlagSet) {
106+
f.StringVar(&s.incident, "incident", "", "Incident name (must start with 'incident_'; required)")
107+
f.StringVar(&s.url, "url", "", "URL describing the incident (leave unset to keep the existing value)")
108+
f.StringVar(&s.renewBy, "renew-by", "", "RFC3339 timestamp by which affected certs should be renewed (leave unset to keep the existing value)")
109+
f.StringVar(&s.enable, "enable", "", `"true" to enable, "false" to disable (leave unset to keep the existing value)`)
110+
}
111+
112+
func (s *subcommandUpdateIncident) Run(ctx context.Context, a *admin) error {
113+
if s.incident == "" {
114+
return errors.New("-incident is required")
115+
}
116+
if !sa.ValidIncidentTableRegexp.MatchString(s.incident) {
117+
return fmt.Errorf("invalid incident %q (must match %s)", s.incident, sa.ValidIncidentTableRegexp)
118+
}
119+
if s.url == "" && s.renewBy == "" && s.enable == "" {
120+
return errors.New("at least one of -url, -renew-by, or -enable must be set")
121+
}
122+
req := &sapb.UpdateIncidentRequest{SerialTable: s.incident, Url: s.url}
123+
if s.renewBy != "" {
124+
t, err := time.Parse(time.RFC3339, s.renewBy)
125+
if err != nil {
126+
return fmt.Errorf("parsing -renew-by as RFC3339: %w", err)
127+
}
128+
req.RenewBy = timestamppb.New(t)
129+
}
130+
if s.enable != "" {
131+
v, err := strconv.ParseBool(s.enable)
132+
if err != nil {
133+
return fmt.Errorf("parsing -enable as bool: %w", err)
134+
}
135+
req.Enabled = &v
136+
}
137+
_, err := a.saac.UpdateIncident(ctx, req)
138+
if err != nil {
139+
return fmt.Errorf("updating incident %q: %w", s.incident, err)
140+
}
141+
a.log.Infof("Updated incident %q", s.incident)
142+
return nil
143+
}
144+
145+
type subcommandLoadIncidentSerials struct {
146+
incident string
147+
serialsFile string
148+
parallelism uint
149+
}
150+
151+
var _ subcommand = (*subcommandLoadIncidentSerials)(nil)
152+
153+
func (*subcommandLoadIncidentSerials) Desc() string {
154+
return "Load serials from a file into an existing incident."
155+
}
156+
157+
func (s *subcommandLoadIncidentSerials) Flags(f *flag.FlagSet) {
158+
f.StringVar(&s.incident, "incident", "", "Incident name (must start with 'incident_'; required)")
159+
f.StringVar(&s.serialsFile, "serials-file", "", "File of hex serials, one per line (required)")
160+
f.UintVar(&s.parallelism, "parallelism", 10, "Parallel workers, each with its own stream to the SA")
161+
}
162+
163+
// serialsBatchMax is the number of serials each worker accumulates before
164+
// emitting one Send on its gRPC stream. Sized to match the SA's flush batch so
165+
// each Recv on the server roughly maps to one transaction. Each message is
166+
// ~320KB at full batch (10000 × ~32-byte serials), well under the gRPC default
167+
// 4MB max.
168+
const serialsBatchMax = 10000
169+
170+
func (s *subcommandLoadIncidentSerials) Run(ctx context.Context, a *admin) error {
171+
if s.incident == "" || s.serialsFile == "" {
172+
return errors.New("-incident and -serials-file are required")
173+
}
174+
if !sa.ValidIncidentTableRegexp.MatchString(s.incident) {
175+
return fmt.Errorf("invalid incident %q", s.incident)
176+
}
177+
if s.parallelism == 0 {
178+
return errors.New("-parallelism must be > 0")
179+
}
180+
181+
file, err := os.Open(s.serialsFile)
182+
if err != nil {
183+
return fmt.Errorf("opening serials file: %w", err)
184+
}
185+
defer file.Close()
186+
187+
a.log.Infof("Loading serials from %q into incident %q with parallelism=%d.",
188+
s.serialsFile, s.incident, s.parallelism)
189+
190+
var totalSent atomic.Uint64
191+
work := make(chan string, s.parallelism)
192+
g, gctx := errgroup.WithContext(ctx)
193+
194+
g.Go(func() error {
195+
defer close(work)
196+
scanner := bufio.NewScanner(file)
197+
lineNum := 0
198+
for scanner.Scan() {
199+
lineNum++
200+
raw := scanner.Text()
201+
if strings.TrimSpace(raw) == "" {
202+
continue
203+
}
204+
cleaned, err := cleanSerials([]string{raw})
205+
if err != nil {
206+
return fmt.Errorf("line %d: %w", lineNum, err)
207+
}
208+
select {
209+
case work <- cleaned[0]:
210+
case <-gctx.Done():
211+
return gctx.Err()
212+
}
213+
}
214+
return scanner.Err()
215+
})
216+
217+
for range s.parallelism {
218+
g.Go(func() error {
219+
stream, err := a.saac.AddSerialsToIncident(gctx)
220+
if err != nil {
221+
return fmt.Errorf("opening stream: %w", err)
222+
}
223+
var buf []string
224+
flushSerials := func() error {
225+
if len(buf) == 0 {
226+
return nil
227+
}
228+
err := stream.Send(&sapb.AddSerialsToIncidentRequest{
229+
SerialTable: s.incident,
230+
Serial: buf,
231+
})
232+
if err != nil {
233+
buf = buf[:0]
234+
return err
235+
}
236+
n := totalSent.Add(uint64(len(buf)))
237+
prev := n - uint64(len(buf))
238+
if prev/100000 != n/100000 {
239+
a.log.Infof("Sent %d serials total", n)
240+
}
241+
buf = buf[:0]
242+
return nil
243+
}
244+
for serial := range work {
245+
buf = append(buf, serial)
246+
if len(buf) >= serialsBatchMax {
247+
err := flushSerials()
248+
if err != nil {
249+
return fmt.Errorf("sending batch: %w", err)
250+
}
251+
}
252+
}
253+
err = flushSerials()
254+
if err != nil {
255+
return fmt.Errorf("sending final batch: %w", err)
256+
}
257+
_, err = stream.CloseAndRecv()
258+
if err != nil {
259+
return fmt.Errorf("closing stream: %w", err)
260+
}
261+
return nil
262+
})
263+
}
264+
265+
err = g.Wait()
266+
if err != nil {
267+
return fmt.Errorf("loading serials: %w", err)
268+
}
269+
a.log.Infof("Done. Sent %d serials from %q into incident %q.",
270+
totalSent.Load(), s.serialsFile, s.incident)
271+
return nil
272+
}

0 commit comments

Comments
 (0)