From 8ad9fccc62f34a06d1a15b869225f53f0f1b1b07 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 22 May 2026 09:42:00 -0700 Subject: [PATCH 1/3] this is a terrible approach need to write tmp+move and does not work as is --- client/state/db_json.go | 552 +++++++++++++++++++++++++++++++++ helper/flock/flock.go | 19 ++ helper/flock/flock_linux.go | 32 ++ helper/flock/flock_nonlinux.go | 11 + 4 files changed, 614 insertions(+) create mode 100644 client/state/db_json.go create mode 100644 helper/flock/flock.go create mode 100644 helper/flock/flock_linux.go create mode 100644 helper/flock/flock_nonlinux.go diff --git a/client/state/db_json.go b/client/state/db_json.go new file mode 100644 index 00000000000..470ee293ffe --- /dev/null +++ b/client/state/db_json.go @@ -0,0 +1,552 @@ +// Copyright IBM Corp. 2015, 2025 +// SPDX-License-Identifier: BUSL-1.1 + +package state + +import ( + "bytes" + "crypto/sha256" + "encoding/json" + "errors" + "fmt" + "io" + "maps" + "os" + "sync" + + "github.com/hashicorp/go-hclog" + arstate "github.com/hashicorp/nomad/client/allocrunner/state" + "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" + dmstate "github.com/hashicorp/nomad/client/devicemanager/state" + "github.com/hashicorp/nomad/client/dynamicplugins" + driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state" + "github.com/hashicorp/nomad/client/serviceregistration/checks" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper/flock" + "github.com/hashicorp/nomad/nomad/structs" +) + +// JsonDB implements a StateDB that stores data in memory and should only be +// used for testing. All methods are safe for concurrent use. +type JsonDB struct { + // alloc_id -> value + Allocs map[string]*structs.Allocation + + // alloc_id -> value + DeployState map[string]*structs.AllocDeploymentStatus + + // alloc_id -> value + NetworkStatus map[string]*structs.AllocNetworkStatus + + // alloc_id -> value + AckState map[string]*arstate.State + + // alloc_id -> value + AllocVolStates map[string]*arstate.AllocVolumes + + // alloc_id -> task_name -> value + LocalTaskState map[string]map[string]*state.LocalState + TaskState map[string]map[string]*structs.TaskState + + // alloc_id -> check_id -> result + Checks checks.ClientResults + + // alloc_id -> []Identities + Identities map[string][]*structs.SignedWorkloadIdentity + + // alloc_id -> []consulAclTokens + ConsulACLTokens map[string][]*cstructs.ConsulACLToken + + // devicemanager -> plugin-state + DevManagerPs *dmstate.PluginState + + // drivermanager -> plugin-state + DriverManagerPs *driverstate.PluginState + + // dynamicmanager -> registry-state + DynManagerPs *dynamicplugins.RegistryState + + // key -> value or nil + NodeMeta map[string]*string + + NodeReg *cstructs.NodeRegistration + + DynHostVols map[string]*cstructs.HostVolumeState + + // ClientIdent is the persisted identity of the client. + ClientIdent string + + logger hclog.Logger + + file *os.File + + mu sync.RWMutex +} + +type JsonDBMeta struct { + Sha256sum []byte +} + +func NewJsonDB(logger hclog.Logger, stateDir string) (StateDB, error) { + // Open root + root, err := os.OpenRoot(stateDir) + if err != nil { + return nil, fmt.Errorf("error opening state dir: %w", err) + } + + // Open/create file + fi, err := root.OpenFile("state.json", os.O_RDWR|os.O_CREATE, 0640) + if err != nil { + return nil, err + } + + // Lock file + if err := flock.FLock(fi); err != nil { + return nil, err + } + + // Initialize struct to empty values + db := &JsonDB{ + Allocs: make(map[string]*structs.Allocation), + DeployState: make(map[string]*structs.AllocDeploymentStatus), + NetworkStatus: make(map[string]*structs.AllocNetworkStatus), + AckState: make(map[string]*arstate.State), + LocalTaskState: make(map[string]map[string]*state.LocalState), + TaskState: make(map[string]map[string]*structs.TaskState), + Checks: make(checks.ClientResults), + Identities: make(map[string][]*structs.SignedWorkloadIdentity), + ConsulACLTokens: make(map[string][]*cstructs.ConsulACLToken), + DynHostVols: make(map[string]*cstructs.HostVolumeState), + } + db.logger = logger.Named(db.Name()) + + // Decode client state + dec := json.NewDecoder(fi) + if err := dec.Decode(db); err != nil { + if errors.Is(err, io.EOF) { + // New file, nothing more to do + return db, nil + } + + // Unexpected error, funlock and return + _ = flock.FUnlock(fi) + _ = fi.Close() + return nil, fmt.Errorf("unable to decode client state: %w", err) + } + + // Record the end of the first object + off := dec.InputOffset() + + // Decode checksum record + meta := &JsonDBMeta{} + if err := dec.Decode(meta); err != nil { + return nil, fmt.Errorf("unable to decode client state metadata: %w", err) + } + if n := len(meta.Sha256sum); n != sha256.Size { + if n == 0 { + return nil, fmt.Errorf("no checksum for client state") + } else { + return nil, fmt.Errorf("client state checksum is the wrong size. expected: %d, found: %d", sha256.Size, n) + } + } + hasher := sha256.New() + + // Rewind and hash first object + if _, err := fi.Seek(0, 0); err != nil { + return nil, fmt.Errorf("failed to rewind client state file: %s", err) + } + + lr := &io.LimitedReader{ + R: fi, + N: off, + } + if n, err := io.Copy(hasher, lr); err != nil { + return nil, fmt.Errorf("error checksumming client state file: %w", err) + } else if n != off { + return nil, fmt.Errorf("unexpected amount of client state checksummed. expected: %d, found: %d", off, n) + } + + // No need for constant time comparison as hash is only used to validate + // file. Anyone able to udpate the file should also be able to update the + // checksum. + if !bytes.Equal(hasher.Sum(nil), meta.Sha256sum) { + return nil, fmt.Errorf("client state file failed checksum. expected: %x, found: %x", meta.Sha256sum, hasher.Sum(nil)) + } + + db.file = fi + + return db, nil +} + +func (db *JsonDB) save() error { + // Encode + buf, err := json.MarshalIndent(db, "", " ") + if err != nil { + return fmt.Errorf("error encoding client state as json: %w", err) + } + + // Checksum + sum := sha256.Sum256(buf) + meta := &JsonDBMeta{ + Sha256sum: sum[:], + } + + if _, err := db.file.Seek(0, 0); err != nil { + return fmt.Errorf("failed to rewind client state file: %w", err) + } + + // Newline delimit records + buf = append(buf, '\n') + + if _, err := db.file.Write(buf); err != nil { + return fmt.Errorf("failed to write client state: %w", err) + } + if err := json.NewEncoder(db.file).Encode(meta); err != nil { + return fmt.Errorf("failed to write client state metadata: %w", err) + } + + if err := db.file.Sync(); err != nil { + return fmt.Errorf("failed to sync client state: %w", err) + } + + return nil +} + +func (db *JsonDB) Name() string { + return "jsondb" +} + +func (db *JsonDB) Upgrade() error { + return nil +} + +func (db *JsonDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, error) { + db.mu.RLock() + defer db.mu.RUnlock() + + allocs := make([]*structs.Allocation, 0, len(db.Allocs)) + for _, v := range db.Allocs { + allocs = append(allocs, v) + } + + return allocs, map[string]error{}, nil +} + +func (db *JsonDB) PutAllocation(alloc *structs.Allocation, _ ...WriteOption) error { + db.mu.Lock() + defer db.mu.Unlock() + db.Allocs[alloc.ID] = alloc + return db.save() +} + +func (db *JsonDB) GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error) { + db.mu.Lock() + defer db.mu.Unlock() + return db.DeployState[allocID], nil +} + +func (db *JsonDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error { + db.mu.Lock() + db.DeployState[allocID] = ds + defer db.mu.Unlock() + return db.save() +} + +func (db *JsonDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, error) { + db.mu.Lock() + defer db.mu.Unlock() + return db.NetworkStatus[allocID], nil +} + +func (db *JsonDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, _ ...WriteOption) error { + db.mu.Lock() + db.NetworkStatus[allocID] = ns + defer db.mu.Unlock() + return db.save() +} + +func (db *JsonDB) PutAcknowledgedState(allocID string, state *arstate.State, opts ...WriteOption) error { + db.mu.Lock() + db.AckState[allocID] = state + defer db.mu.Unlock() + return db.save() +} + +func (db *JsonDB) GetAcknowledgedState(allocID string) (*arstate.State, error) { + db.mu.Lock() + defer db.mu.Unlock() + return db.AckState[allocID], nil +} + +func (db *JsonDB) PutAllocVolumes(allocID string, state *arstate.AllocVolumes, opts ...WriteOption) error { + db.mu.Lock() + db.AllocVolStates[allocID] = state + defer db.mu.Unlock() + return db.save() +} + +func (db *JsonDB) GetAllocVolumes(allocID string) (*arstate.AllocVolumes, error) { + db.mu.Lock() + defer db.mu.Unlock() + return db.AllocVolStates[allocID], nil +} + +func (db *JsonDB) PutAllocIdentities(allocID string, identities []*structs.SignedWorkloadIdentity, _ ...WriteOption) error { + db.mu.Lock() + defer db.mu.Unlock() + db.Identities[allocID] = identities + return db.save() +} + +func (db *JsonDB) GetAllocIdentities(allocID string) ([]*structs.SignedWorkloadIdentity, error) { + db.mu.Lock() + defer db.mu.Unlock() + return db.Identities[allocID], nil +} + +func (db *JsonDB) PutAllocConsulACLTokens(allocID string, tokens []*cstructs.ConsulACLToken, opts ...WriteOption) error { + + db.mu.Lock() + defer db.mu.Unlock() + db.ConsulACLTokens[allocID] = tokens + return db.save() +} + +func (db *JsonDB) GetAllocConsulACLTokens(allocID string) ([]*cstructs.ConsulACLToken, error) { + db.mu.Lock() + defer db.mu.Unlock() + return db.ConsulACLTokens[allocID], nil +} + +func (db *JsonDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) { + db.mu.RLock() + defer db.mu.RUnlock() + + var ls *state.LocalState + var ts *structs.TaskState + + // Local Task State + allocLocalTS := db.LocalTaskState[allocID] + if len(allocLocalTS) != 0 { + ls = allocLocalTS[taskName] + } + + // Task State + allocTS := db.TaskState[allocID] + if len(allocTS) != 0 { + ts = allocTS[taskName] + } + + return ls, ts, nil +} + +func (db *JsonDB) PutTaskRunnerLocalState(allocID string, taskName string, val *state.LocalState) error { + db.mu.Lock() + defer db.mu.Unlock() + + if alts, ok := db.LocalTaskState[allocID]; ok { + alts[taskName] = val.Copy() + return db.save() + } + + db.LocalTaskState[allocID] = map[string]*state.LocalState{ + taskName: val.Copy(), + } + + return db.save() +} + +func (db *JsonDB) PutTaskState(allocID string, taskName string, state *structs.TaskState) error { + db.mu.Lock() + defer db.mu.Unlock() + + if ats, ok := db.TaskState[allocID]; ok { + ats[taskName] = state.Copy() + return db.save() + } + + db.TaskState[allocID] = map[string]*structs.TaskState{ + taskName: state.Copy(), + } + + return db.save() +} + +func (db *JsonDB) DeleteTaskBucket(allocID, taskName string) error { + db.mu.Lock() + defer db.mu.Unlock() + + if ats, ok := db.TaskState[allocID]; ok { + delete(ats, taskName) + } + + if alts, ok := db.LocalTaskState[allocID]; ok { + delete(alts, taskName) + } + + return db.save() +} + +func (db *JsonDB) DeleteAllocationBucket(allocID string, _ ...WriteOption) error { + db.mu.Lock() + defer db.mu.Unlock() + + delete(db.Allocs, allocID) + delete(db.TaskState, allocID) + delete(db.LocalTaskState, allocID) + delete(db.Identities, allocID) + + return db.save() +} + +func (db *JsonDB) PutDevicePluginState(ps *dmstate.PluginState) error { + db.mu.Lock() + defer db.mu.Unlock() + db.DevManagerPs = ps + return db.save() +} + +// GetDevicePluginState stores the device manager's plugin state or returns an +// error. +func (db *JsonDB) GetDevicePluginState() (*dmstate.PluginState, error) { + db.mu.Lock() + defer db.mu.Unlock() + return db.DevManagerPs, nil +} + +func (db *JsonDB) GetDriverPluginState() (*driverstate.PluginState, error) { + db.mu.Lock() + defer db.mu.Unlock() + return db.DriverManagerPs, nil +} + +func (db *JsonDB) PutDriverPluginState(ps *driverstate.PluginState) error { + db.mu.Lock() + defer db.mu.Unlock() + db.DriverManagerPs = ps + return db.save() +} + +func (db *JsonDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error) { + db.mu.Lock() + defer db.mu.Unlock() + return db.DynManagerPs, nil +} + +func (db *JsonDB) PutDynamicPluginRegistryState(ps *dynamicplugins.RegistryState) error { + db.mu.Lock() + defer db.mu.Unlock() + db.DynManagerPs = ps + return db.save() +} + +func (db *JsonDB) PutCheckResult(allocID string, qr *structs.CheckQueryResult) error { + db.mu.Lock() + defer db.mu.Unlock() + + if _, exists := db.Checks[allocID]; !exists { + db.Checks[allocID] = make(checks.AllocationResults) + } + + db.Checks[allocID][qr.ID] = qr + return db.save() +} + +func (db *JsonDB) GetCheckResults() (checks.ClientResults, error) { + db.mu.Lock() + defer db.mu.Unlock() + return maps.Clone(db.Checks), nil +} + +func (db *JsonDB) DeleteCheckResults(allocID string, checkIDs []structs.CheckID) error { + db.mu.Lock() + defer db.mu.Unlock() + for _, id := range checkIDs { + delete(db.Checks[allocID], id) + } + return db.save() +} + +func (db *JsonDB) PurgeCheckResults(allocID string) error { + db.mu.Lock() + defer db.mu.Unlock() + delete(db.Checks, allocID) + return db.save() +} + +func (db *JsonDB) PutNodeMeta(nm map[string]*string) error { + db.mu.Lock() + defer db.mu.Unlock() + db.NodeMeta = nm + return db.save() +} + +func (db *JsonDB) GetNodeMeta() (map[string]*string, error) { + db.mu.Lock() + defer db.mu.Unlock() + return db.NodeMeta, nil +} + +func (db *JsonDB) PutNodeRegistration(reg *cstructs.NodeRegistration) error { + db.mu.Lock() + defer db.mu.Unlock() + db.NodeReg = reg + return db.save() +} + +func (db *JsonDB) GetNodeRegistration() (*cstructs.NodeRegistration, error) { + db.mu.Lock() + defer db.mu.Unlock() + return db.NodeReg, nil +} + +func (db *JsonDB) PutDynamicHostVolume(vol *cstructs.HostVolumeState) error { + db.mu.Lock() + defer db.mu.Unlock() + db.DynHostVols[vol.ID] = vol + return db.save() +} +func (db *JsonDB) GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error) { + db.mu.Lock() + defer db.mu.Unlock() + var vols []*cstructs.HostVolumeState + for _, vol := range db.DynHostVols { + vols = append(vols, vol) + } + return vols, nil +} +func (db *JsonDB) DeleteDynamicHostVolume(s string) error { + db.mu.Lock() + defer db.mu.Unlock() + delete(db.DynHostVols, s) + return db.save() +} + +func (db *JsonDB) PutNodeIdentity(identity string) error { + db.mu.Lock() + defer db.mu.Unlock() + db.ClientIdent = identity + return db.save() +} + +func (db *JsonDB) GetNodeIdentity() (string, error) { + db.mu.Lock() + defer db.mu.Unlock() + return db.ClientIdent, nil +} + +func (db *JsonDB) Close() error { + db.mu.Lock() + defer db.mu.Unlock() + + if err := db.save(); err != nil { + return fmt.Errorf("error saving before close: %w", err) + } + + // Set everything to nil to blow up on further use + db.Allocs = nil + db.TaskState = nil + db.LocalTaskState = nil + + return nil +} diff --git a/helper/flock/flock.go b/helper/flock/flock.go new file mode 100644 index 00000000000..2fa48e38ce6 --- /dev/null +++ b/helper/flock/flock.go @@ -0,0 +1,19 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: MPL-2.0 + +package flock + +import ( + "errors" + "os" +) + +var ErrLocked = errors.New("file locked") + +func FLock(file *os.File) error { + return flock(file) +} + +func FUnlock(file *os.File) error { + return funlock(file) +} diff --git a/helper/flock/flock_linux.go b/helper/flock/flock_linux.go new file mode 100644 index 00000000000..f1b6ee1fdea --- /dev/null +++ b/helper/flock/flock_linux.go @@ -0,0 +1,32 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: MPL-2.0 + +//go:build linux + +package flock + +import ( + "fmt" + "os" + "syscall" +) + +func flock(file *os.File) error { + err := syscall.Flock(int(file.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + if err != nil { + if err == syscall.EWOULDBLOCK { + return fmt.Errorf("%w: %q", ErrLocked, file.Name()) + } + + return fmt.Errorf("flock error: %w: %q", err, file.Name()) + } + + return nil +} + +func funlock(file *os.File) error { + if err := syscall.Flock(int(file.Fd()), syscall.LOCK_UN); err != nil { + return fmt.Errorf("funlock error: %w: %q", err, file.Name()) + } + return nil +} diff --git a/helper/flock/flock_nonlinux.go b/helper/flock/flock_nonlinux.go new file mode 100644 index 00000000000..2f84a57707b --- /dev/null +++ b/helper/flock/flock_nonlinux.go @@ -0,0 +1,11 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: MPL-2.0 + +//go:build !linux + +package flock + +import "os" + +func flock(file *os.File) error { return nil } +func funlock(file *os.File) error { return nil } From e4f9db6a586f270423ff11d79bfefb631957ea5f Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 22 May 2026 13:28:37 -0700 Subject: [PATCH 2/3] the abomination lives writes clients state to a file as json uses a temporary file + rename to perform an atomic write on unix uses a lock file to hopefully prevent disaster --- client/state/db_json.go | 152 ++++++++++++++++++++-------------------- 1 file changed, 76 insertions(+), 76 deletions(-) diff --git a/client/state/db_json.go b/client/state/db_json.go index 470ee293ffe..ade377e3a8d 100644 --- a/client/state/db_json.go +++ b/client/state/db_json.go @@ -4,8 +4,6 @@ package state import ( - "bytes" - "crypto/sha256" "encoding/json" "errors" "fmt" @@ -13,6 +11,7 @@ import ( "maps" "os" "sync" + "time" "github.com/hashicorp/go-hclog" arstate "github.com/hashicorp/nomad/client/allocrunner/state" @@ -76,9 +75,11 @@ type JsonDB struct { // ClientIdent is the persisted identity of the client. ClientIdent string - logger hclog.Logger + lockFile *os.File + root *os.Root + pid int - file *os.File + logger hclog.Logger mu sync.RWMutex } @@ -91,20 +92,51 @@ func NewJsonDB(logger hclog.Logger, stateDir string) (StateDB, error) { // Open root root, err := os.OpenRoot(stateDir) if err != nil { + defer root.Close() return nil, fmt.Errorf("error opening state dir: %w", err) } - // Open/create file - fi, err := root.OpenFile("state.json", os.O_RDWR|os.O_CREATE, 0640) + // Open lock file + lockFile, err := root.OpenFile("state.json.lock", os.O_RDWR|os.O_CREATE, 0640) if err != nil { + defer root.Close() return nil, err } // Lock file - if err := flock.FLock(fi); err != nil { + if err := flock.FLock(lockFile); err != nil { + defer root.Close() + defer lockFile.Close() + if errors.Is(err, flock.ErrLocked) { + buf := make([]byte, 1000) + n, ferr := lockFile.Read(buf) + if ferr != nil { + return nil, fmt.Errorf("client lock file %s locked. error reading: %w", lockFile.Name(), ferr) + } + return nil, fmt.Errorf("client lock file %s locked. contents: %q", lockFile.Name(), string(buf[0:n])) + } return nil, err } + // Write info for concurrent openers + pid := os.Getpid() + lockmsg := []byte(fmt.Sprintf("%d at %s", pid, time.Now())) + if err := lockFile.Truncate(int64(len(lockmsg))); err != nil { + defer root.Close() + defer lockFile.Close() + return nil, fmt.Errorf("error truncating client lock file: %w", err) + } + if _, err := lockFile.Seek(0, io.SeekStart); err != nil { + defer root.Close() + defer lockFile.Close() + return nil, fmt.Errorf("error seeking in client lock file: %w", err) + } + if n, err := lockFile.Write(lockmsg); err != nil { + defer root.Close() + defer lockFile.Close() + return nil, fmt.Errorf("error writing client lock file (%d bytes written): %w", n, err) + } + // Initialize struct to empty values db := &JsonDB{ Allocs: make(map[string]*structs.Allocation), @@ -117,96 +149,60 @@ func NewJsonDB(logger hclog.Logger, stateDir string) (StateDB, error) { Identities: make(map[string][]*structs.SignedWorkloadIdentity), ConsulACLTokens: make(map[string][]*cstructs.ConsulACLToken), DynHostVols: make(map[string]*cstructs.HostVolumeState), + lockFile: lockFile, + root: root, + pid: pid, } db.logger = logger.Named(db.Name()) - // Decode client state - dec := json.NewDecoder(fi) - if err := dec.Decode(db); err != nil { - if errors.Is(err, io.EOF) { - // New file, nothing more to do - return db, nil - } - - // Unexpected error, funlock and return - _ = flock.FUnlock(fi) - _ = fi.Close() - return nil, fmt.Errorf("unable to decode client state: %w", err) - } - - // Record the end of the first object - off := dec.InputOffset() - - // Decode checksum record - meta := &JsonDBMeta{} - if err := dec.Decode(meta); err != nil { - return nil, fmt.Errorf("unable to decode client state metadata: %w", err) - } - if n := len(meta.Sha256sum); n != sha256.Size { - if n == 0 { - return nil, fmt.Errorf("no checksum for client state") - } else { - return nil, fmt.Errorf("client state checksum is the wrong size. expected: %d, found: %d", sha256.Size, n) - } + if err := db.load(); err != nil { + defer root.Close() + return nil, err } - hasher := sha256.New() - // Rewind and hash first object - if _, err := fi.Seek(0, 0); err != nil { - return nil, fmt.Errorf("failed to rewind client state file: %s", err) - } + return db, nil +} - lr := &io.LimitedReader{ - R: fi, - N: off, +func (db *JsonDB) load() error { + stateFile, err := db.root.Open("state.json") + if errors.Is(err, os.ErrNotExist) { + // Nothing to load + return nil } - if n, err := io.Copy(hasher, lr); err != nil { - return nil, fmt.Errorf("error checksumming client state file: %w", err) - } else if n != off { - return nil, fmt.Errorf("unexpected amount of client state checksummed. expected: %d, found: %d", off, n) + if err != nil { + return fmt.Errorf("error loading client state: %w", err) } + defer stateFile.Close() - // No need for constant time comparison as hash is only used to validate - // file. Anyone able to udpate the file should also be able to update the - // checksum. - if !bytes.Equal(hasher.Sum(nil), meta.Sha256sum) { - return nil, fmt.Errorf("client state file failed checksum. expected: %x, found: %x", meta.Sha256sum, hasher.Sum(nil)) + // Decode client state + dec := json.NewDecoder(stateFile) + if err := dec.Decode(db); err != nil { + return fmt.Errorf("unable to decode client state: %w", err) } - db.file = fi - - return db, nil + return nil } func (db *JsonDB) save() error { - // Encode - buf, err := json.MarshalIndent(db, "", " ") + tmpfn := fmt.Sprintf("state.json.%d.%d", db.pid, time.Now().UnixMilli()) + stateFile, err := db.root.OpenFile(tmpfn, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600) if err != nil { - return fmt.Errorf("error encoding client state as json: %w", err) + return fmt.Errorf("error opening client sate file for writing: %w", err) } - // Checksum - sum := sha256.Sum256(buf) - meta := &JsonDBMeta{ - Sha256sum: sum[:], + enc := json.NewEncoder(stateFile) + enc.SetIndent("", " ") + if err := enc.Encode(db); err != nil { + _ = stateFile.Close() + return fmt.Errorf("error writing client state to %q: %w", tmpfn, err) } - if _, err := db.file.Seek(0, 0); err != nil { - return fmt.Errorf("failed to rewind client state file: %w", err) + if err := stateFile.Close(); err != nil { + return fmt.Errorf("error closing client state file %q: %w", tmpfn, err) } - // Newline delimit records - buf = append(buf, '\n') - - if _, err := db.file.Write(buf); err != nil { - return fmt.Errorf("failed to write client state: %w", err) - } - if err := json.NewEncoder(db.file).Encode(meta); err != nil { - return fmt.Errorf("failed to write client state metadata: %w", err) - } - - if err := db.file.Sync(); err != nil { - return fmt.Errorf("failed to sync client state: %w", err) + if err := db.root.Rename(tmpfn, "state.json"); err != nil { + return fmt.Errorf("error moving client state file: %w", err) } return nil @@ -543,6 +539,10 @@ func (db *JsonDB) Close() error { return fmt.Errorf("error saving before close: %w", err) } + _ = flock.FUnlock(db.lockFile) + _ = db.lockFile.Close() + _ = db.root.Close() + // Set everything to nil to blow up on further use db.Allocs = nil db.TaskState = nil From 70a8375e7870bd5e40af352b9c0cd3ca58f9309b Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 22 May 2026 14:28:28 -0700 Subject: [PATCH 3/3] fsyncs! --- client/state/db_json.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/client/state/db_json.go b/client/state/db_json.go index ade377e3a8d..220388088eb 100644 --- a/client/state/db_json.go +++ b/client/state/db_json.go @@ -136,6 +136,11 @@ func NewJsonDB(logger hclog.Logger, stateDir string) (StateDB, error) { defer lockFile.Close() return nil, fmt.Errorf("error writing client lock file (%d bytes written): %w", n, err) } + if err := lockFile.Sync(); err != nil { + defer root.Close() + defer lockFile.Close() + return nil, fmt.Errorf("error syncing client lock file: %w", err) + } // Initialize struct to empty values db := &JsonDB{ @@ -197,6 +202,10 @@ func (db *JsonDB) save() error { return fmt.Errorf("error writing client state to %q: %w", tmpfn, err) } + if err := stateFile.Sync(); err != nil { + return fmt.Errorf("error syncing client state file %q: %w", tmpfn, err) + } + if err := stateFile.Close(); err != nil { return fmt.Errorf("error closing client state file %q: %w", tmpfn, err) }