feat: add filesystem scrubbing controller #9848
| ctrl.schedule[mountpoint].timer.Reset(ctrl.schedule[mountpoint].period) | ||
| } | ||
|
|
||
| ctrl.schedule[mountpoint] = scrubSchedule{ |
I guess we need some way to prevent a user from creating multiple documents that reference the same mountpoint.
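One way to approach the duplicate-mountpoint concern is a validation pass over all scrub documents before any schedule is created. The sketch below is only an illustration: `scrubDoc` and `findDuplicateMountpoints` are hypothetical names, not types or functions from this PR, and where such a check would actually live (config validation or the controller) is left open.

```go
package main

import (
	"fmt"
	"sort"
)

// scrubDoc is a hypothetical stand-in for a filesystem scrub config document.
type scrubDoc struct {
	Name       string
	Mountpoint string
}

// findDuplicateMountpoints returns an error naming every mountpoint that is
// referenced by more than one document, or nil if all mountpoints are unique.
func findDuplicateMountpoints(docs []scrubDoc) error {
	// Group document names by the mountpoint they reference.
	owners := map[string][]string{}
	for _, d := range docs {
		owners[d.Mountpoint] = append(owners[d.Mountpoint], d.Name)
	}

	var dups []string

	for mp, names := range owners {
		if len(names) > 1 {
			dups = append(dups, fmt.Sprintf("%s (documents: %v)", mp, names))
		}
	}

	if len(dups) == 0 {
		return nil
	}

	// Sort so the error message is stable across runs.
	sort.Strings(dups)

	return fmt.Errorf("mountpoint referenced by multiple scrub documents: %v", dups)
}

func main() {
	docs := []scrubDoc{{"a", "/var"}, {"b", "/var"}, {"c", "/data"}}
	fmt.Println(findDuplicateMountpoints(docs))
}
```

Rejecting the configuration outright is only one option; keeping the first document and logging a warning for the rest would also avoid overwriting `ctrl.schedule[mountpoint]`.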
| runner.WithSchedulingPolicy(runner.SchedulingPolicyIdle), | ||
| ) | ||
|
|
||
| return r.Run(func(s events.ServiceState, msg string, args ...any) {}) |
I wonder if we should have a way to cancel a running scrub process?
We can add a channel to signal this once the document is deleted. However, I haven't studied whether it's okay to abort the process, or whether it terminates safely.
smira
left a comment
As xfs_scrub is experimental, I'd say let's make it the first thing we merge in 1.10.
Didn't we consider it in 1.9 planning? Or was it not expected to use an experimental kernel feature?
| case <-ctx.Done(): | ||
| return nil | ||
| case mountpoint := <-ctrl.c: | ||
| if err := ctrl.runScrub(mountpoint, []string{}); err != nil { |
Idea: run the scrub in a goroutine (still single-threaded, so that two scrub tasks never run in parallel) and report when it has started, so we can see from the status that it's running right now. Current status example (there's no way to tell whether the scrub for /var is running yet, as the status is updated on completion only):

|
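The goroutine idea above could be sketched roughly as follows: a single worker drains a queue of mountpoints, so scrubs never overlap, and it records a "running" state before the scrub starts rather than only on completion. `statusBoard`, `worker`, and the state names are hypothetical and stand in for whatever status resource the controller would actually write.

```go
package main

import (
	"fmt"
	"sync"
)

type scrubState string

const (
	stateRunning   scrubState = "running"
	stateCompleted scrubState = "completed"
)

// statusBoard is a hypothetical in-memory stand-in for the status resource.
type statusBoard struct {
	mu     sync.Mutex
	states map[string]scrubState
}

func (b *statusBoard) set(mountpoint string, s scrubState) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.states[mountpoint] = s
}

func (b *statusBoard) get(mountpoint string) scrubState {
	b.mu.Lock()
	defer b.mu.Unlock()

	return b.states[mountpoint]
}

// worker processes mountpoints one at a time, so two scrubs never run in
// parallel, and publishes the state both before and after each scrub.
func worker(queue <-chan string, board *statusBoard, scrub func(string)) {
	for mountpoint := range queue {
		board.set(mountpoint, stateRunning)
		scrub(mountpoint)
		board.set(mountpoint, stateCompleted)
	}
}

func main() {
	board := &statusBoard{states: map[string]scrubState{}}
	queue := make(chan string)
	started := make(chan struct{})
	release := make(chan struct{})
	done := make(chan struct{})

	go func() {
		// The fake scrub blocks until released, so main can observe
		// the in-flight state.
		worker(queue, board, func(string) {
			started <- struct{}{}
			<-release
		})
		close(done)
	}()

	queue <- "/var"
	<-started
	fmt.Println(board.get("/var")) // state while the scrub is in flight

	close(release)
	close(queue)
	<-done
	fmt.Println(board.get("/var")) // state after the scrub has finished
}
```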
Other things to consider:
|
| "cap_sys_module": {}, | ||
| } | ||
|
|
||
| // XFSScrubDroppedCapabilities is the set of capabilities to drop for xfs_scrub. |
I wonder if we could refactor this to first list all capabilities (via libcap) and then drop everything except the ones we need, so that if new capabilities are introduced, they are dropped as well (probably moving it out of machinery and back into the controller).
Yes, I also want to do this at the level of our interface to libcap. systemd actually handles it this way: the xfs scrubbing service lists the capabilities to grant, not the ones to take away.
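The allowlist approach discussed here amounts to computing the drop set as "all known capabilities minus the ones to keep". The sketch below is an illustration under assumptions: `droppedCapabilities` is a hypothetical helper, the full list is hard-coded rather than queried from libcap, and the capability names in `main` are examples only, not the actual set xfs_scrub needs.

```go
package main

import (
	"fmt"
	"sort"
)

// droppedCapabilities returns every capability in all that is not in keep.
// Any capability that later appears in all is dropped automatically unless
// it is explicitly added to keep.
func droppedCapabilities(all []string, keep map[string]struct{}) []string {
	var drop []string

	for _, c := range all {
		if _, ok := keep[c]; !ok {
			drop = append(drop, c)
		}
	}

	// Sort for a stable result regardless of input order.
	sort.Strings(drop)

	return drop
}

func main() {
	// Assumption: a real implementation would obtain this list at runtime
	// (for example via libcap) instead of hard-coding it.
	all := []string{"cap_sys_admin", "cap_sys_module", "cap_sys_rawio", "cap_net_admin"}

	// Illustrative allowlist only.
	keep := map[string]struct{}{"cap_sys_admin": {}, "cap_sys_rawio": {}}

	fmt.Println(droppedCapabilities(all, keep))
}
```

The result has the same map-of-names flavor as the existing dropped-capabilities constant, so it could be converted to that shape without changing the runner option that consumes it.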
|
This PR is stale because it has been open 45 days with no activity. |
b9f9636 to 0626f54
|
3dd9898 to f0b0338
db20dc9 to 017ed44
Pull request overview
Adds a new filesystem scrubbing feature to Talos by introducing config/schema/docs, new block/runtime resources, and controllers that schedule and execute xfs_scrub periodically while exposing status via resources and API definitions.
Changes:
- Introduces FilesystemScrubConfig machine config document (schema + docs) and corresponding block resources (FSScrubConfig, FSScrubSchedule, FSScrubStatus).
- Adds a generic runtime task model (Task, TaskStatus, TaskState) and a runtime TasksController to execute scheduled background tasks.
- Adds block controllers to translate machine config into scrub schedules, create scrub tasks, and report scrub status; updates API reference/docs and protobuf definitions.
Reviewed changes
Copilot reviewed 38 out of 38 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| website/content/v1.14/schemas/config.schema.json | Adds FilesystemScrubConfig schema and includes it in supported config document set. |
| website/content/v1.14/reference/configuration/block/filesystemscrubconfig.md | Adds generated docs page for FilesystemScrubConfig. |
| website/content/v1.14/reference/api.md | Updates API reference to include new task and FS scrub protos/enums. |
| pkg/machinery/resources/runtime/taskstate_enumer.go | Adds generated TaskState string/text marshaling helpers. |
| pkg/machinery/resources/runtime/task_status.go | Introduces TaskStatus resource spec and TaskState enum. |
| pkg/machinery/resources/runtime/task.go | Introduces Task resource spec used to represent runnable background tasks. |
| pkg/machinery/resources/runtime/runtime.go | Updates deepcopy generation list to include new task specs. |
| pkg/machinery/resources/runtime/deep_copy.generated.go | Adds deepcopy implementations for new runtime task specs. |
| pkg/machinery/resources/block/fs_scrub_status.go | Adds FSScrubStatus block resource for scrub outcome reporting. |
| pkg/machinery/resources/block/fs_scrub_schedule.go | Adds FSScrubSchedule block resource for scrub scheduling. |
| pkg/machinery/resources/block/fs_scrub_config.go | Adds FSScrubConfig block resource mirroring machine config. |
| pkg/machinery/resources/block/deep_copy.generated.go | Adds deepcopy implementations for new block scrub specs. |
| pkg/machinery/resources/block/block.go | Updates deepcopy generation list to include scrub block specs. |
| pkg/machinery/constants/constants.go | Adds capability drop set for xfs_scrub executions. |
| pkg/machinery/config/types/runtime/watchdog_timer.go | Adds FilesystemScrub() stub to satisfy updated RuntimeConfig interface. |
| pkg/machinery/config/types/runtime/kmsg_log.go | Adds FilesystemScrub() stub to satisfy updated RuntimeConfig interface. |
| pkg/machinery/config/types/runtime/event_sink.go | Adds FilesystemScrub() stub to satisfy updated RuntimeConfig interface. |
| pkg/machinery/config/types/block/fs_scrub.go | Adds FilesystemScrubConfig config document type + validation/defaulting. |
| pkg/machinery/config/types/block/deep_copy.generated.go | Adds deepcopy for FilesystemScrubV1Alpha1. |
| pkg/machinery/config/types/block/block_doc.go | Adds docgen output for FilesystemScrubConfig. |
| pkg/machinery/config/types/block/block.go | Updates docgen + deepcopy generation inputs to include fs_scrub.go. |
| pkg/machinery/config/schemas/config.schema.json | Adds FilesystemScrubConfig schema and includes it in supported config document set. |
| pkg/machinery/config/config/runtime.go | Extends RuntimeConfig interface with FilesystemScrub() aggregation. |
| pkg/machinery/api/resource/definitions/runtime/runtime_vtproto.pb.go | Regenerates vtproto for new runtime task proto messages. |
| pkg/machinery/api/resource/definitions/runtime/runtime.pb.go | Regenerates runtime protobuf bindings to include task messages. |
| pkg/machinery/api/resource/definitions/enums/enums.pb.go | Regenerates enums protobuf bindings to include RuntimeTaskState. |
| pkg/machinery/api/resource/definitions/block/block_vtproto.pb.go | Regenerates vtproto for new block scrub proto messages. |
| pkg/machinery/api/resource/definitions/block/block.pb.go | Regenerates block protobuf bindings to include scrub messages. |
| internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_state.go | Registers new runtime/block resources in v1alpha2 state. |
| internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_controller.go | Wires new scrub and task controllers into v1alpha2 runtime. |
| internal/app/machined/pkg/controllers/runtime/tasks.go | Adds TasksController to execute background tasks and publish TaskStatus. |
| internal/app/machined/pkg/controllers/block/fs_scrub_schedule.go | Adds schedule controller mapping config+mounted XFS volumes to scrub schedules. |
| internal/app/machined/pkg/controllers/block/fs_scrub_config_test.go | Adds unit tests for FSScrubConfigController machine-config translation. |
| internal/app/machined/pkg/controllers/block/fs_scrub_config.go | Adds controller translating machine config FilesystemScrubConfig to FSScrubConfig resources. |
| internal/app/machined/pkg/controllers/block/fs_scrub.go | Adds controller that turns schedules into tasks and reports scrub status. |
| api/resource/definitions/runtime/runtime.proto | Adds TaskSpec / TaskStatusSpec to runtime proto definitions. |
| api/resource/definitions/enums/enums.proto | Adds RuntimeTaskState enum to API enums. |
| api/resource/definitions/block/block.proto | Adds FS scrub config/schedule/status specs to block proto definitions. |
Pull request overview
Copilot reviewed 39 out of 39 changed files in this pull request and generated 10 comments.
| { | ||
| Name: "ExitCode", | ||
| JSONPath: `{.exitCode}`, | ||
| }, |
| case <-r.EventCh(): | ||
| } | ||
|
|
||
| logger.Warn("task controller loop") |
| for s := range taskStatuses.All() { | ||
| // FIXME: filter tasks to only work on ones we created | ||
| mountpoint := s.TypedSpec().ID | ||
|
|
||
| if s.TypedSpec().TaskState == runtimeres.TaskStateCompleted { | ||
| if _, ok := ctrl.tasks[mountpoint]; ok { | ||
| ctrl.tasks[mountpoint] = scrubTask{ | ||
| Args: ctrl.tasks[mountpoint].Args, | ||
| Destroying: true, | ||
| } | ||
| } | ||
|
|
||
| mountStatus, err := ctrl.getMountStatus(ctx, r, mountpoint) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if mountStatus == nil { | ||
| return fmt.Errorf("not mounted") | ||
| } |
| // Run implements controller.Controller interface. | ||
| // | ||
| //nolint:gocyclo,cyclop | ||
| func (ctrl *TasksController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error { | ||
| if ctrl.Tasks == nil { | ||
| ctrl.Tasks = make(map[string]task) | ||
| } | ||
|
|
||
| if ctrl.CompleteCh == nil { | ||
| ctrl.CompleteCh = make(chan taskCompletion) | ||
| } | ||
|
|
||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return nil | ||
|
|
||
| case c := <-ctrl.CompleteCh: | ||
| t := ctrl.Tasks[c.ID] | ||
| if t.state != runtimeres.TaskStateRunning { | ||
| continue | ||
| } | ||
|
|
||
| logger.Info("Task done", zap.Any("task", t)) | ||
|
|
||
| if err := ctrl.removeFinalizer(ctx, r, c.ID); err != nil { | ||
| return fmt.Errorf("failed to remove finalizer for task %q: %w", c.ID, err) | ||
| } | ||
|
|
||
| // The task has been removed, remove from the map | ||
| if t.stop == nil { | ||
| delete(ctrl.Tasks, c.ID) | ||
| } | ||
|
|
||
| ctrl.Tasks[c.ID] = task{ | ||
| args: t.args, | ||
| selinuxLabel: t.selinuxLabel, | ||
| state: runtimeres.TaskStateCompleted, | ||
| startTime: t.startTime, | ||
| exitTime: c.exitTime, | ||
| err: c.err, | ||
| stop: make(chan any), | ||
| } | ||
| case <-r.EventCh(): | ||
| } | ||
|
|
||
| logger.Warn("task controller loop") | ||
|
|
||
| cfg, err := safe.ReaderListAll[*runtimeres.Task](ctx, r) | ||
| if err != nil && !state.IsNotFoundError(err) { | ||
| return fmt.Errorf("error getting tasks: %w", err) | ||
| } | ||
|
|
||
| for taskspec := range cfg.All() { | ||
| taskspec := taskspec.TypedSpec() | ||
| if _, ok := ctrl.Tasks[taskspec.ID]; !ok || ctrl.Tasks[taskspec.ID].state == runtimeres.TaskStateCreated { | ||
| logger.Warn("creating a task or updating created and not ran task", zap.String("id", taskspec.ID)) | ||
| ctrl.Tasks[taskspec.ID] = task{ | ||
| args: taskspec.Args, | ||
| selinuxLabel: taskspec.SelinuxLabel, | ||
| state: runtimeres.TaskStateCreated, | ||
| startTime: time.Now(), | ||
| exitTime: time.Now(), | ||
| stop: make(chan any), | ||
| } | ||
| } else { | ||
| logger.Warn("task updated while running", zap.String("task", taskspec.ID)) | ||
| } | ||
| } | ||
|
|
||
| for id := range ctrl.Tasks { | ||
| _, err := safe.ReaderGetByID[*runtimeres.Task](ctx, r, id) | ||
| if state.IsNotFoundError(err) { | ||
| logger.Warn("Task removed, stopping", zap.String("id", id)) | ||
| t := ctrl.Tasks[id] | ||
|
|
||
| if t.stop != nil { | ||
| close(t.stop) | ||
| } | ||
|
|
||
| if ctrl.Tasks[id].state != runtimeres.TaskStateCompleted { | ||
| ctrl.Tasks[id] = task{ | ||
| args: t.args, | ||
| selinuxLabel: t.selinuxLabel, | ||
| state: t.state, | ||
| startTime: t.startTime, | ||
| exitTime: t.exitTime, | ||
| err: t.err, | ||
| stop: nil, | ||
| } | ||
| } else { | ||
| delete(ctrl.Tasks, id) | ||
| } | ||
|
|
||
| // After the task has stopped, remove it from the map, and thus remove status | ||
| } | ||
| } | ||
|
|
||
| // if not currently running a task, find the first one to be ran | ||
| for id := range ctrl.Tasks { | ||
| task := ctrl.Tasks[id] | ||
| if task.state == runtimeres.TaskStateCreated { | ||
| // run the task | ||
| logger.Warn("running task", zap.String("id", id)) | ||
|
|
||
| task.state = runtimeres.TaskStateRunning | ||
| task.startTime = time.Now() | ||
| task.exitTime = task.startTime | ||
| ctrl.Tasks[id] = task | ||
|
|
||
| if err = ctrl.addFinalizer(ctx, r, id); err != nil { | ||
| return fmt.Errorf("error adding a finalizer: %w", err) | ||
| } | ||
|
|
||
| runner := process.NewRunner( | ||
| true, // debug | ||
| &runner.Args{ | ||
| ID: "task_runner", | ||
| ProcessArgs: task.args, | ||
| }, | ||
| runner.WithLoggingManager(ctrl.Runtime.Logging()), | ||
| runner.WithEnv(environment.Get(ctrl.Runtime.Config())), | ||
| runner.WithDroppedCapabilities(constants.XFSScrubDroppedCapabilities), | ||
| runner.WithPriority(19), | ||
| runner.WithIOPriority(runner.IoprioClassIdle, 7), | ||
| runner.WithSchedulingPolicy(runner.SchedulingPolicyIdle), | ||
| runner.WithSelinuxLabel(task.selinuxLabel), | ||
| ) | ||
|
|
||
| go (func() { | ||
| err := runner.Run(func(s events.ServiceState, msg string, args ...any) {}, func(serviceName string, pid int32, clearEntry bool) error { return nil }) | ||
|
|
||
| ctrl.CompleteCh <- taskCompletion{ | ||
| ID: id, | ||
| err: err, | ||
| exitTime: time.Now(), | ||
| } | ||
| })() | ||
|
|
||
| go (func() { | ||
| <-task.stop | ||
|
|
||
| if err := runner.Stop(); err != nil { | ||
| logger.Error("Failed to stop task", zap.Error(err)) | ||
| } | ||
|
|
||
| ctrl.CompleteCh <- taskCompletion{ | ||
| ID: id, | ||
| err: fmt.Errorf("Canceled"), | ||
| exitTime: time.Now(), | ||
| } | ||
| })() | ||
|
|
||
| break | ||
| } | ||
| } | ||
|
|
||
| r.ResetRestartBackoff() | ||
|
|
||
| r.StartTrackingOutputs() | ||
|
|
||
| for id, t := range ctrl.Tasks { | ||
| if err := safe.WriterModify(ctx, r, runtimeres.NewTaskStatus(id), func(status *runtimeres.TaskStatus) error { | ||
| status.TypedSpec().ID = id | ||
| status.TypedSpec().Duration = t.exitTime.Sub(t.startTime) | ||
|
|
||
| if t.state == runtimeres.TaskStateCompleted { | ||
| status.TypedSpec().Result = "Success" | ||
| if t.err != nil { | ||
| status.TypedSpec().Result = t.err.Error() | ||
| } | ||
| } | ||
|
|
||
| status.TypedSpec().Start = t.startTime | ||
| status.TypedSpec().TaskState = t.state | ||
|
|
||
| return nil | ||
| }); err != nil { | ||
| return fmt.Errorf("error updating task status: %w", err) | ||
| } | ||
| } | ||
|
|
||
| if err := safe.CleanupOutputs[*runtimeres.TaskStatus](ctx, r); err != nil { | ||
| return err | ||
| } | ||
| } |
| // The task has been removed, remove from the map | ||
| if t.stop == nil { | ||
| delete(ctrl.Tasks, c.ID) | ||
| } | ||
|
|
||
| ctrl.Tasks[c.ID] = task{ | ||
| args: t.args, | ||
| selinuxLabel: t.selinuxLabel, | ||
| state: runtimeres.TaskStateCompleted, | ||
| startTime: t.startTime, | ||
| exitTime: c.exitTime, | ||
| err: c.err, | ||
| stop: make(chan any), | ||
| } |
| go (func() { | ||
| <-task.stop | ||
|
|
||
| if err := runner.Stop(); err != nil { | ||
| logger.Error("Failed to stop task", zap.Error(err)) | ||
| } | ||
|
|
||
| ctrl.CompleteCh <- taskCompletion{ | ||
| ID: id, | ||
| err: fmt.Errorf("Canceled"), | ||
| exitTime: time.Now(), | ||
| } | ||
| })() |
|
|
||
| func (ctrl *TasksController) removeFinalizer(ctx context.Context, r controller.Runtime, id string) error { | ||
| t, err := safe.ReaderGetByID[*runtimeres.Task](ctx, r, id) | ||
| if err != nil { |
| runner := process.NewRunner( | ||
| true, // debug | ||
| &runner.Args{ | ||
| ID: "task_runner", | ||
| ProcessArgs: task.args, | ||
| }, | ||
| runner.WithLoggingManager(ctrl.Runtime.Logging()), | ||
| runner.WithEnv(environment.Get(ctrl.Runtime.Config())), | ||
| runner.WithDroppedCapabilities(constants.XFSScrubDroppedCapabilities), | ||
| runner.WithPriority(19), | ||
| runner.WithIOPriority(runner.IoprioClassIdle, 7), | ||
| runner.WithSchedulingPolicy(runner.SchedulingPolicyIdle), | ||
| runner.WithSelinuxLabel(task.selinuxLabel), |
| } | ||
|
|
||
| // Validate implements config.Validator interface. | ||
| func (s *FilesystemScrubV1Alpha1) Validate(validation.RuntimeMode, ...validation.Option) ([]string, error) { |
| // Run implements controller.Controller interface. | ||
| // | ||
| //nolint:gocyclo | ||
| func (ctrl *FSScrubController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error { | ||
| ctrl.init() | ||
|
|
||
| defer func() { | ||
| for _, task := range ctrl.schedule { | ||
| if task.timer != nil { | ||
| task.timer.Stop() | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return nil | ||
| case mountpoint := <-ctrl.c: | ||
| if err := ctrl.createScrubTask(ctx, logger, mountpoint, []string{}, r); err != nil { | ||
| logger.Error("error running filesystem scrub", zap.Error(err)) | ||
| } | ||
| case <-r.EventCh(): | ||
| err := ctrl.processStatuses(ctx, r) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| err = ctrl.updateSchedule(ctx, r, logger) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| if err := ctrl.outputTasks(ctx, r); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if err := ctrl.reportStatus(ctx, r); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| } |
|
TODO:
|
Add a new controller to run filesystem scrubbing operations
scrub via a schedule
use finalizers for FS scrubbing
scrub using a task
Prevent tasks from being removed while running
for now, keep machined's context init_t, as the xfs_scrub process has low attack surface and is not exposed to user input
Signed-off-by: Dmitrii Sharshakov <dmitry.sharshakov@siderolabs.com>
TODO
Fixes #9545