Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ test/coverage
test/secrets/badkeyrevoker_dburl
test/secrets/cert_checker_dburl
test/secrets/incidents_dburl
test/secrets/admin_dburl
test/secrets/revoker_dburl
test/secrets/sa_dburl
test/secrets/sa_ro_dburl
13 changes: 13 additions & 0 deletions cmd/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type admin struct {
rac adminRAClient
sac adminSAClient
saroc sapb.StorageAuthorityReadOnlyClient
saac saAdminClient

clk clock.Clock
log blog.Logger
Expand All @@ -43,6 +44,15 @@ type adminSAClient interface {
UnpauseAccount(context.Context, *sapb.RegistrationID, ...grpc.CallOption) (*sapb.Count, error)
}

// saAdminClient defines the StorageAuthorityAdmin methods that the admin tool
// relies on. This is a separate gRPC service so that admin authorization for
// admin-only methods does not also grant the broader StorageAuthority surface.
type saAdminClient interface {
AddSerialsToIncident(context.Context, ...grpc.CallOption) (grpc.ClientStreamingClient[sapb.AddSerialsToIncidentRequest, emptypb.Empty], error)
CreateIncident(context.Context, *sapb.CreateIncidentRequest, ...grpc.CallOption) (*sapb.Incident, error)
UpdateIncident(context.Context, *sapb.UpdateIncidentRequest, ...grpc.CallOption) (*emptypb.Empty, error)
}

// newAdmin constructs a new admin object on the heap and returns a pointer to
// it.
func newAdmin(configFile string, dryRun bool) (*admin, error) {
Expand Down Expand Up @@ -84,13 +94,16 @@ func newAdmin(configFile string, dryRun bool) (*admin, error) {
saroc := sapb.NewStorageAuthorityReadOnlyClient(saConn)

var sac adminSAClient = dryRunSAC{log: logger}
var saAdmin saAdminClient = dryRunSAAdmin{log: logger}
if !dryRun {
sac = sapb.NewStorageAuthorityClient(saConn)
saAdmin = sapb.NewStorageAuthorityAdminClient(saConn)
}

return &admin{
rac: rac,
sac: sac,
saac: saAdmin,
saroc: saroc,
clk: clk,
log: logger,
Expand Down
38 changes: 38 additions & 0 deletions cmd/admin/dryrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,41 @@ func (d dryRunSAC) UnpauseAccount(_ context.Context, req *sapb.RegistrationID, _
d.log.Infof("dry-run: Unpause account %d", req.Id)
return &sapb.Count{Count: 1}, nil
}

// dryRunSAAdmin is a fake saAdminClient used when the admin tool runs with
// -dry-run. Each method logs the mutation it would have performed instead of
// calling the Storage Authority.
type dryRunSAAdmin struct {
	log blog.Logger
}

// Compile-time check that dryRunSAAdmin satisfies saAdminClient.
var _ saAdminClient = (*dryRunSAAdmin)(nil)

func (d dryRunSAAdmin) CreateIncident(_ context.Context, req *sapb.CreateIncidentRequest, _ ...grpc.CallOption) (*sapb.Incident, error) {
d.log.Infof("dry-run: Create incident %q (url=%q, renewBy=%s)", req.SerialTable, req.Url, req.RenewBy.AsTime())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a heads up that the blog PR totally overhauls how the dry-run clients above log, so keep an eye on that depending on whether this lands before or after the blog PR.

return &sapb.Incident{SerialTable: req.SerialTable, Url: req.Url, RenewBy: req.RenewBy, Enabled: false}, nil
}

// UpdateIncident logs the update that would have been applied and returns an
// empty response, without contacting the Storage Authority.
func (d dryRunSAAdmin) UpdateIncident(_ context.Context, req *sapb.UpdateIncidentRequest, _ ...grpc.CallOption) (*emptypb.Empty, error) {
	d.log.Infof("dry-run: Update incident %q url=%q renewBy=%v enabled=%v", req.SerialTable, req.Url, req.RenewBy, req.Enabled)
	return &emptypb.Empty{}, nil
}

// AddSerialsToIncident returns a fake client stream that counts serials
// locally and logs a single summary line when the stream is closed.
func (d dryRunSAAdmin) AddSerialsToIncident(_ context.Context, _ ...grpc.CallOption) (grpc.ClientStreamingClient[sapb.AddSerialsToIncidentRequest, emptypb.Empty], error) {
	return &dryRunAddSerialsStream{log: d.log}, nil
}

// dryRunAddSerialsStream is a fake client stream backing dryRunSAAdmin's
// AddSerialsToIncident. The embedded grpc.ClientStream supplies the remaining
// stream methods (which would panic if called; only Send and CloseAndRecv are
// exercised here).
type dryRunAddSerialsStream struct {
	grpc.ClientStream
	log blog.Logger
	// incident records the most recent SerialTable seen on the stream.
	incident string
	// count accumulates how many serials have been "sent".
	count int
}

// Send tallies the serials in req instead of transmitting them. It always
// succeeds.
func (d *dryRunAddSerialsStream) Send(req *sapb.AddSerialsToIncidentRequest) error {
	d.incident = req.SerialTable
	d.count += len(req.Serial)
	return nil
}

// CloseAndRecv logs a summary of everything Send tallied and returns an empty
// response.
func (d *dryRunAddSerialsStream) CloseAndRecv() (*emptypb.Empty, error) {
	d.log.Infof("dry-run: Add %d serials to incident %q", d.count, d.incident)
	return &emptypb.Empty{}, nil
}
272 changes: 272 additions & 0 deletions cmd/admin/incident.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
package main

import (
	"bufio"
	"context"
	"errors"
	"flag"
	"fmt"
	"io"
	"os"
	"strconv"
	"strings"
	"sync/atomic"
	"text/tabwriter"
	"time"

	"golang.org/x/sync/errgroup"
	"google.golang.org/protobuf/types/known/emptypb"
	"google.golang.org/protobuf/types/known/timestamppb"

	"github.com/letsencrypt/boulder/sa"
	sapb "github.com/letsencrypt/boulder/sa/proto"
)

// subcommandCreateIncident implements the "create incident" admin subcommand,
// which creates a new incident table and metadata row (initially disabled).
type subcommandCreateIncident struct {
	// incident is the incident table name; must match sa.ValidIncidentTableRegexp.
	incident string
	// url points at a description of the incident.
	url string
	// renewBy is an RFC3339 timestamp by which affected certs should renew.
	renewBy string
}

// Compile-time check that subcommandCreateIncident satisfies subcommand.
var _ subcommand = (*subcommandCreateIncident)(nil)

// Desc implements the subcommand interface.
func (*subcommandCreateIncident) Desc() string {
	return "Create a new incident table and metadata row (starts disabled)."
}

// Flags implements the subcommand interface. All three flags are required;
// Run enforces that.
func (s *subcommandCreateIncident) Flags(f *flag.FlagSet) {
	f.StringVar(&s.incident, "incident", "", "Incident name (must start with 'incident_'; required)")
	f.StringVar(&s.url, "url", "", "URL describing the incident (required)")
	f.StringVar(&s.renewBy, "renew-by", "", "RFC3339 timestamp by which affected certs should be renewed (required)")
}

// Run validates the flags and asks the SA to create the incident. The new
// incident starts disabled; use the update subcommand to enable it.
func (s *subcommandCreateIncident) Run(ctx context.Context, a *admin) error {
	switch {
	case s.incident == "" || s.url == "" || s.renewBy == "":
		return errors.New("-incident, -url, and -renew-by are all required")
	case !sa.ValidIncidentTableRegexp.MatchString(s.incident):
		return fmt.Errorf("invalid incident %q (must match %s)", s.incident, sa.ValidIncidentTableRegexp)
	}

	renewBy, err := time.Parse(time.RFC3339, s.renewBy)
	if err != nil {
		return fmt.Errorf("parsing -renew-by as RFC3339: %w", err)
	}

	req := &sapb.CreateIncidentRequest{
		SerialTable: s.incident,
		Url:         s.url,
		RenewBy:     timestamppb.New(renewBy),
	}
	inc, err := a.saac.CreateIncident(ctx, req)
	if err != nil {
		return fmt.Errorf("creating incident: %w", err)
	}

	a.log.Infof("Created incident %q url=%q renewBy=%s enabled=%t",
		inc.SerialTable, inc.Url, inc.RenewBy.AsTime(), inc.Enabled)
	return nil
}

// subcommandListIncidents implements the "list incidents" admin subcommand.
// It is stateless: the command takes no flags.
type subcommandListIncidents struct{}

// Compile-time check that subcommandListIncidents satisfies subcommand.
var _ subcommand = (*subcommandListIncidents)(nil)

// Desc implements the subcommand interface.
func (*subcommandListIncidents) Desc() string {
	return "List all incidents and their enabled status."
}

// Flags implements the subcommand interface; listing takes no flags.
func (*subcommandListIncidents) Flags(_ *flag.FlagSet) {}

// Run fetches every incident from the read-only SA and prints a tab-aligned
// table of their metadata to stdout.
func (*subcommandListIncidents) Run(ctx context.Context, a *admin) error {
	resp, err := a.saroc.ListIncidents(ctx, &emptypb.Empty{})
	if err != nil {
		return fmt.Errorf("listing incidents: %w", err)
	}

	tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
	fmt.Fprintln(tw, "NAME\tENABLED\tRENEW BY\tURL")
	for _, incident := range resp.Incidents {
		renewBy := incident.RenewBy.AsTime().Format(time.RFC3339)
		fmt.Fprintf(tw, "%s\t%t\t%s\t%s\n", incident.SerialTable, incident.Enabled, renewBy, incident.Url)
	}
	return tw.Flush()
}

// subcommandUpdateIncident implements the "update incident" admin subcommand.
// Unset flags leave the corresponding incident field unchanged.
type subcommandUpdateIncident struct {
	// incident is the incident table name; must match sa.ValidIncidentTableRegexp.
	incident string
	// url, renewBy, and enable are raw flag values; empty string means "do
	// not change this field". They are parsed/validated in Run.
	url     string
	renewBy string
	enable  string
}

// Compile-time check that subcommandUpdateIncident satisfies subcommand.
var _ subcommand = (*subcommandUpdateIncident)(nil)

// Desc implements the subcommand interface.
func (*subcommandUpdateIncident) Desc() string {
	return "Update the url, renew-by, and/or enable fields of an existing incident."
}

// Flags implements the subcommand interface. Only -incident is required; the
// other flags are optional and unset values keep the existing incident data.
func (s *subcommandUpdateIncident) Flags(f *flag.FlagSet) {
	f.StringVar(&s.incident, "incident", "", "Incident name (must start with 'incident_'; required)")
	f.StringVar(&s.url, "url", "", "URL describing the incident (leave unset to keep the existing value)")
	f.StringVar(&s.renewBy, "renew-by", "", "RFC3339 timestamp by which affected certs should be renewed (leave unset to keep the existing value)")
	f.StringVar(&s.enable, "enable", "", `"true" to enable, "false" to disable (leave unset to keep the existing value)`)
}

// Run validates the flags, builds an UpdateIncidentRequest containing only
// the fields the operator set, and sends it to the SA.
func (s *subcommandUpdateIncident) Run(ctx context.Context, a *admin) error {
	if s.incident == "" {
		return errors.New("-incident is required")
	}
	if !sa.ValidIncidentTableRegexp.MatchString(s.incident) {
		return fmt.Errorf("invalid incident %q (must match %s)", s.incident, sa.ValidIncidentTableRegexp)
	}
	if s.url == "" && s.renewBy == "" && s.enable == "" {
		return errors.New("at least one of -url, -renew-by, or -enable must be set")
	}

	req := &sapb.UpdateIncidentRequest{SerialTable: s.incident, Url: s.url}
	if s.renewBy != "" {
		parsed, err := time.Parse(time.RFC3339, s.renewBy)
		if err != nil {
			return fmt.Errorf("parsing -renew-by as RFC3339: %w", err)
		}
		req.RenewBy = timestamppb.New(parsed)
	}
	if s.enable != "" {
		enabled, err := strconv.ParseBool(s.enable)
		if err != nil {
			return fmt.Errorf("parsing -enable as bool: %w", err)
		}
		req.Enabled = &enabled
	}

	if _, err := a.saac.UpdateIncident(ctx, req); err != nil {
		return fmt.Errorf("updating incident %q: %w", s.incident, err)
	}
	a.log.Infof("Updated incident %q", s.incident)
	return nil
}

// subcommandLoadIncidentSerials implements the "load incident serials" admin
// subcommand, which bulk-loads serials from a file into an existing incident.
type subcommandLoadIncidentSerials struct {
	// incident is the incident table name; must match sa.ValidIncidentTableRegexp.
	incident string
	// serialsFile is a path to a file of hex serials, one per line.
	serialsFile string
	// parallelism is the number of concurrent worker streams to the SA.
	parallelism uint
}

// Compile-time check that subcommandLoadIncidentSerials satisfies subcommand.
var _ subcommand = (*subcommandLoadIncidentSerials)(nil)

// Desc implements the subcommand interface.
func (*subcommandLoadIncidentSerials) Desc() string {
	return "Load serials from a file into an existing incident."
}

// Flags implements the subcommand interface. -incident and -serials-file are
// required; -parallelism defaults to 10 workers.
func (s *subcommandLoadIncidentSerials) Flags(f *flag.FlagSet) {
	f.StringVar(&s.incident, "incident", "", "Incident name (must start with 'incident_'; required)")
	f.StringVar(&s.serialsFile, "serials-file", "", "File of hex serials, one per line (required)")
	f.UintVar(&s.parallelism, "parallelism", 10, "Parallel workers, each with its own stream to the SA")
}

// serialsBatchMax is the number of serials each worker accumulates before
// emitting one Send on its gRPC stream. Sized to match the SA's flush batch so
// each Recv on the server roughly maps to one transaction. Each message is
// ~320KB at full batch (10000 × ~32-byte serials), well under the gRPC default
// 4MB max.
const serialsBatchMax = 10000

func (s *subcommandLoadIncidentSerials) Run(ctx context.Context, a *admin) error {
if s.incident == "" || s.serialsFile == "" {
return errors.New("-incident and -serials-file are required")
}
if !sa.ValidIncidentTableRegexp.MatchString(s.incident) {
return fmt.Errorf("invalid incident %q", s.incident)
}
if s.parallelism == 0 {
return errors.New("-parallelism must be > 0")
}

file, err := os.Open(s.serialsFile)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idea: if s.serialsFile is -, then open stdin. Alternatively, add documentation noting that a user could pass /dev/stdin as the file to read from if they want to stream serials on stdin.

if err != nil {
return fmt.Errorf("opening serials file: %w", err)
}
defer file.Close()

a.log.Infof("Loading serials from %q into incident %q with parallelism=%d.",
s.serialsFile, s.incident, s.parallelism)

var totalSent atomic.Uint64
work := make(chan string, s.parallelism)
g, gctx := errgroup.WithContext(ctx)

g.Go(func() error {
defer close(work)
scanner := bufio.NewScanner(file)
lineNum := 0
for scanner.Scan() {
lineNum++
raw := scanner.Text()
if strings.TrimSpace(raw) == "" {
continue
}
cleaned, err := cleanSerials([]string{raw})
if err != nil {
return fmt.Errorf("line %d: %w", lineNum, err)
}
select {
case work <- cleaned[0]:
case <-gctx.Done():
return gctx.Err()
}
}
return scanner.Err()
})

for range s.parallelism {
g.Go(func() error {
stream, err := a.saac.AddSerialsToIncident(gctx)
if err != nil {
return fmt.Errorf("opening stream: %w", err)
}
var buf []string
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comment: calling this buf feels weird, because I expect that to be a byte buffer, not a list of strings. Maybe pool or batch or something like that.

Larger comment: how much performance win are we getting from batching both here and at the SA level? Batching here (and unbatching at the SA, before immediately rebatching with a batch size that happens to be the same but can't be assumed to be the same) adds a lot of complexity. We haven't decided that we need that level of complexity for producing CRLs, which have to handle roughly the same number of serials as this, since any serials we load here will end up on CRLs a few minutes later. If a load test has shown that gRPC between the admin tool and the SA is a bottleneck we need to overcome, then great, this is worth it. But if doing 10k-row inserts into the database is still going to be the primary bottleneck, I'd have a preference for keeping this simple.

flushSerials := func() error {
if len(buf) == 0 {
return nil
}
err := stream.Send(&sapb.AddSerialsToIncidentRequest{
SerialTable: s.incident,
Serial: buf,
})
if err != nil {
buf = buf[:0]
return err
}
n := totalSent.Add(uint64(len(buf)))
prev := n - uint64(len(buf))
if prev/100000 != n/100000 {
a.log.Infof("Sent %d serials total", n)
}
buf = buf[:0]
return nil
}
for serial := range work {
buf = append(buf, serial)
if len(buf) >= serialsBatchMax {
err := flushSerials()
if err != nil {
return fmt.Errorf("sending batch: %w", err)
}
}
}
err = flushSerials()
if err != nil {
return fmt.Errorf("sending final batch: %w", err)
}
_, err = stream.CloseAndRecv()
if err != nil {
return fmt.Errorf("closing stream: %w", err)
}
return nil
})
}

err = g.Wait()
if err != nil {
return fmt.Errorf("loading serials: %w", err)
}
a.log.Infof("Done. Sent %d serials from %q into incident %q.",
totalSent.Load(), s.serialsFile, s.incident)
return nil
}
Loading
Loading