Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
golang.org/x/crypto v0.53.0
golang.org/x/mod v0.37.0
golang.org/x/sync v0.21.0
google.golang.org/grpc v1.81.1
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -320,7 +321,6 @@ require (
golang.org/x/tools v0.45.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260526163538-3dc84a4a5aaa // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260526163538-3dc84a4a5aaa // indirect
google.golang.org/grpc v1.81.1 // indirect
google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand Down
66 changes: 66 additions & 0 deletions jd/changesets/jobs/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package jobs

import (
"errors"
"fmt"

cldf "github.com/smartcontractkit/chainlink-deployments-framework/deployment"
fwops "github.com/smartcontractkit/chainlink-deployments-framework/operations"
jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"

jdops "github.com/smartcontractkit/cld-changesets/jd/operations"
)

var _ cldf.ChangeSetV2[DeleteJobsInput] = DeleteJobsChangeset{}

// DeleteJobsInput is the serializable input of DeleteJobsChangeset.
type DeleteJobsInput = jdops.DeleteJobsInput

// DeleteJobsChangeset deletes jobs.
// Only jobs with proposals in PROPOSED, APPROVED, or PENDING state are eligible.
type DeleteJobsChangeset struct{}

func (DeleteJobsChangeset) VerifyPreconditions(env cldf.Environment, cfg DeleteJobsInput) error {
if len(cfg.JobIDs) == 0 {
return errors.New("no job_ids provided")
}
seen := make(map[string]struct{}, len(cfg.JobIDs))
for _, id := range cfg.JobIDs {
if id == "" {
return errors.New("job id cannot be empty")
}
if _, ok := seen[id]; ok {
return fmt.Errorf("duplicate job id %q", id)
}
seen[id] = struct{}{}
}

jobs, err := env.Offchain.ListJobs(env.GetContext(), &jobv1.ListJobsRequest{
Filter: &jobv1.ListJobsRequest_Filter{Ids: cfg.JobIDs, IncludeDeleted: true},
})
if err != nil {
return fmt.Errorf("failed to list jobs: %w", err)
}
for _, j := range jobs.Jobs {
if j.DeletedAt != nil {
return fmt.Errorf("job %q is already deleted", j.Id)
}
}
if len(jobs.Jobs) != len(cfg.JobIDs) {
found := make([]string, 0, len(jobs.Jobs))
for _, j := range jobs.Jobs {
found = append(found, j.Id)
}

return fmt.Errorf("not all jobs found: requested %v, found %v", cfg.JobIDs, found)
}

return nil
}

func (DeleteJobsChangeset) Apply(e cldf.Environment, input DeleteJobsInput) (cldf.ChangesetOutput, error) {
deps := jdops.JDOpDeps{Offchain: e.Offchain, EnvName: e.Name}
_, err := fwops.ExecuteSequence(e.OperationsBundle, jdops.SeqJDDeleteJobs, deps, input)

return cldf.ChangesetOutput{}, err
}
82 changes: 82 additions & 0 deletions jd/changesets/jobs/delete_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package jobs

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-deployments-framework/engine/test/runtime"
)

func TestDeleteJobsChangeset(t *testing.T) {
t.Parallel()

t.Run("deletes eligible jobs", func(t *testing.T) {
t.Parallel()

rt, err := runtime.New(t.Context())
require.NoError(t, err)

nodeID := registerTestNode(t, rt)
jobID := proposeJob(t, rt, nodeID)

_, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{
JobIDs: []string{jobID},
})
require.NoError(t, err)
})

t.Run("precondition — empty job list rejected", func(t *testing.T) {
t.Parallel()

rt, err := runtime.New(t.Context())
require.NoError(t, err)

_, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{})
require.ErrorContains(t, err, "no job_ids provided")
})

t.Run("precondition — duplicate job id rejected", func(t *testing.T) {
t.Parallel()

rt, err := runtime.New(t.Context())
require.NoError(t, err)

_, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{
JobIDs: []string{"job_1", "job_1"},
})
require.ErrorContains(t, err, "duplicate job id")
})

t.Run("precondition — non-existent job rejected", func(t *testing.T) {
t.Parallel()

rt, err := runtime.New(t.Context())
require.NoError(t, err)

_, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{
JobIDs: []string{"job_does_not_exist"},
})
require.Error(t, err)
})

t.Run("precondition — already-deleted job rejected", func(t *testing.T) {
t.Parallel()

rt, err := runtime.New(t.Context())
require.NoError(t, err)

nodeID := registerTestNode(t, rt)
jobID := proposeJob(t, rt, nodeID)

_, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{
JobIDs: []string{jobID},
})
require.NoError(t, err)

_, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{
JobIDs: []string{jobID},
})
require.ErrorContains(t, err, "already deleted")
})
}
25 changes: 25 additions & 0 deletions jd/changesets/jobs/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Package jobs provides changesets for managing Job Distributor jobs.
//
// # Usage
//
// import "github.com/smartcontractkit/cld-changesets/jd/changesets/jobs"
//
// // Propose jobs to nodes matched by domain and label:
// _, err := runtime.ExecChangeset(rt, jobs.ProposeJobsChangeset{}, jobs.ProposeJobsInput{
// Domain: "keystone",
// Jobs: []jobs.JobSpec{{
// NodeLabels: map[string]string{"target": "oracle-1"},
// JobspecTOML: "...",
// }},
// })
//
// // Revoke jobs by ID (idempotent — already-absent IDs are not an error):
// _, err = runtime.ExecChangeset(rt, jobs.RevokeJobsChangeset{}, jobs.RevokeJobsInput{
// JobIDs: []string{"job-id-1"},
// })
//
// // Delete jobs by ID (preconditions verify existence and eligibility):
// _, err = runtime.ExecChangeset(rt, jobs.DeleteJobsChangeset{}, jobs.DeleteJobsInput{
// JobIDs: []string{"job-id-1"},
// })
package jobs
49 changes: 49 additions & 0 deletions jd/changesets/jobs/jobs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package jobs

import (
"testing"

jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-deployments-framework/engine/test/runtime"

jdnodes "github.com/smartcontractkit/cld-changesets/jd/changesets/nodes"
jdops "github.com/smartcontractkit/cld-changesets/jd/operations"
)

const testJobTOML = `
type = "offchainreporting2"
name = "test-job"
contractID = "0x0000000000000000000000000000000000000001"
externalJobID = "00000000-0000-0000-0000-000000000001"
`

func registerTestNode(t *testing.T, rt *runtime.Runtime) string {
t.Helper()
_, err := runtime.ExecChangeset(rt, jdnodes.RegisterNodesChangeset{}, jdnodes.RegisterNodesInput{
Domain: "keystone",
Nodes: []jdnodes.NodeToRegister{{
Name: "oracle-1",
CSAKey: "csa-key-1",
Labels: map[string]string{"target": "oracle-1"},
}},
})
require.NoError(t, err)
node, err := jdops.ListNodeByPublicKey(t.Context(), rt.Environment().Offchain, "csa-key-1")
require.NoError(t, err)
require.NotNil(t, node, "node not found after registration")

return node.GetId()
}

func proposeJob(t *testing.T, rt *runtime.Runtime, nodeID string) string {
t.Helper()
resp, err := rt.Environment().Offchain.ProposeJob(t.Context(), &jobv1.ProposeJobRequest{
NodeId: nodeID,
Spec: testJobTOML,
})
require.NoError(t, err)

return resp.GetProposal().GetJobId()
}
53 changes: 53 additions & 0 deletions jd/changesets/jobs/propose.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package jobs

import (
"errors"
"fmt"

cldf "github.com/smartcontractkit/chainlink-deployments-framework/deployment"
fwops "github.com/smartcontractkit/chainlink-deployments-framework/operations"

jdops "github.com/smartcontractkit/cld-changesets/jd/operations"
)

var _ cldf.ChangeSetV2[ProposeJobsInput] = ProposeJobsChangeset{}

// JobSpec is the per-job input for ProposeJobsInput.
type JobSpec = jdops.JobSpec

// ProposeJobsInput is the serializable input of ProposeJobsChangeset.
type ProposeJobsInput = jdops.ProposeJobsInput

// ProposeJobsChangeset proposes a batch of job specs to nodes matched by label.
type ProposeJobsChangeset struct{}

func (ProposeJobsChangeset) VerifyPreconditions(_ cldf.Environment, cfg ProposeJobsInput) error {
if cfg.Domain == "" {
return errors.New("domain is required")
}
if len(cfg.Jobs) == 0 {
return errors.New("no jobs provided")
}
for i, j := range cfg.Jobs {
if j.JobspecTOML == "" {
return fmt.Errorf("job[%d]: jobspec_toml is required", i)
}
if len(j.NodeLabels) == 0 {
return fmt.Errorf("job[%d]: node_labels is required", i)
}
for k, v := range j.NodeLabels {
if k == "" || v == "" {
return fmt.Errorf("job[%d]: node_labels key and value must be non-empty (key=%q value=%q)", i, k, v)
}
}
}

return nil
}

func (ProposeJobsChangeset) Apply(e cldf.Environment, input ProposeJobsInput) (cldf.ChangesetOutput, error) {
deps := jdops.JDOpDeps{Offchain: e.Offchain, EnvName: e.Name}
_, err := fwops.ExecuteSequence(e.OperationsBundle, jdops.SeqJDProposeJobs, deps, input)

return cldf.ChangesetOutput{}, err
}
69 changes: 69 additions & 0 deletions jd/changesets/jobs/propose_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package jobs

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-deployments-framework/engine/test/runtime"
)

func TestProposeJobsChangeset(t *testing.T) {
t.Parallel()

t.Run("proposes job from inline TOML", func(t *testing.T) {
t.Parallel()

rt, err := runtime.New(t.Context())
require.NoError(t, err)

registerTestNode(t, rt)

_, err = runtime.ExecChangeset(rt, ProposeJobsChangeset{}, ProposeJobsInput{
Domain: "keystone",
Jobs: []JobSpec{{
NodeLabels: map[string]string{"target": "oracle-1"},
JobspecTOML: testJobTOML,
}},
})
require.NoError(t, err)
})

t.Run("precondition — missing domain rejected", func(t *testing.T) {
t.Parallel()

rt, err := runtime.New(t.Context())
require.NoError(t, err)

_, err = runtime.ExecChangeset(rt, ProposeJobsChangeset{}, ProposeJobsInput{
Jobs: []JobSpec{{NodeLabels: map[string]string{"k": "v"}, JobspecTOML: testJobTOML}},
})
require.ErrorContains(t, err, "domain is required")
})

t.Run("precondition — missing node_labels rejected", func(t *testing.T) {
t.Parallel()

rt, err := runtime.New(t.Context())
require.NoError(t, err)

_, err = runtime.ExecChangeset(rt, ProposeJobsChangeset{}, ProposeJobsInput{
Domain: "keystone",
Jobs: []JobSpec{{JobspecTOML: testJobTOML}},
})
require.ErrorContains(t, err, "node_labels is required")
})

t.Run("precondition — missing jobspec rejected", func(t *testing.T) {
t.Parallel()

rt, err := runtime.New(t.Context())
require.NoError(t, err)

_, err = runtime.ExecChangeset(rt, ProposeJobsChangeset{}, ProposeJobsInput{
Domain: "keystone",
Jobs: []JobSpec{{NodeLabels: map[string]string{"k": "v"}}},
})
require.ErrorContains(t, err, "jobspec_toml is required")
})
}
Loading
Loading