Skip to content

Commit 0d2db11

Browse files
committed
feat: jd changesets
1 parent c71b37c commit 0d2db11

23 files changed

Lines changed: 1713 additions & 151 deletions

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ require (
3131
golang.org/x/crypto v0.53.0
3232
golang.org/x/mod v0.37.0
3333
golang.org/x/sync v0.21.0
34+
google.golang.org/grpc v1.81.1
3435
gopkg.in/yaml.v3 v3.0.1
3536
)
3637

@@ -320,7 +321,6 @@ require (
320321
golang.org/x/tools v0.45.0 // indirect
321322
google.golang.org/genproto/googleapis/api v0.0.0-20260526163538-3dc84a4a5aaa // indirect
322323
google.golang.org/genproto/googleapis/rpc v0.0.0-20260526163538-3dc84a4a5aaa // indirect
323-
google.golang.org/grpc v1.81.1 // indirect
324324
google.golang.org/protobuf v1.36.11 // indirect
325325
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
326326
gopkg.in/inf.v0 v0.9.1 // indirect

jd/changesets/jobs/delete.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package jobs
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
7+
cldf "github.com/smartcontractkit/chainlink-deployments-framework/deployment"
8+
fwops "github.com/smartcontractkit/chainlink-deployments-framework/operations"
9+
jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"
10+
11+
jdops "github.com/smartcontractkit/cld-changesets/jd/operations"
12+
)
13+
14+
var _ cldf.ChangeSetV2[DeleteJobsInput] = DeleteJobsChangeset{}
15+
16+
// DeleteJobsInput is the serializable input of DeleteJobsChangeset.
17+
type DeleteJobsInput = jdops.DeleteJobsInput
18+
19+
// DeleteJobsChangeset deletes jobs.
20+
// Only jobs with proposals in PROPOSED, APPROVED, or PENDING state are eligible.
21+
type DeleteJobsChangeset struct{}
22+
23+
func (DeleteJobsChangeset) VerifyPreconditions(env cldf.Environment, cfg DeleteJobsInput) error {
24+
if len(cfg.JobIDs) == 0 {
25+
return errors.New("no job_ids provided")
26+
}
27+
seen := make(map[string]struct{}, len(cfg.JobIDs))
28+
for _, id := range cfg.JobIDs {
29+
if id == "" {
30+
return errors.New("job id cannot be empty")
31+
}
32+
if _, ok := seen[id]; ok {
33+
return fmt.Errorf("duplicate job id %q", id)
34+
}
35+
seen[id] = struct{}{}
36+
}
37+
38+
jobs, err := env.Offchain.ListJobs(env.GetContext(), &jobv1.ListJobsRequest{
39+
Filter: &jobv1.ListJobsRequest_Filter{Ids: cfg.JobIDs, IncludeDeleted: true},
40+
})
41+
if err != nil {
42+
return fmt.Errorf("failed to list jobs: %w", err)
43+
}
44+
for _, j := range jobs.Jobs {
45+
if j.DeletedAt != nil {
46+
return fmt.Errorf("job %q is already deleted", j.Id)
47+
}
48+
}
49+
if len(jobs.Jobs) != len(cfg.JobIDs) {
50+
found := make([]string, 0, len(jobs.Jobs))
51+
for _, j := range jobs.Jobs {
52+
found = append(found, j.Id)
53+
}
54+
55+
return fmt.Errorf("not all jobs found: requested %v, found %v", cfg.JobIDs, found)
56+
}
57+
58+
return nil
59+
}
60+
61+
func (DeleteJobsChangeset) Apply(e cldf.Environment, input DeleteJobsInput) (cldf.ChangesetOutput, error) {
62+
deps := jdops.JDOpDeps{Offchain: e.Offchain, EnvName: e.Name}
63+
_, err := fwops.ExecuteSequence(e.OperationsBundle, jdops.SeqJDDeleteJobs, deps, input)
64+
65+
return cldf.ChangesetOutput{}, err
66+
}

jd/changesets/jobs/delete_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package jobs
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
8+
"github.com/smartcontractkit/chainlink-deployments-framework/engine/test/runtime"
9+
)
10+
11+
func TestDeleteJobsChangeset(t *testing.T) {
12+
t.Parallel()
13+
14+
t.Run("deletes eligible jobs", func(t *testing.T) {
15+
t.Parallel()
16+
17+
rt, err := runtime.New(t.Context())
18+
require.NoError(t, err)
19+
20+
nodeID := registerTestNode(t, rt)
21+
jobID := proposeJob(t, rt, nodeID)
22+
23+
_, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{
24+
JobIDs: []string{jobID},
25+
})
26+
require.NoError(t, err)
27+
})
28+
29+
t.Run("precondition — empty job list rejected", func(t *testing.T) {
30+
t.Parallel()
31+
32+
rt, err := runtime.New(t.Context())
33+
require.NoError(t, err)
34+
35+
_, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{})
36+
require.ErrorContains(t, err, "no job_ids provided")
37+
})
38+
39+
t.Run("precondition — duplicate job id rejected", func(t *testing.T) {
40+
t.Parallel()
41+
42+
rt, err := runtime.New(t.Context())
43+
require.NoError(t, err)
44+
45+
_, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{
46+
JobIDs: []string{"job_1", "job_1"},
47+
})
48+
require.ErrorContains(t, err, "duplicate job id")
49+
})
50+
51+
t.Run("precondition — non-existent job rejected", func(t *testing.T) {
52+
t.Parallel()
53+
54+
rt, err := runtime.New(t.Context())
55+
require.NoError(t, err)
56+
57+
_, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{
58+
JobIDs: []string{"job_does_not_exist"},
59+
})
60+
require.Error(t, err)
61+
})
62+
63+
t.Run("precondition — already-deleted job rejected", func(t *testing.T) {
64+
t.Parallel()
65+
66+
rt, err := runtime.New(t.Context())
67+
require.NoError(t, err)
68+
69+
nodeID := registerTestNode(t, rt)
70+
jobID := proposeJob(t, rt, nodeID)
71+
72+
_, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{
73+
JobIDs: []string{jobID},
74+
})
75+
require.NoError(t, err)
76+
77+
_, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{
78+
JobIDs: []string{jobID},
79+
})
80+
require.ErrorContains(t, err, "already deleted")
81+
})
82+
}

jd/changesets/jobs/doc.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Package jobs provides changesets for managing Job Distributor jobs.
2+
//
3+
// # Usage
4+
//
5+
// import "github.com/smartcontractkit/cld-changesets/jd/changesets/jobs"
6+
//
7+
// // Propose jobs to nodes matched by domain and label:
8+
// _, err := runtime.ExecChangeset(rt, jobs.ProposeJobsChangeset{}, jobs.ProposeJobsInput{
9+
// Domain: "keystone",
10+
// Jobs: []jobs.JobSpec{{
11+
// NodeLabels: map[string]string{"target": "oracle-1"},
12+
// JobspecTOML: "...",
13+
// }},
14+
// })
15+
//
16+
// // Revoke jobs by ID (idempotent — already-absent IDs are not an error):
17+
// _, err = runtime.ExecChangeset(rt, jobs.RevokeJobsChangeset{}, jobs.RevokeJobsInput{
18+
// JobIDs: []string{"job-id-1"},
19+
// })
20+
//
21+
// // Delete jobs by ID (preconditions verify existence and eligibility):
22+
// _, err = runtime.ExecChangeset(rt, jobs.DeleteJobsChangeset{}, jobs.DeleteJobsInput{
23+
// JobIDs: []string{"job-id-1"},
24+
// })
25+
package jobs

jd/changesets/jobs/jobs_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package jobs
2+
3+
import (
4+
"testing"
5+
6+
jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/smartcontractkit/chainlink-deployments-framework/engine/test/runtime"
10+
11+
jdnodes "github.com/smartcontractkit/cld-changesets/jd/changesets/nodes"
12+
jdops "github.com/smartcontractkit/cld-changesets/jd/operations"
13+
)
14+
15+
const testJobTOML = `
16+
type = "offchainreporting2"
17+
name = "test-job"
18+
contractID = "0x0000000000000000000000000000000000000001"
19+
externalJobID = "00000000-0000-0000-0000-000000000001"
20+
`
21+
22+
func registerTestNode(t *testing.T, rt *runtime.Runtime) string {
23+
t.Helper()
24+
_, err := runtime.ExecChangeset(rt, jdnodes.RegisterNodesChangeset{}, jdnodes.RegisterNodesInput{
25+
Domain: "keystone",
26+
Nodes: []jdnodes.NodeToRegister{{
27+
Name: "oracle-1",
28+
CSAKey: "csa-key-1",
29+
Labels: map[string]string{"target": "oracle-1"},
30+
}},
31+
})
32+
require.NoError(t, err)
33+
node, err := jdops.ListNodeByPublicKey(t.Context(), rt.Environment().Offchain, "csa-key-1")
34+
require.NoError(t, err)
35+
require.NotNil(t, node, "node not found after registration")
36+
37+
return node.GetId()
38+
}
39+
40+
func proposeJob(t *testing.T, rt *runtime.Runtime, nodeID string) string {
41+
t.Helper()
42+
resp, err := rt.Environment().Offchain.ProposeJob(t.Context(), &jobv1.ProposeJobRequest{
43+
NodeId: nodeID,
44+
Spec: testJobTOML,
45+
})
46+
require.NoError(t, err)
47+
48+
return resp.GetProposal().GetJobId()
49+
}

jd/changesets/jobs/propose.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package jobs
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
7+
cldf "github.com/smartcontractkit/chainlink-deployments-framework/deployment"
8+
fwops "github.com/smartcontractkit/chainlink-deployments-framework/operations"
9+
10+
jdops "github.com/smartcontractkit/cld-changesets/jd/operations"
11+
)
12+
13+
var _ cldf.ChangeSetV2[ProposeJobsInput] = ProposeJobsChangeset{}
14+
15+
// JobSpec is the per-job input for ProposeJobsInput.
16+
type JobSpec = jdops.JobSpec
17+
18+
// ProposeJobsInput is the serializable input of ProposeJobsChangeset.
19+
type ProposeJobsInput = jdops.ProposeJobsInput
20+
21+
// ProposeJobsChangeset proposes a batch of job specs to nodes matched by label.
22+
type ProposeJobsChangeset struct{}
23+
24+
func (ProposeJobsChangeset) VerifyPreconditions(_ cldf.Environment, cfg ProposeJobsInput) error {
25+
if cfg.Domain == "" {
26+
return errors.New("domain is required")
27+
}
28+
if len(cfg.Jobs) == 0 {
29+
return errors.New("no jobs provided")
30+
}
31+
for i, j := range cfg.Jobs {
32+
if j.JobspecTOML == "" {
33+
return fmt.Errorf("job[%d]: jobspec_toml is required", i)
34+
}
35+
if len(j.NodeLabels) == 0 {
36+
return fmt.Errorf("job[%d]: node_labels is required", i)
37+
}
38+
for k, v := range j.NodeLabels {
39+
if k == "" || v == "" {
40+
return fmt.Errorf("job[%d]: node_labels key and value must be non-empty (key=%q value=%q)", i, k, v)
41+
}
42+
}
43+
}
44+
45+
return nil
46+
}
47+
48+
func (ProposeJobsChangeset) Apply(e cldf.Environment, input ProposeJobsInput) (cldf.ChangesetOutput, error) {
49+
deps := jdops.JDOpDeps{Offchain: e.Offchain, EnvName: e.Name}
50+
_, err := fwops.ExecuteSequence(e.OperationsBundle, jdops.SeqJDProposeJobs, deps, input)
51+
52+
return cldf.ChangesetOutput{}, err
53+
}

jd/changesets/jobs/propose_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package jobs
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
8+
"github.com/smartcontractkit/chainlink-deployments-framework/engine/test/runtime"
9+
)
10+
11+
func TestProposeJobsChangeset(t *testing.T) {
12+
t.Parallel()
13+
14+
t.Run("proposes job from inline TOML", func(t *testing.T) {
15+
t.Parallel()
16+
17+
rt, err := runtime.New(t.Context())
18+
require.NoError(t, err)
19+
20+
registerTestNode(t, rt)
21+
22+
_, err = runtime.ExecChangeset(rt, ProposeJobsChangeset{}, ProposeJobsInput{
23+
Domain: "keystone",
24+
Jobs: []JobSpec{{
25+
NodeLabels: map[string]string{"target": "oracle-1"},
26+
JobspecTOML: testJobTOML,
27+
}},
28+
})
29+
require.NoError(t, err)
30+
})
31+
32+
t.Run("precondition — missing domain rejected", func(t *testing.T) {
33+
t.Parallel()
34+
35+
rt, err := runtime.New(t.Context())
36+
require.NoError(t, err)
37+
38+
_, err = runtime.ExecChangeset(rt, ProposeJobsChangeset{}, ProposeJobsInput{
39+
Jobs: []JobSpec{{NodeLabels: map[string]string{"k": "v"}, JobspecTOML: testJobTOML}},
40+
})
41+
require.ErrorContains(t, err, "domain is required")
42+
})
43+
44+
t.Run("precondition — missing node_labels rejected", func(t *testing.T) {
45+
t.Parallel()
46+
47+
rt, err := runtime.New(t.Context())
48+
require.NoError(t, err)
49+
50+
_, err = runtime.ExecChangeset(rt, ProposeJobsChangeset{}, ProposeJobsInput{
51+
Domain: "keystone",
52+
Jobs: []JobSpec{{JobspecTOML: testJobTOML}},
53+
})
54+
require.ErrorContains(t, err, "node_labels is required")
55+
})
56+
57+
t.Run("precondition — missing jobspec rejected", func(t *testing.T) {
58+
t.Parallel()
59+
60+
rt, err := runtime.New(t.Context())
61+
require.NoError(t, err)
62+
63+
_, err = runtime.ExecChangeset(rt, ProposeJobsChangeset{}, ProposeJobsInput{
64+
Domain: "keystone",
65+
Jobs: []JobSpec{{NodeLabels: map[string]string{"k": "v"}}},
66+
})
67+
require.ErrorContains(t, err, "jobspec_toml is required")
68+
})
69+
}

0 commit comments

Comments
 (0)