Skip to content

Commit fa556ba

Browse files
committed
Simplify refactor
1 parent 7b9a544 commit fa556ba

8 files changed

Lines changed: 706 additions & 1128 deletions

File tree

pkg/fleet/daemon.go

Lines changed: 29 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
pbgo "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core"
1515
"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
16+
"google.golang.org/protobuf/proto"
1617
ctrl "sigs.k8s.io/controller-runtime"
1718
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
1819
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -53,14 +54,8 @@ type Daemon struct {
5354
mu sync.RWMutex
5455
configs map[string]installerConfig // keyed by config ID; replaced on each RC update
5556
taskMu sync.Mutex // serializes task dispatch and package task-state updates
56-
// pendingOps feeds the live in-memory worker. This is intentionally not
57-
// durable: daemon-owned DDA pending annotations are the crash-recovery
58-
// source of truth, while this channel just carries newly accepted ops to the
59-
// current process.
60-
pendingOps chan pendingOperation
61-
// statusUpdates carries DDA informer events into the live worker. The worker
62-
// evaluates reconciler-owned status.experiment and may rebuild pending ops
63-
// from daemon-owned DDA annotations after restart.
57+
// statusUpdates carries DDA informer events to the worker. The worker reads
58+
// status.experiment and pending annotations to update RC task state.
6459
statusUpdates chan ddaStatusSnapshot
6560
}
6661

@@ -71,10 +66,9 @@ func NewDaemon(rcClient remoteconfig.RCClient, mgr manager.Manager, revisionsEna
7166
return &Daemon{
7267
rcClient: rcClient,
7368
client: mgr.GetClient(),
74-
cache: mgr.GetCache(),
69+
cache: mgr.GetCache(), // Informer cache
7570
revisionsEnabled: revisionsEnabled,
7671
configs: make(map[string]installerConfig),
77-
pendingOps: make(chan pendingOperation, 32),
7872
statusUpdates: make(chan ddaStatusSnapshot, 128),
7973
}
8074
}
@@ -86,20 +80,19 @@ func (d *Daemon) Start(ctx context.Context) error {
8680
ctx = ctrl.LoggerInto(ctx, logger)
8781
logger.Info("Starting Fleet daemon")
8882

89-
// Set up the DDA informer used by the background completion worker.
90-
// When cache is nil (unit tests), informer-driven completion is skipped.
91-
if d.cache != nil {
92-
if err := d.installDDAStatusForwarder(ctx); err != nil {
93-
return err
94-
}
95-
logger.Info("Pending operation worker initialized with DDA informer")
83+
if d.cache == nil {
84+
return fmt.Errorf("fleet daemon requires a controller cache")
85+
}
86+
if err := d.installDDAStatusForwarder(ctx); err != nil {
87+
return err
9688
}
89+
logger.Info("DDA status worker initialized")
9790

9891
d.rcClient.Subscribe(state.ProductInstallerConfig, handleInstallerConfigUpdate(ctx, func(configs map[string]installerConfig) error {
9992
return d.handleConfigs(ctx, configs)
10093
}))
101-
d.rcClient.Subscribe(state.ProductUpdaterTask, handleUpdaterTaskUpdate(ctx, func(req remoteAPIRequest, complete func(error)) error {
102-
return d.handleTask(ctx, req, complete)
94+
d.rcClient.Subscribe(state.ProductUpdaterTask, handleUpdaterTaskUpdate(ctx, func(req remoteAPIRequest) error {
95+
return d.handleTask(ctx, req)
10396
}))
10497

10598
<-ctx.Done()
@@ -114,10 +107,11 @@ func (d *Daemon) NeedLeaderElection() bool {
114107
}
115108

116109
// handleTask serializes task dispatch bookkeeping and package task-state updates.
117-
func (d *Daemon) handleTask(ctx context.Context, req remoteAPIRequest, complete func(error)) error {
110+
func (d *Daemon) handleTask(ctx context.Context, req remoteAPIRequest) error {
118111
d.taskMu.Lock()
119-
op, err := d.handleRemoteAPIRequest(ctx, req)
112+
pending, err := d.handleRemoteAPIRequest(ctx, req)
120113
if err != nil {
114+
// Expected and current stable/experiment configs don't match.
121115
var stateErr *stateDoesntMatchError
122116
if errors.As(err, &stateErr) {
123117
d.setTaskState(req.Package, req.ID, pbgo.TaskState_INVALID_STATE, err)
@@ -127,24 +121,18 @@ func (d *Daemon) handleTask(ctx context.Context, req remoteAPIRequest, complete
127121
d.taskMu.Unlock()
128122
return err
129123
}
130-
if op == nil {
131-
// Synchronous no-op/idempotent path: nothing needs background tracking.
124+
// The request no longer applies (e.g. stopping an experiment that has already
125+
// terminated) or the desired state is already in place.
126+
if pending == nil {
127+
// Nothing is left for the worker to wait on.
132128
d.setTaskState(req.Package, req.ID, pbgo.TaskState_DONE, nil)
133129
d.taskMu.Unlock()
134-
if complete != nil {
135-
complete(nil)
136-
}
137130
return nil
138131
}
139-
// Async path: mark the task RUNNING only after the DDA patch/no-op decision
140-
// has succeeded, so a crash cannot leave RC in RUNNING before the durable
141-
// DDA pending record exists.
142-
d.setTaskState(req.Package, req.ID, pbgo.TaskState_RUNNING, nil)
132+
// The DDA annotations are already written. From the task handler's point
133+
// of view dispatch is done; the worker watches DDA status and updates
134+
// Task.State.
143135
d.taskMu.Unlock()
144-
// Async path: the operation was durably recorded on the DDA, so the live
145-
// worker only needs the transient callback before it starts tracking it.
146-
op.complete = complete
147-
d.pendingOps <- *op
148136
return nil
149137
}
150138

@@ -276,11 +264,7 @@ func (d *Daemon) getPackageConfigVersions(pkgName string) (stable, experiment st
276264
return "", ""
277265
}
278266

279-
// setPackageConfigVersions updates the config version fields of the
280-
// package entry in the RC installer state. Only the config variants
281-
// (StableConfigVersion/ExperimentConfigVersion) are set; the package variants
282-
// (StableVersion/ExperimentVersion) are preserved since this is a config
283-
// experiment, not a package upgrade.
267+
// setPackageConfigVersions updates only the config versions for one package.
284268
func (d *Daemon) setPackageConfigVersions(pkgName, stable, experiment string) {
285269
if d.rcClient == nil {
286270
return
@@ -290,18 +274,14 @@ func (d *Daemon) setPackageConfigVersions(pkgName, stable, experiment string) {
290274
found := false
291275
for _, pkg := range current {
292276
if pkg.GetPackage() == pkgName {
293-
updated = append(updated, &pbgo.PackageState{
294-
Package: pkg.GetPackage(),
295-
StableVersion: pkg.GetStableVersion(),
296-
ExperimentVersion: pkg.GetExperimentVersion(),
297-
Task: pkg.GetTask(),
298-
StableConfigVersion: stable,
299-
ExperimentConfigVersion: experiment,
300-
})
277+
next := proto.Clone(pkg).(*pbgo.PackageState)
278+
next.StableConfigVersion = stable
279+
next.ExperimentConfigVersion = experiment
280+
updated = append(updated, next)
301281
found = true
302-
} else {
303-
updated = append(updated, pkg)
282+
continue
304283
}
284+
updated = append(updated, pkg)
305285
}
306286
if !found {
307287
updated = append(updated, &pbgo.PackageState{

0 commit comments

Comments
 (0)