From 8bceab57fc9e299bcbfb63528246f983a82543fa Mon Sep 17 00:00:00 2001 From: Leonardo Araujo Date: Sat, 11 Apr 2026 18:19:45 -0300 Subject: [PATCH 1/2] update Makefile --- Makefile | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Makefile b/Makefile index 835fce5..e94efc9 100644 --- a/Makefile +++ b/Makefile @@ -1,11 +1,17 @@ .PHONY: test build +fmt: + go fmt ./... + test: go test ./... -race test-coverage: go test ./... -race -coverprofile=coverage.out +vet: + go vet ./... + build: mkdir -p bin go build -o bin/agentctl ./cmd/agentctl From 9b416500f65e254093639bcaa134c222d98490e4 Mon Sep 17 00:00:00 2001 From: Leonardo Araujo Date: Sat, 11 Apr 2026 18:19:58 -0300 Subject: [PATCH 2/2] feat(apply): persist plan to deployment store in one transaction Add Applier.ApplyPlan and executor to upsert/delete applied_resources and upsert applied_projects using env and project spec_hash as version. Extend plan.Operation with SpecHash and NormalizedSpecJSON; add ProjectDeploymentMeta. DeploymentStore gains DeleteAppliedResource; sqlite refactors deployment SQL into deployment_ops and implements TransactionalDeployment via RunDeploymentTx. Tests cover apply then list/get and delete removing rows (issue #15). Made-with: Cursor --- internal/apply/applier.go | 59 +++++++++++ internal/apply/applier_test.go | 129 ++++++++++++++++++++++++ internal/apply/apply.go | 26 ----- internal/apply/doc.go | 3 + internal/apply/executor.go | 51 ++++++++++ internal/plan/plan.go | 9 +- internal/plan/plan_test.go | 4 + internal/plan/planner.go | 16 ++- internal/plan/planner_test.go | 4 + internal/plan/project_meta.go | 25 +++++ internal/state/doc.go | 3 +- internal/state/sqlite/deployment_ops.go | 125 +++++++++++++++++++++++ internal/state/sqlite/deployment_tx.go | 58 +++++++++++ internal/state/sqlite/interfaces.go | 5 +- internal/state/sqlite/store.go | 94 ++--------------- internal/state/store.go | 7 ++ 16 files changed, 500 insertions(+), 118 deletions(-) create mode 100644 internal/apply/applier.go create mode 100644 internal/apply/applier_test.go delete mode 100644 internal/apply/apply.go create mode 100644 internal/apply/executor.go create mode 100644 internal/plan/project_meta.go create mode 100644 internal/state/sqlite/deployment_ops.go create mode 100644 internal/state/sqlite/deployment_tx.go diff --git a/internal/apply/applier.go b/internal/apply/applier.go new file mode 100644 index 0000000..6240e71 --- /dev/null +++ b/internal/apply/applier.go @@ -0,0 +1,59 @@ +package apply + +import ( + "context" + "errors" + "time" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/plan" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state" +) + +// Applier mutates deployment state from an apply operation (design doc §5.2, §12.2 D). +type Applier struct { + Deploy state.DeploymentStore +} + +// NewApplier returns an applier backed by dep. +func NewApplier(dep state.DeploymentStore) *Applier { + return &Applier{Deploy: dep} +} + +// RecordAppliedResource upserts one applied resource row. +func (a *Applier) RecordAppliedResource(ctx context.Context, r state.AppliedResource) error { + if a == nil || a.Deploy == nil { + return errors.New("apply: nil applier or deployment store") + } + return a.Deploy.UpsertAppliedResource(ctx, r) +} + +// ApplyPlan persists all plan operations and updates applied_projects for env (issue #15). +// When dep implements [state.TransactionalDeployment] (e.g. SQLite), the whole apply runs in one transaction. +func (a *Applier) ApplyPlan(ctx context.Context, env string, g *spec.ProjectGraph, p *plan.Plan, at time.Time) error { + if a == nil || a.Deploy == nil { + return errors.New("apply: nil applier or deployment store") + } + if g == nil { + return errors.New("apply: nil project graph") + } + if p == nil { + return errors.New("apply: nil plan") + } + if env == "" { + return errors.New("apply: empty env") + } + projectName, projectVersion, err := plan.ProjectDeploymentMeta(g) + if err != nil { + return err + } + at = at.UTC() + + run := func(ctx context.Context, dep state.DeploymentStore) error { + return executePlan(ctx, dep, env, p, at, projectName, projectVersion) + } + if tx, ok := a.Deploy.(state.TransactionalDeployment); ok { + return tx.RunDeploymentTx(ctx, run) + } + return run(ctx, a.Deploy) +} diff --git a/internal/apply/applier_test.go b/internal/apply/applier_test.go new file mode 100644 index 0000000..f559c32 --- /dev/null +++ b/internal/apply/applier_test.go @@ -0,0 +1,129 @@ +package apply + +import ( + "context" + "database/sql" + "errors" + "path/filepath" + "testing" + "time" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/plan" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state/sqlite" +) + +func minimalGraph() *spec.ProjectGraph { + return &spec.ProjectGraph{ + Meta: spec.Metadata{Name: "acme"}, + Spec: spec.ProjectSpec{}, + Agents: map[string]*spec.AgentResource{}, + Tools: map[string]*spec.ToolResource{}, + Workflows: map[string]*spec.WorkflowResource{}, + Policies: map[string]*spec.PolicyResource{}, + Environments: map[string]*spec.EnvironmentResource{}, + } +} + +func graphWithAgent() *spec.ProjectGraph { + g := minimalGraph() + g.Agents["rev"] = &spec.AgentResource{ + APIVersion: spec.APIVersionV0, + Kind: spec.KindAgent, + Metadata: spec.Metadata{Name: "rev"}, + Spec: spec.AgentSpec{Model: "m", Policy: "default"}, + } + return g +} + +func TestApplyPlan_thenListShowsResources(t *testing.T) { + ctx := context.Background() + st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "apply.db")) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = st.Close() }) + + g := minimalGraph() + pl := plan.NewPlanner(st) + p, err := pl.ComputePlan(ctx, "dev", g) + if err != nil { + t.Fatal(err) + } + at := time.Date(2026, 4, 11, 12, 0, 0, 0, time.UTC) + ap := NewApplier(st) + if err := ap.ApplyPlan(ctx, "dev", g, p, at); err != nil { + t.Fatal(err) + } + + list, err := st.ListAppliedResourcesByEnv(ctx, "dev") + if err != nil { + t.Fatal(err) + } + if len(list) != 1 { + t.Fatalf("resources: %+v", list) + } + if list[0].Kind != spec.KindProject || list[0].Name != "acme" { + t.Fatalf("got %+v", list[0]) + } + got, err := st.GetAppliedResource(ctx, "dev", spec.ResourceID{Kind: spec.KindProject, Name: "acme"}) + if err != nil { + t.Fatal(err) + } + if got.SpecHash == "" || got.NormalizedSpecJSON == "" { + t.Fatalf("missing spec material: %+v", got) + } + + proj, err := st.GetAppliedProject(ctx, "dev", "acme") + if err != nil { + t.Fatal(err) + } + if proj.Version == "" { + t.Fatalf("applied_projects.version empty: %+v", proj) + } +} + +func TestApplyPlan_deleteRemovesRow(t *testing.T) { + ctx := context.Background() + st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "apply-del.db")) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = st.Close() }) + + pl := plan.NewPlanner(st) + ap := NewApplier(st) + t0 := time.Date(2026, 4, 11, 10, 0, 0, 0, time.UTC) + t1 := t0.Add(time.Hour) + + gFull := graphWithAgent() + p1, err := pl.ComputePlan(ctx, "dev", gFull) + if err != nil { + t.Fatal(err) + } + if err := ap.ApplyPlan(ctx, "dev", gFull, p1, t0); err != nil { + t.Fatal(err) + } + + gOnly := minimalGraph() + p2, err := pl.ComputePlan(ctx, "dev", gOnly) + if err != nil { + t.Fatal(err) + } + if err := ap.ApplyPlan(ctx, "dev", gOnly, p2, t1); err != nil { + t.Fatal(err) + } + + _, err = st.GetAppliedResource(ctx, "dev", spec.ResourceID{Kind: spec.KindAgent, Name: "rev"}) + if !errors.Is(err, sql.ErrNoRows) { + t.Fatalf("agent row should be gone: %v", err) + } + + list, err := st.ListAppliedResourcesByEnv(ctx, "dev") + if err != nil { + t.Fatal(err) + } + if len(list) != 1 || list[0].Kind != spec.KindProject { + t.Fatalf("want project only, got %+v", list) + } +} diff --git a/internal/apply/apply.go b/internal/apply/apply.go deleted file mode 100644 index 459b31c..0000000 --- a/internal/apply/apply.go +++ /dev/null @@ -1,26 +0,0 @@ -package apply - -import ( - "context" - "errors" - - "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state" -) - -// Applier mutates deployment state from an apply operation (design doc §5.2). -type Applier struct { - Deploy state.DeploymentStore -} - -// NewApplier returns an applier backed by dep. -func NewApplier(dep state.DeploymentStore) *Applier { - return &Applier{Deploy: dep} -} - -// RecordAppliedResource upserts one applied resource row. -func (a *Applier) RecordAppliedResource(ctx context.Context, r state.AppliedResource) error { - if a == nil || a.Deploy == nil { - return errors.New("apply: nil deployment store") - } - return a.Deploy.UpsertAppliedResource(ctx, r) -} diff --git a/internal/apply/doc.go b/internal/apply/doc.go index 83019f0..f377c89 100644 --- a/internal/apply/doc.go +++ b/internal/apply/doc.go @@ -1,2 +1,5 @@ // Package apply applies plans to control-plane and runtime state. +// +// [Applier.ApplyPlan] writes applied_resources and applied_projects using [plan.Plan] operations. +// SQLite uses a single transaction via [state.TransactionalDeployment] (issue #15). package apply diff --git a/internal/apply/executor.go b/internal/apply/executor.go new file mode 100644 index 0000000..4a645df --- /dev/null +++ b/internal/apply/executor.go @@ -0,0 +1,51 @@ +package apply + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/plan" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state" +) + +func executePlan( + ctx context.Context, + dep state.DeploymentStore, + env string, + p *plan.Plan, + at time.Time, + projectName, projectVersion string, +) error { + for _, op := range p.Operations { + switch op.Action { + case plan.ActionCreate, plan.ActionUpdate: + if strings.TrimSpace(op.SpecHash) == "" || strings.TrimSpace(op.NormalizedSpecJSON) == "" { + return fmt.Errorf("apply: %s operation for %s missing spec hash or normalized JSON", op.Action, op.Target.String()) + } + if err := dep.UpsertAppliedResource(ctx, state.AppliedResource{ + Kind: op.Target.Kind, + Name: op.Target.Name, + Env: env, + SpecHash: op.SpecHash, + NormalizedSpecJSON: op.NormalizedSpecJSON, + AppliedAt: at, + }); err != nil { + return err + } + case plan.ActionDelete: + if err := dep.DeleteAppliedResource(ctx, env, op.Target); err != nil { + return err + } + default: + return fmt.Errorf("apply: unknown operation action %q", op.Action) + } + } + return dep.UpsertAppliedProject(ctx, state.AppliedProject{ + ProjectName: projectName, + Env: env, + Version: projectVersion, + AppliedAt: at, + }) +} diff --git a/internal/plan/plan.go b/internal/plan/plan.go index f6acad8..2d58aed 100644 --- a/internal/plan/plan.go +++ b/internal/plan/plan.go @@ -22,10 +22,13 @@ type Plan struct { } // Operation is one create, update, or delete against a resource identity. +// SpecHash and NormalizedSpecJSON are set for create and update (apply material); they are empty for delete. type Operation struct { - Action string - Target spec.ResourceID - Diff []FieldChange + Action string + Target spec.ResourceID + Diff []FieldChange + SpecHash string + NormalizedSpecJSON string } // FieldChange is one normalized field-level delta for updates (§10.2 plan output). diff --git a/internal/plan/plan_test.go b/internal/plan/plan_test.go index 16dbe70..6965247 100644 --- a/internal/plan/plan_test.go +++ b/internal/plan/plan_test.go @@ -35,6 +35,10 @@ func (s *stubDeploy) GetAppliedProject(context.Context, string, string) (*state. return nil, errors.New("stub") } +func (s *stubDeploy) DeleteAppliedResource(context.Context, string, spec.ResourceID) error { + return errors.New("stub") +} + func TestPlanner_listAppliedResources_usesDeploymentStoreOnly(t *testing.T) { st := &stubDeploy{list: []state.AppliedResource{{Name: "agent-a", Env: "dev", Kind: "Agent"}}} p := plan.NewPlanner(st) diff --git a/internal/plan/planner.go b/internal/plan/planner.go index fd9af63..138eb24 100644 --- a/internal/plan/planner.go +++ b/internal/plan/planner.go @@ -54,7 +54,13 @@ func (p *Planner) ComputePlan(ctx context.Context, env string, g *spec.ProjectGr key := resourceMapKey(d.id.Kind, d.id.Name) prev, ok := appliedByID[key] if !ok { - ops = append(ops, Operation{Action: ActionCreate, Target: d.id, Diff: nil}) + ops = append(ops, Operation{ + Action: ActionCreate, + Target: d.id, + Diff: nil, + SpecHash: d.hash, + NormalizedSpecJSON: d.json, + }) continue } if prev.SpecHash == d.hash { @@ -68,7 +74,13 @@ func (p *Planner) ComputePlan(ctx context.Context, env string, g *spec.ProjectGr if err != nil { return nil, err } - ops = append(ops, Operation{Action: ActionUpdate, Target: d.id, Diff: diff}) + ops = append(ops, Operation{ + Action: ActionUpdate, + Target: d.id, + Diff: diff, + SpecHash: d.hash, + NormalizedSpecJSON: d.json, + }) } for _, r := range applied { diff --git a/internal/plan/planner_test.go b/internal/plan/planner_test.go index 996cb08..957cec9 100644 --- a/internal/plan/planner_test.go +++ b/internal/plan/planner_test.go @@ -34,6 +34,10 @@ func (f *fakeDeploy) GetAppliedProject(context.Context, string, string) (*state. return nil, nil } +func (f *fakeDeploy) DeleteAppliedResource(context.Context, string, spec.ResourceID) error { + return nil +} + func minimalGraph() *spec.ProjectGraph { return &spec.ProjectGraph{ Meta: spec.Metadata{Name: "acme"}, diff --git a/internal/plan/project_meta.go b/internal/plan/project_meta.go new file mode 100644 index 0000000..865361e --- /dev/null +++ b/internal/plan/project_meta.go @@ -0,0 +1,25 @@ +package plan + +import ( + "fmt" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" +) + +// ProjectDeploymentMeta returns the Project resource name and spec_hash for applied_projects.version +// after the same normalization as [Planner.ComputePlan] (issue #15). +func ProjectDeploymentMeta(g *spec.ProjectGraph) (projectName string, projectSpecHash string, err error) { + if g == nil { + return "", "", fmt.Errorf("plan: nil project graph") + } + rows, err := desiredRows(g) + if err != nil { + return "", "", err + } + for _, r := range rows { + if r.id.Kind == spec.KindProject { + return r.id.Name, r.hash, nil + } + } + return "", "", fmt.Errorf("plan: graph has no Project resource") +} diff --git a/internal/state/doc.go b/internal/state/doc.go index 8e4d4e3..ceb4180 100644 --- a/internal/state/doc.go +++ b/internal/state/doc.go @@ -1,6 +1,7 @@ // Package state stores deployment state and runtime metadata (design doc §5.2, §14). // -// [DeploymentStore] and [RuntimeStore] are the boundaries plan, apply, and runtime code should use +// [DeploymentStore], [TransactionalDeployment], and [RuntimeStore] are the boundaries plan, apply, +// and runtime code should use // so callers do not depend on a specific SQL backend. MVP implements them in internal/state/sqlite. // // Thread-safety: interfaces assume a single-process CLI unless a concrete backend documents diff --git a/internal/state/sqlite/deployment_ops.go b/internal/state/sqlite/deployment_ops.go new file mode 100644 index 0000000..4b5127a --- /dev/null +++ b/internal/state/sqlite/deployment_ops.go @@ -0,0 +1,125 @@ +package sqlite + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state" +) + +// querier is implemented by *sql.DB and *sql.Tx for deployment table access. +type querier interface { + ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) + QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) + QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row +} + +func upsertAppliedResource(ctx context.Context, q querier, r state.AppliedResource) error { + at := r.AppliedAt.UTC().Format(time.RFC3339Nano) + _, err := q.ExecContext(ctx, ` +INSERT INTO applied_resources (kind, name, env, spec_hash, normalized_spec_json, applied_at) +VALUES (?, ?, ?, ?, ?, ?) +ON CONFLICT(kind, name, env) DO UPDATE SET + spec_hash = excluded.spec_hash, + normalized_spec_json = excluded.normalized_spec_json, + applied_at = excluded.applied_at +`, r.Kind, r.Name, r.Env, r.SpecHash, r.NormalizedSpecJSON, at) + return err +} + +func getAppliedResource(ctx context.Context, q querier, env string, id spec.ResourceID) (*state.AppliedResource, error) { + row := q.QueryRowContext(ctx, ` +SELECT kind, name, env, spec_hash, normalized_spec_json, applied_at +FROM applied_resources +WHERE env = ? AND kind = ? AND name = ? +`, env, id.Kind, id.Name) + var r state.AppliedResource + var at string + if err := row.Scan(&r.Kind, &r.Name, &r.Env, &r.SpecHash, &r.NormalizedSpecJSON, &at); err != nil { + return nil, err + } + t, err := parseSQLiteTime(at) + if err != nil { + return nil, fmt.Errorf("applied_at: %w", err) + } + r.AppliedAt = t + return &r, nil +} + +func listAppliedResourcesByEnv(ctx context.Context, q querier, env string) ([]state.AppliedResource, error) { + rows, err := q.QueryContext(ctx, ` +SELECT kind, name, env, spec_hash, normalized_spec_json, applied_at +FROM applied_resources +WHERE env = ? +ORDER BY kind, name +`, env) + if err != nil { + return nil, err + } + defer rows.Close() + + var out []state.AppliedResource + for rows.Next() { + var r state.AppliedResource + var at string + if err := rows.Scan(&r.Kind, &r.Name, &r.Env, &r.SpecHash, &r.NormalizedSpecJSON, &at); err != nil { + return nil, err + } + t, err := parseSQLiteTime(at) + if err != nil { + return nil, err + } + r.AppliedAt = t + out = append(out, r) + } + return out, rows.Err() +} + +func deleteAppliedResource(ctx context.Context, q querier, env string, id spec.ResourceID) error { + _, err := q.ExecContext(ctx, ` +DELETE FROM applied_resources +WHERE env = ? AND kind = ? AND name = ? +`, env, id.Kind, id.Name) + return err +} + +func upsertAppliedProject(ctx context.Context, q querier, p state.AppliedProject) error { + at := p.AppliedAt.UTC().Format(time.RFC3339Nano) + _, err := q.ExecContext(ctx, ` +INSERT INTO applied_projects (project_name, env, version, applied_at) +VALUES (?, ?, ?, ?) +ON CONFLICT(project_name, env) DO UPDATE SET + version = excluded.version, + applied_at = excluded.applied_at +`, p.ProjectName, p.Env, p.Version, at) + return err +} + +func getAppliedProject(ctx context.Context, q querier, env, projectName string) (*state.AppliedProject, error) { + row := q.QueryRowContext(ctx, ` +SELECT project_name, env, version, applied_at +FROM applied_projects +WHERE env = ? AND project_name = ? +`, env, projectName) + var p state.AppliedProject + var at string + if err := row.Scan(&p.ProjectName, &p.Env, &p.Version, &at); err != nil { + return nil, err + } + t, err := parseSQLiteTime(at) + if err != nil { + return nil, err + } + p.AppliedAt = t + return &p, nil +} + +func parseSQLiteTime(s string) (time.Time, error) { + if t, err := time.Parse(time.RFC3339Nano, s); err == nil { + return t, nil + } + return time.Parse(time.RFC3339, s) +} diff --git a/internal/state/sqlite/deployment_tx.go b/internal/state/sqlite/deployment_tx.go new file mode 100644 index 0000000..a33c6e8 --- /dev/null +++ b/internal/state/sqlite/deployment_tx.go @@ -0,0 +1,58 @@ +package sqlite + +import ( + "context" + "database/sql" + "errors" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state" +) + +// deploymentStoreTx implements [state.DeploymentStore] using a single SQL transaction. +type deploymentStoreTx struct { + tx *sql.Tx +} + +func (t *deploymentStoreTx) UpsertAppliedResource(ctx context.Context, r state.AppliedResource) error { + return upsertAppliedResource(ctx, t.tx, r) +} + +func (t *deploymentStoreTx) GetAppliedResource(ctx context.Context, env string, id spec.ResourceID) (*state.AppliedResource, error) { + return getAppliedResource(ctx, t.tx, env, id) +} + +func (t *deploymentStoreTx) ListAppliedResourcesByEnv(ctx context.Context, env string) ([]state.AppliedResource, error) { + return listAppliedResourcesByEnv(ctx, t.tx, env) +} + +func (t *deploymentStoreTx) DeleteAppliedResource(ctx context.Context, env string, id spec.ResourceID) error { + return deleteAppliedResource(ctx, t.tx, env, id) +} + +func (t *deploymentStoreTx) UpsertAppliedProject(ctx context.Context, p state.AppliedProject) error { + return upsertAppliedProject(ctx, t.tx, p) +} + +func (t *deploymentStoreTx) GetAppliedProject(ctx context.Context, env, projectName string) (*state.AppliedProject, error) { + return getAppliedProject(ctx, t.tx, env, projectName) +} + +// RunDeploymentTx runs fn with a [state.DeploymentStore] backed by one SQLite transaction. +// The transaction commits only if fn returns nil. +func (s *Store) RunDeploymentTx(ctx context.Context, fn func(ctx context.Context, dep state.DeploymentStore) error) error { + if s == nil || s.db == nil { + return errors.New("sqlite: nil store") + } + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer func() { _ = tx.Rollback() }() + + w := &deploymentStoreTx{tx: tx} + if err := fn(ctx, w); err != nil { + return err + } + return tx.Commit() +} diff --git a/internal/state/sqlite/interfaces.go b/internal/state/sqlite/interfaces.go index 786f136..a0211a9 100644 --- a/internal/state/sqlite/interfaces.go +++ b/internal/state/sqlite/interfaces.go @@ -4,6 +4,7 @@ import "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state // Compile-time check: *Store implements state facades (issue #11). var ( - _ state.DeploymentStore = (*Store)(nil) - _ state.RuntimeStore = (*Store)(nil) + _ state.DeploymentStore = (*Store)(nil) + _ state.TransactionalDeployment = (*Store)(nil) + _ state.RuntimeStore = (*Store)(nil) ) diff --git a/internal/state/sqlite/store.go b/internal/state/sqlite/store.go index f234753..061627a 100644 --- a/internal/state/sqlite/store.go +++ b/internal/state/sqlite/store.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state" @@ -52,104 +51,31 @@ func (s *Store) Close() error { // UpsertAppliedResource inserts or replaces a row for (kind, name, env). func (s *Store) UpsertAppliedResource(ctx context.Context, r state.AppliedResource) error { - at := r.AppliedAt.UTC().Format(time.RFC3339Nano) - _, err := s.db.ExecContext(ctx, ` -INSERT INTO applied_resources (kind, name, env, spec_hash, normalized_spec_json, applied_at) -VALUES (?, ?, ?, ?, ?, ?) -ON CONFLICT(kind, name, env) DO UPDATE SET - spec_hash = excluded.spec_hash, - normalized_spec_json = excluded.normalized_spec_json, - applied_at = excluded.applied_at -`, r.Kind, r.Name, r.Env, r.SpecHash, r.NormalizedSpecJSON, at) - return err + return upsertAppliedResource(ctx, s.db, r) } // GetAppliedResource returns the row for env and ResourceID, or sql.ErrNoRows. func (s *Store) GetAppliedResource(ctx context.Context, env string, id spec.ResourceID) (*state.AppliedResource, error) { - row := s.db.QueryRowContext(ctx, ` -SELECT kind, name, env, spec_hash, normalized_spec_json, applied_at -FROM applied_resources -WHERE env = ? AND kind = ? AND name = ? -`, env, id.Kind, id.Name) - var r state.AppliedResource - var at string - if err := row.Scan(&r.Kind, &r.Name, &r.Env, &r.SpecHash, &r.NormalizedSpecJSON, &at); err != nil { - return nil, err - } - t, err := parseSQLiteTime(at) - if err != nil { - return nil, fmt.Errorf("applied_at: %w", err) - } - r.AppliedAt = t - return &r, nil + return getAppliedResource(ctx, s.db, env, id) } // ListAppliedResourcesByEnv lists all applied resources for the given environment. func (s *Store) ListAppliedResourcesByEnv(ctx context.Context, env string) ([]state.AppliedResource, error) { - rows, err := s.db.QueryContext(ctx, ` -SELECT kind, name, env, spec_hash, normalized_spec_json, applied_at -FROM applied_resources -WHERE env = ? -ORDER BY kind, name -`, env) - if err != nil { - return nil, err - } - defer rows.Close() + return listAppliedResourcesByEnv(ctx, s.db, env) +} - var out []state.AppliedResource - for rows.Next() { - var r state.AppliedResource - var at string - if err := rows.Scan(&r.Kind, &r.Name, &r.Env, &r.SpecHash, &r.NormalizedSpecJSON, &at); err != nil { - return nil, err - } - t, err := parseSQLiteTime(at) - if err != nil { - return nil, err - } - r.AppliedAt = t - out = append(out, r) - } - return out, rows.Err() +// DeleteAppliedResource removes one applied_resources row. It is idempotent: deleting a +// non-existent row returns nil. +func (s *Store) DeleteAppliedResource(ctx context.Context, env string, id spec.ResourceID) error { + return deleteAppliedResource(ctx, s.db, env, id) } // UpsertAppliedProject inserts or replaces a row for (project_name, env). func (s *Store) UpsertAppliedProject(ctx context.Context, p state.AppliedProject) error { - at := p.AppliedAt.UTC().Format(time.RFC3339Nano) - _, err := s.db.ExecContext(ctx, ` -INSERT INTO applied_projects (project_name, env, version, applied_at) -VALUES (?, ?, ?, ?) -ON CONFLICT(project_name, env) DO UPDATE SET - version = excluded.version, - applied_at = excluded.applied_at -`, p.ProjectName, p.Env, p.Version, at) - return err + return upsertAppliedProject(ctx, s.db, p) } // GetAppliedProject returns the row for project name and env, or sql.ErrNoRows. func (s *Store) GetAppliedProject(ctx context.Context, env, projectName string) (*state.AppliedProject, error) { - row := s.db.QueryRowContext(ctx, ` -SELECT project_name, env, version, applied_at -FROM applied_projects -WHERE env = ? AND project_name = ? -`, env, projectName) - var p state.AppliedProject - var at string - if err := row.Scan(&p.ProjectName, &p.Env, &p.Version, &at); err != nil { - return nil, err - } - t, err := parseSQLiteTime(at) - if err != nil { - return nil, err - } - p.AppliedAt = t - return &p, nil -} - -func parseSQLiteTime(s string) (time.Time, error) { - if t, err := time.Parse(time.RFC3339Nano, s); err == nil { - return t, nil - } - return time.Parse(time.RFC3339, s) + return getAppliedProject(ctx, s.db, env, projectName) } diff --git a/internal/state/store.go b/internal/state/store.go index 9c11b63..97eeab6 100644 --- a/internal/state/store.go +++ b/internal/state/store.go @@ -15,10 +15,17 @@ type DeploymentStore interface { UpsertAppliedResource(ctx context.Context, r AppliedResource) error GetAppliedResource(ctx context.Context, env string, id spec.ResourceID) (*AppliedResource, error) ListAppliedResourcesByEnv(ctx context.Context, env string) ([]AppliedResource, error) + DeleteAppliedResource(ctx context.Context, env string, id spec.ResourceID) error UpsertAppliedProject(ctx context.Context, p AppliedProject) error GetAppliedProject(ctx context.Context, env, projectName string) (*AppliedProject, error) } +// TransactionalDeployment runs deployment mutations in a single atomic transaction when supported +// (design doc §12.2 D apply, issue #15). +type TransactionalDeployment interface { + RunDeploymentTx(ctx context.Context, fn func(ctx context.Context, dep DeploymentStore) error) error +} + // RuntimeStore persists execution rows from design doc §14.2 (runs, run_steps, trace_events). // // Thread-safety: same expectations as [DeploymentStore].