diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index aec5eea..b360374 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -18,6 +18,4 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-go@v5 - name: golangci-lint - uses: golangci/golangci-lint-action@1481404843c368bc19ca9406f87d6e0fc97bdcfd # v7.0.0 - with: - version: v2.0.2 + uses: golangci/golangci-lint-action@4afd733a84b1f43292c63897423277bb7f4313a9 # v8.0.0 diff --git a/config.go b/config.go index ca8607c..599df85 100644 --- a/config.go +++ b/config.go @@ -63,7 +63,9 @@ func (c config) LogAttrs() []slog.Attr { // applying defaults where necessary. func loadConfig() (*config, error) { var cfg config - if err := env.Parse(&cfg); err != nil { + + err := env.Parse(&cfg) + if err != nil { return nil, fmt.Errorf("parse env: %w", err) } diff --git a/config_test.go b/config_test.go index 9c5c84d..379efd2 100644 --- a/config_test.go +++ b/config_test.go @@ -14,6 +14,7 @@ func clearConfigEnv(t *testing.T) { t.Helper() var cfg config + typ := reflect.TypeOf(cfg) for i := range typ.NumField() { field := typ.Field(i) @@ -85,6 +86,7 @@ func Test_loadConfig(t *testing.T) { } { t.Run("invalid-"+name, func(t *testing.T) { t.Setenv(name, "invalid") + _, err := loadConfig() require.Error(t, err) }) diff --git a/main.go b/main.go index 6a8f591..6a372ba 100644 --- a/main.go +++ b/main.go @@ -20,7 +20,8 @@ func run() error { return fmt.Errorf("new reaper: %w", err) } - if err = r.run(ctx); err != nil { + err = r.run(ctx) + if err != nil { return fmt.Errorf("run: %w", err) } @@ -28,7 +29,8 @@ func run() error { } func main() { - if err := run(); err != nil { + err := run() + if err != nil { slog.Error("run", fieldError, err) os.Exit(1) } diff --git a/reaper.go b/reaper.go index c22ae77..5a5524f 100644 --- a/reaper.go +++ b/reaper.go @@ -102,7 +102,8 @@ func newReaper(ctx context.Context, options ...reaperOption) (*reaper, error) { } for _, option := range options { - if err := option(r); err != nil { + err := option(r) + if err != nil { return nil, fmt.Errorf("option: %w", err) } } @@ -110,7 +111,8 @@ func newReaper(ctx context.Context, options ...reaperOption) (*reaper, error) { var err error if r.client == nil { // Default client configured from the environment. - if r.client, err = client.NewClientWithOpts(client.FromEnv); err != nil { + r.client, err = client.NewClientWithOpts(client.FromEnv) + if err != nil { return nil, fmt.Errorf("new client: %w", err) } } @@ -119,7 +121,8 @@ func newReaper(ctx context.Context, options ...reaperOption) (*reaper, error) { if r.cfg == nil { // Default configuration loaded from the environment. - if r.cfg, err = loadConfig(); err != nil { + r.cfg, err = loadConfig() + if err != nil { return nil, fmt.Errorf("load config: %w", err) } } @@ -131,12 +134,17 @@ func newReaper(ctx context.Context, options ...reaperOption) (*reaper, error) { pingCtx, cancel := context.WithTimeout(ctx, r.cfg.RequestTimeout) defer cancel() - if _, err = r.client.Ping(pingCtx); err != nil { + _, err = r.client.Ping(pingCtx) + if err != nil { return nil, fmt.Errorf("ping: %w", err) } r.logger.LogAttrs(ctx, slog.LevelInfo, "starting", r.cfg.LogAttrs()...) - if r.listener, err = net.Listen("tcp", fmt.Sprintf(":%d", r.cfg.Port)); err != nil { + + lc := &net.ListenConfig{} + + r.listener, err = lc.Listen(ctx, "tcp", fmt.Sprintf(":%d", r.cfg.Port)) + if err != nil { return nil, fmt.Errorf("listen: %w", err) } @@ -158,7 +166,8 @@ func (r *reaper) run(ctx context.Context) error { go r.processClients() // Wait for all tasks to complete. - if err := r.pruner(ctx); err != nil { + err := r.pruner(ctx) + if err != nil { if errors.Is(err, context.Canceled) { return nil } @@ -172,12 +181,14 @@ func (r *reaper) run(ctx context.Context) error { // pruner waits for a prune condition to be triggered then runs a prune. func (r *reaper) pruner(ctx context.Context) error { var errs []error + resources, err := r.pruneWait(ctx) if err != nil { errs = append(errs, fmt.Errorf("prune wait: %w", err)) } - if err = r.prune(resources); err != nil { //nolint:contextcheck // Prune needs its own context to ensure clean up completes. + err = r.prune(resources) //nolint:contextcheck // Prune needs its own context to ensure clean up completes. + if err != nil { errs = append(errs, fmt.Errorf("prune: %w", err)) } @@ -197,6 +208,7 @@ func (r *reaper) processClients() { } r.logger.Error("accept", fieldError, err) + continue } @@ -211,6 +223,7 @@ func (r *reaper) processClients() { // to retry and get a new reaper. r.logger.Warn("shutdown, aborting client", fieldAddress, addr) conn.Close() + return } @@ -222,8 +235,10 @@ func (r *reaper) processClients() { // the client and adding them to our filter. func (r *reaper) handle(conn net.Conn) { addr := conn.RemoteAddr().String() + defer func() { conn.Close() + r.disconnected <- addr }() @@ -239,21 +254,27 @@ func (r *reaper) handle(conn net.Conn) { logger.Warn("empty filter received") continue default: - if err := r.addFilter(msg); err != nil { + err := r.addFilter(msg) + if err != nil { logger.Error("add filter", fieldError, err) - if _, err = conn.Write(ackResponse); err != nil { + + _, err = conn.Write(ackResponse) + if err != nil { logger.Error("ack write", fieldError, err) } + continue } - if _, err := conn.Write(ackResponse); err != nil { + _, err = conn.Write(ackResponse) + if err != nil { logger.Error("ack write", fieldError, err) } } } - if err := scanner.Err(); err != nil { + err := scanner.Err() + if err != nil { logger.Error("scan", fieldError, err) } } @@ -286,18 +307,22 @@ func (r *reaper) pruneWait(ctx context.Context) (*resources, error) { clients := 0 pruneCheck := time.NewTicker(r.cfg.ConnectionTimeout) done := ctx.Done() + var shutdownDeadline time.Time + for { select { case addr := <-r.connected: clients++ r.logger.Info("client connected", fieldAddress, addr, fieldClients, clients) + if clients == 1 { pruneCheck.Stop() } case addr := <-r.disconnected: clients-- r.logger.Info("client disconnected", fieldAddress, addr, fieldClients, clients) + if clients == 0 { // No clients connected, trigger prune check overriding // any timeout set by shutdown signal. @@ -310,6 +335,7 @@ func (r *reaper) pruneWait(ctx context.Context) (*resources, error) { // to nil so we don't enter this case again. r.shutdownListener() shutdownDeadline = time.Now().Add(r.cfg.ShutdownTimeout) + timeout := r.cfg.ShutdownTimeout if clients == 0 { // No clients connected, shutdown immediately. @@ -317,12 +343,14 @@ func (r *reaper) pruneWait(ctx context.Context) (*resources, error) { } pruneCheck.Reset(timeout) + done = nil case now := <-pruneCheck.C: level := slog.LevelInfo if clients > 0 { level = slog.LevelWarn } + r.logger.Log(context.Background(), level, "prune check", fieldClients, clients) //nolint:contextcheck // Ensure log is written. resources, err := r.resources(now.Add(r.cfg.RetryOffset)) //nolint:contextcheck // Needs its own context to ensure clean up completes. @@ -331,6 +359,7 @@ func (r *reaper) pruneWait(ctx context.Context) (*resources, error) { if shutdownDeadline.IsZero() || now.Before(shutdownDeadline) { r.logger.Warn("change detected, waiting again", fieldError, err) pruneCheck.Reset(r.cfg.ChangesRetryInterval) + continue } @@ -349,15 +378,19 @@ func (r *reaper) pruneWait(ctx context.Context) (*resources, error) { // resources returns the resources that match the collected filters // for which there are no changes detected. func (r *reaper) resources(since time.Time) (*resources, error) { - var ret resources - var errs []error + var ( + ret resources + errs []error + ) // We combine errors so we can do best effort removal. + for _, args := range r.filterArgs() { containers, err := r.affectedContainers(since, args) if err != nil { if !errors.Is(err, errChangesDetected) { r.logger.Error("affected containers", fieldError, err) } + errs = append(errs, fmt.Errorf("affected containers: %w", err)) } @@ -368,6 +401,7 @@ func (r *reaper) resources(since time.Time) (*resources, error) { if !errors.Is(err, errChangesDetected) { r.logger.Error("affected networks", fieldError, err) } + errs = append(errs, fmt.Errorf("affected networks: %w", err)) } @@ -378,6 +412,7 @@ func (r *reaper) resources(since time.Time) (*resources, error) { if !errors.Is(err, errChangesDetected) { r.logger.Error("affected volumes", fieldError, err) } + errs = append(errs, fmt.Errorf("affected volumes: %w", err)) } @@ -388,6 +423,7 @@ func (r *reaper) resources(since time.Time) (*resources, error) { if !errors.Is(err, errChangesDetected) { r.logger.Error("affected images", fieldError, err) } + errs = append(errs, fmt.Errorf("affected images: %w", err)) } @@ -407,12 +443,14 @@ func (r *reaper) affectedContainers(since time.Time, args filters.Args) ([]strin // List all containers including stopped ones. options := container.ListOptions{All: true, Filters: args} r.logger.Debug("listing containers", "filter", options) + containers, err := r.client.ContainerList(ctx, options) if err != nil { return nil, fmt.Errorf("container list: %w", err) } var errChanges []error + containerIDs := make([]string, 0, len(containers)) for _, container := range containers { if container.Labels[ryukLabel] == "true" { @@ -458,12 +496,14 @@ func (r *reaper) affectedNetworks(since time.Time, args filters.Args) ([]string, options := network.ListOptions{Filters: args} r.logger.Debug("listing networks", "options", options) + report, err := r.client.NetworkList(ctx, options) if err != nil { return nil, fmt.Errorf("network list: %w", err) } var errChanges []error + networks := make([]string, 0, len(report)) for _, network := range report { changed := network.Created.After(since) @@ -496,12 +536,14 @@ func (r *reaper) affectedVolumes(since time.Time, args filters.Args) ([]string, options := volume.ListOptions{Filters: args} r.logger.Debug("listing volumes", "filter", options) + report, err := r.client.VolumeList(ctx, options) if err != nil { return nil, fmt.Errorf("volume list: %w", err) } var errChanges []error + volumes := make([]string, 0, len(report.Volumes)) for _, volume := range report.Volumes { created, perr := time.Parse(time.RFC3339, volume.CreatedAt) @@ -541,12 +583,14 @@ func (r *reaper) affectedImages(since time.Time, args filters.Args) ([]string, e options := image.ListOptions{Filters: args} r.logger.Debug("listing images", "filter", options) + report, err := r.client.ImageList(ctx, options) if err != nil { return nil, fmt.Errorf("image list: %w", err) } var errChanges []error + images := make([]string, 0, len(report)) for _, image := range report { created := time.Unix(image.Created, 0) @@ -580,8 +624,10 @@ func (r *reaper) addFilter(msg string) error { } args := filters.NewArgs() + for filterType, values := range query { r.logger.Info("adding filter", "type", filterType, "values", values) + for _, value := range values { args.Add(filterType, value) } @@ -625,8 +671,10 @@ func (r *reaper) filterArgs() []filters.Args { // prune removes the specified resources. func (r *reaper) prune(resources *resources) error { - var containers, networks, volumes, images int - var errs []error + var ( + containers, networks, volumes, images int + errs []error + ) // Containers must be removed first. errs = append(errs, r.remove("container", resources.containers, &containers, func(ctx context.Context, id string) error { @@ -671,6 +719,7 @@ func (r *reaper) remove(resourceType string, resources []string, count *int, fn for attempt := 1; attempt <= r.cfg.RemoveRetries; attempt++ { var retry bool + for id := range todo { itemLogger := logger.With("id", id, "attempt", attempt) @@ -678,7 +727,9 @@ func (r *reaper) remove(resourceType string, resources []string, count *int, fn defer cancel() itemLogger.Debug("remove") - if err := fn(ctx, id); err != nil { + + err := fn(ctx, id) + if err != nil { if errdefs.IsNotFound(err) { // Already removed. itemLogger.Debug("not found") @@ -686,11 +737,14 @@ func (r *reaper) remove(resourceType string, resources []string, count *int, fn } itemLogger.Error("remove", fieldError, err) + retry = true + continue } delete(todo, id) + *count++ } @@ -698,6 +752,7 @@ func (r *reaper) remove(resourceType string, resources []string, count *int, fn if attempt < r.cfg.RemoveRetries { time.Sleep(time.Second) } + continue } diff --git a/reaper_test.go b/reaper_test.go index 8f2d9ad..d8e2621 100644 --- a/reaper_test.go +++ b/reaper_test.go @@ -86,6 +86,7 @@ var ( func Test_newReaper(t *testing.T) { ctx := context.Background() + t.Run("basic", func(t *testing.T) { r, err := newReaper(ctx, discardLogger, testConfig) require.NoError(t, err) @@ -120,6 +121,7 @@ func testConnect(ctx context.Context, t *testing.T, endpoint string, labels map[ t.Helper() var d net.Dialer + conn, err := d.DialContext(ctx, "tcp", endpoint) require.NoError(t, err) @@ -138,6 +140,7 @@ func testConnect(ctx context.Context, t *testing.T, endpoint string, labels map[ go func() { defer conn.Close() + <-ctx.Done() }() } @@ -171,6 +174,7 @@ type runTest struct { // newRunTest returns a new runTest with created at times set in the past. func newRunTest() *runTest { now := time.Now().Add(-time.Minute) + return &runTest{ createdAt1: now, containerCreated2: now, @@ -199,6 +203,7 @@ func newMockClient(tc *runTest) *mockClient { // Mock the container list and remove calls. filters1 := filterArgs(testLabels1) filters2 := filterArgs(testLabels2) + cli.On("ContainerList", mockContext, container.ListOptions{All: true, Filters: filters1}).Return([]container.Summary{ { ID: containerID1, @@ -290,6 +295,7 @@ func testReaperRun(t *testing.T, tc *runTest) (string, error) { t.Cleanup(cancel) var buf safeBuffer + logger := withLogger(slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{ Level: slog.LevelDebug, }))) @@ -298,6 +304,7 @@ func testReaperRun(t *testing.T, tc *runTest) (string, error) { require.NoError(t, err) errCh := make(chan error, 1) + go func() { errCh <- r.run(ctx) }() @@ -514,6 +521,7 @@ func TestAbortedClient(t *testing.T) { t.Cleanup(cancel) var log safeBuffer + logger := withLogger(slog.New(slog.NewTextHandler(&log, &slog.HandlerOptions{ Level: slog.LevelDebug, }))) @@ -531,6 +539,7 @@ func TestAbortedClient(t *testing.T) { addr := r.listener.Addr().String() var d net.Dialer + conn, err := d.DialContext(ctx, "tcp", addr) require.NoError(t, err) @@ -542,6 +551,7 @@ func TestAbortedClient(t *testing.T) { buf := make([]byte, 4) n, err := conn.Read(buf) require.Error(t, err) + switch { case errors.Is(err, io.EOF), errors.Is(err, syscall.ECONNRESET): @@ -549,6 +559,7 @@ func TestAbortedClient(t *testing.T) { default: t.Fatal("unexpected read error:", err) } + require.Zero(t, n) require.Contains(t, log.String(), "shutdown, aborting client") } @@ -559,6 +570,7 @@ func TestShutdownSignal(t *testing.T) { t.Cleanup(cancel) var log safeBuffer + logger := withLogger(slog.New(slog.NewTextHandler(&log, &slog.HandlerOptions{ Level: slog.LevelDebug, }))) @@ -570,6 +582,7 @@ func TestShutdownSignal(t *testing.T) { errCh := make(chan error, 1) runCtx, runCancel := context.WithCancel(ctx) t.Cleanup(runCancel) + go func() { errCh <- r.run(runCtx) }() @@ -595,6 +608,7 @@ func TestShutdownSignal(t *testing.T) { t.Cleanup(cancel) var log safeBuffer + logger := withLogger(slog.New(slog.NewTextHandler(&log, &slog.HandlerOptions{ Level: slog.LevelDebug, }))) @@ -606,6 +620,7 @@ func TestShutdownSignal(t *testing.T) { errCh := make(chan error, 1) runCtx, runCancel := context.WithCancel(ctx) t.Cleanup(runCancel) + go func() { errCh <- r.run(runCtx) }() @@ -633,6 +648,7 @@ func TestShutdownSignal(t *testing.T) { t.Cleanup(cancel) var log safeBuffer + logger := withLogger(slog.New(slog.NewTextHandler(&log, &slog.HandlerOptions{ Level: slog.LevelDebug, }))) @@ -644,9 +660,11 @@ func TestShutdownSignal(t *testing.T) { errCh := make(chan error, 1) runCtx, runCancel := context.WithCancel(ctx) t.Cleanup(runCancel) + go func() { errCh <- r.run(runCtx) }() + runCancel() select { @@ -667,6 +685,7 @@ func TestShutdownSignal(t *testing.T) { t.Cleanup(cancel) var log safeBuffer + logger := withLogger(slog.New(slog.NewTextHandler(&log, &slog.HandlerOptions{ Level: slog.LevelDebug, }))) @@ -680,6 +699,7 @@ func TestShutdownSignal(t *testing.T) { errCh := make(chan error, 1) runCtx, runCancel := context.WithCancel(ctx) t.Cleanup(runCancel) + go func() { errCh <- r.run(runCtx) }() @@ -711,6 +731,7 @@ func TestReapContainer(t *testing.T) { // Run two containers with different labels. cli := testClient(t) + ids := make([]string, 2) for i, labels := range []map[string]string{testLabels1, testLabels2} { config := &container.Config{ @@ -718,20 +739,25 @@ func TestReapContainer(t *testing.T) { Cmd: []string{"sleep", "10"}, Labels: labels, } + resp, err := cli.ContainerCreate(ctx, config, nil, nil, nil, testID()) if errdefs.IsNotFound(err) { // Image not found, pull it. var rc io.ReadCloser + rc, err = cli.ImagePull(ctx, testImage, image.PullOptions{}) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, rc.Close()) }) + _, err = io.Copy(io.Discard, rc) require.NoError(t, err) resp, err = cli.ContainerCreate(ctx, config, nil, nil, nil, testID()) } + require.NoError(t, err) + ids[i] = resp.ID t.Cleanup(func() { @@ -760,12 +786,14 @@ func TestReapNetwork(t *testing.T) { // Create two networks with different labels. cli := testClient(t) + ids := make([]string, 2) for i, labels := range []map[string]string{testLabels1, testLabels2} { resp, err := cli.NetworkCreate(ctx, testID(), network.CreateOptions{ Labels: labels, }) require.NoError(t, err) + ids[i] = resp.ID t.Cleanup(func() { @@ -789,12 +817,14 @@ func TestReapVolume(t *testing.T) { // Create two volumes with different labels. cli := testClient(t) + ids := make([]string, 2) for i, labels := range []map[string]string{testLabels1, testLabels2} { resp, err := cli.VolumeCreate(ctx, volume.CreateOptions{ Labels: labels, }) require.NoError(t, err) + ids[i] = resp.Name t.Cleanup(func() { @@ -818,6 +848,7 @@ func TestReapImage(t *testing.T) { // Create two images with different labels. cli := testClient(t) + ids := make([]string, 2) for i, labels := range []map[string]string{testLabels1, testLabels2} { context, err := archive.Tar("testdata", archive.Uncompressed) @@ -839,13 +870,17 @@ func TestReapImage(t *testing.T) { // Process the build output, discarding it so we catch any // errors and get the image ID. var imageID string + auxCallback := func(msg jsonmessage.JSONMessage) { if msg.ID != imageBuildResult { return } + var result build.Result + err = json.Unmarshal(*msg.Aux, &result) require.NoError(t, err) + imageID = result.ID } err = jsonmessage.DisplayJSONMessagesStream(resp.Body, io.Discard, 0, false, auxCallback) @@ -901,6 +936,7 @@ func testReaper(ctx context.Context, t *testing.T, expect ...string) { // Start the reaper. var log safeBuffer + logger := withLogger(slog.New(slog.NewTextHandler(&log, &slog.HandlerOptions{ Level: slog.LevelDebug, }))) @@ -908,6 +944,7 @@ func testReaper(ctx context.Context, t *testing.T, expect ...string) { require.NoError(t, err) reaperErr := make(chan error, 1) + go func() { reaperErr <- r.run(ctx) }()