Skip to content

Commit 5dcc1e3

Browse files
committed
feat: jd changesets
1 parent f10f84c commit 5dcc1e3

17 files changed

Lines changed: 1659 additions & 151 deletions

File tree

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ require (
3131
golang.org/x/crypto v0.53.0
3232
golang.org/x/mod v0.37.0
3333
golang.org/x/sync v0.21.0
34+
google.golang.org/grpc v1.81.1
3435
gopkg.in/yaml.v3 v3.0.1
3536
)
3637

@@ -320,7 +321,6 @@ require (
320321
golang.org/x/tools v0.45.0 // indirect
321322
google.golang.org/genproto/googleapis/api v0.0.0-20260526163538-3dc84a4a5aaa // indirect
322323
google.golang.org/genproto/googleapis/rpc v0.0.0-20260526163538-3dc84a4a5aaa // indirect
323-
google.golang.org/grpc v1.81.1 // indirect
324324
google.golang.org/protobuf v1.36.11 // indirect
325325
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
326326
gopkg.in/inf.v0 v0.9.1 // indirect

jd/changesets/jobs/delete.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package jobs
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
7+
cldf "github.com/smartcontractkit/chainlink-deployments-framework/deployment"
8+
fwops "github.com/smartcontractkit/chainlink-deployments-framework/operations"
9+
jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"
10+
11+
jdops "github.com/smartcontractkit/cld-changesets/jd/operations"
12+
)
13+
14+
var _ cldf.ChangeSetV2[DeleteJobsInput] = DeleteJobsChangeset{}
15+
16+
// DeleteJobsInput is the serializable input of DeleteJobsChangeset.
17+
type DeleteJobsInput = jdops.DeleteJobsInput
18+
19+
// DeleteJobsChangeset deletes jobs.
20+
// Only jobs with proposals in PROPOSED, APPROVED, or PENDING state are eligible.
21+
type DeleteJobsChangeset struct{}
22+
23+
func (DeleteJobsChangeset) VerifyPreconditions(env cldf.Environment, cfg DeleteJobsInput) error {
24+
if len(cfg.JobIDs) == 0 {
25+
return errors.New("no job_ids provided")
26+
}
27+
seen := make(map[string]struct{}, len(cfg.JobIDs))
28+
for _, id := range cfg.JobIDs {
29+
if id == "" {
30+
return errors.New("job id cannot be empty")
31+
}
32+
if _, ok := seen[id]; ok {
33+
return fmt.Errorf("duplicate job id %q", id)
34+
}
35+
seen[id] = struct{}{}
36+
}
37+
38+
jobs, err := env.Offchain.ListJobs(env.GetContext(), &jobv1.ListJobsRequest{
39+
Filter: &jobv1.ListJobsRequest_Filter{Ids: cfg.JobIDs, IncludeDeleted: true},
40+
})
41+
if err != nil {
42+
return fmt.Errorf("failed to list jobs: %w", err)
43+
}
44+
for _, j := range jobs.Jobs {
45+
if j.DeletedAt != nil {
46+
return fmt.Errorf("job %q is already deleted", j.Id)
47+
}
48+
}
49+
if len(jobs.Jobs) != len(cfg.JobIDs) {
50+
found := make([]string, 0, len(jobs.Jobs))
51+
for _, j := range jobs.Jobs {
52+
found = append(found, j.Id)
53+
}
54+
55+
return fmt.Errorf("not all jobs found: requested %v, found %v", cfg.JobIDs, found)
56+
}
57+
58+
return nil
59+
}
60+
61+
func (DeleteJobsChangeset) Apply(e cldf.Environment, input DeleteJobsInput) (cldf.ChangesetOutput, error) {
62+
deps := jdops.JDOpDeps{Offchain: e.Offchain, EnvName: e.Name}
63+
_, err := fwops.ExecuteSequence(e.OperationsBundle, jdops.SeqJDDeleteJobs, deps, input)
64+
65+
return cldf.ChangesetOutput{}, err
66+
}

jd/changesets/jobs/doc.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Package jobs provides changesets for managing Job Distributor jobs.
2+
//
3+
// # Usage
4+
//
5+
// import "github.com/smartcontractkit/cld-changesets/jd/changesets/jobs"
6+
//
7+
// // Propose jobs to nodes matched by domain and label:
8+
// _, err := runtime.ExecChangeset(rt, jobs.ProposeJobsChangeset{}, jobs.ProposeJobsInput{
9+
// Domain: "keystone",
10+
// Jobs: []jobs.JobSpec{{
11+
// NodeLabels: map[string]string{"target": "oracle-1"},
12+
// JobspecTOML: "...",
13+
// }},
14+
// })
15+
//
16+
// // Revoke jobs by ID (idempotent — already-absent IDs are not an error):
17+
// _, err = runtime.ExecChangeset(rt, jobs.RevokeJobsChangeset{}, jobs.RevokeJobsInput{
18+
// JobIDs: []string{"job-id-1"},
19+
// })
20+
//
21+
// // Delete jobs by ID (preconditions verify existence and eligibility):
22+
// _, err = runtime.ExecChangeset(rt, jobs.DeleteJobsChangeset{}, jobs.DeleteJobsInput{
23+
// JobIDs: []string{"job-id-1"},
24+
// })
25+
package jobs

jd/changesets/jobs/jobs_test.go

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
package jobs
2+
3+
import (
4+
"testing"
5+
6+
jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/smartcontractkit/chainlink-deployments-framework/engine/test/runtime"
10+
11+
jdnodes "github.com/smartcontractkit/cld-changesets/jd/changesets/nodes"
12+
jdops "github.com/smartcontractkit/cld-changesets/jd/operations"
13+
)
14+
15+
const testJobTOML = `
16+
type = "offchainreporting2"
17+
name = "test-job"
18+
contractID = "0x0000000000000000000000000000000000000001"
19+
externalJobID = "00000000-0000-0000-0000-000000000001"
20+
`
21+
22+
func registerTestNode(t *testing.T, rt *runtime.Runtime) string {
23+
t.Helper()
24+
_, err := runtime.ExecChangeset(rt, jdnodes.RegisterNodesChangeset{}, jdnodes.RegisterNodesInput{
25+
Domain: "keystone",
26+
Nodes: []jdnodes.NodeToRegister{{
27+
Name: "oracle-1",
28+
CSAKey: "csa-key-1",
29+
Labels: map[string]string{"target": "oracle-1"},
30+
}},
31+
})
32+
require.NoError(t, err)
33+
node, err := jdops.ListNodeByPublicKey(t.Context(), rt.Environment().Offchain, "csa-key-1")
34+
require.NoError(t, err)
35+
require.NotNil(t, node, "node not found after registration")
36+
37+
return node.GetId()
38+
}
39+
40+
func TestProposeJobsChangeset(t *testing.T) {
41+
t.Parallel()
42+
43+
t.Run("proposes job from inline TOML", func(t *testing.T) {
44+
t.Parallel()
45+
46+
rt, err := runtime.New(t.Context())
47+
require.NoError(t, err)
48+
49+
registerTestNode(t, rt)
50+
51+
_, err = runtime.ExecChangeset(rt, ProposeJobsChangeset{}, ProposeJobsInput{
52+
Domain: "keystone",
53+
Jobs: []JobSpec{{
54+
NodeLabels: map[string]string{"target": "oracle-1"},
55+
JobspecTOML: testJobTOML,
56+
}},
57+
})
58+
require.NoError(t, err)
59+
})
60+
61+
t.Run("precondition — missing domain rejected", func(t *testing.T) {
62+
t.Parallel()
63+
64+
rt, err := runtime.New(t.Context())
65+
require.NoError(t, err)
66+
67+
_, err = runtime.ExecChangeset(rt, ProposeJobsChangeset{}, ProposeJobsInput{
68+
Jobs: []JobSpec{{NodeLabels: map[string]string{"k": "v"}, JobspecTOML: testJobTOML}},
69+
})
70+
require.ErrorContains(t, err, "domain is required")
71+
})
72+
73+
t.Run("precondition — missing node_labels rejected", func(t *testing.T) {
74+
t.Parallel()
75+
76+
rt, err := runtime.New(t.Context())
77+
require.NoError(t, err)
78+
79+
_, err = runtime.ExecChangeset(rt, ProposeJobsChangeset{}, ProposeJobsInput{
80+
Domain: "keystone",
81+
Jobs: []JobSpec{{JobspecTOML: testJobTOML}},
82+
})
83+
require.ErrorContains(t, err, "node_labels is required")
84+
})
85+
86+
t.Run("precondition — missing jobspec rejected", func(t *testing.T) {
87+
t.Parallel()
88+
89+
rt, err := runtime.New(t.Context())
90+
require.NoError(t, err)
91+
92+
_, err = runtime.ExecChangeset(rt, ProposeJobsChangeset{}, ProposeJobsInput{
93+
Domain: "keystone",
94+
Jobs: []JobSpec{{NodeLabels: map[string]string{"k": "v"}}},
95+
})
96+
require.ErrorContains(t, err, "jobspec_toml is required")
97+
})
98+
}
99+
100+
func proposeJob(t *testing.T, rt *runtime.Runtime, nodeID string) string {
101+
t.Helper()
102+
resp, err := rt.Environment().Offchain.ProposeJob(t.Context(), &jobv1.ProposeJobRequest{
103+
NodeId: nodeID,
104+
Spec: testJobTOML,
105+
})
106+
require.NoError(t, err)
107+
108+
return resp.GetProposal().GetJobId()
109+
}
110+
111+
func TestRevokeJobsChangeset(t *testing.T) {
112+
t.Parallel()
113+
114+
t.Run("revokes a proposed job", func(t *testing.T) {
115+
t.Parallel()
116+
117+
rt, err := runtime.New(t.Context())
118+
require.NoError(t, err)
119+
120+
nodeID := registerTestNode(t, rt)
121+
jobID := proposeJob(t, rt, nodeID)
122+
123+
_, err = runtime.ExecChangeset(rt, RevokeJobsChangeset{}, RevokeJobsInput{
124+
JobIDs: []string{jobID},
125+
})
126+
require.NoError(t, err)
127+
})
128+
129+
t.Run("non-existent job treated as already absent — no error", func(t *testing.T) {
130+
t.Parallel()
131+
132+
rt, err := runtime.New(t.Context())
133+
require.NoError(t, err)
134+
135+
_, err = runtime.ExecChangeset(rt, RevokeJobsChangeset{}, RevokeJobsInput{
136+
JobIDs: []string{"job_nonexistent"},
137+
})
138+
require.NoError(t, err)
139+
})
140+
141+
t.Run("precondition — empty job list rejected", func(t *testing.T) {
142+
t.Parallel()
143+
144+
rt, err := runtime.New(t.Context())
145+
require.NoError(t, err)
146+
147+
_, err = runtime.ExecChangeset(rt, RevokeJobsChangeset{}, RevokeJobsInput{})
148+
require.ErrorContains(t, err, "no job_ids provided")
149+
})
150+
151+
t.Run("precondition — duplicate job id rejected", func(t *testing.T) {
152+
t.Parallel()
153+
154+
rt, err := runtime.New(t.Context())
155+
require.NoError(t, err)
156+
157+
_, err = runtime.ExecChangeset(rt, RevokeJobsChangeset{}, RevokeJobsInput{
158+
JobIDs: []string{"job_1", "job_1"},
159+
})
160+
require.ErrorContains(t, err, "duplicate job id")
161+
})
162+
}
163+
164+
func TestDeleteJobsChangeset(t *testing.T) {
165+
t.Parallel()
166+
167+
t.Run("deletes eligible jobs", func(t *testing.T) {
168+
t.Parallel()
169+
170+
rt, err := runtime.New(t.Context())
171+
require.NoError(t, err)
172+
173+
nodeID := registerTestNode(t, rt)
174+
jobID := proposeJob(t, rt, nodeID)
175+
176+
_, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{
177+
JobIDs: []string{jobID},
178+
})
179+
require.NoError(t, err)
180+
})
181+
182+
t.Run("precondition — empty job list rejected", func(t *testing.T) {
183+
t.Parallel()
184+
185+
rt, err := runtime.New(t.Context())
186+
require.NoError(t, err)
187+
188+
_, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{})
189+
require.ErrorContains(t, err, "no job_ids provided")
190+
})
191+
192+
t.Run("precondition — duplicate job id rejected", func(t *testing.T) {
193+
t.Parallel()
194+
195+
rt, err := runtime.New(t.Context())
196+
require.NoError(t, err)
197+
198+
_, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{
199+
JobIDs: []string{"job_1", "job_1"},
200+
})
201+
require.ErrorContains(t, err, "duplicate job id")
202+
})
203+
204+
t.Run("precondition — non-existent job rejected", func(t *testing.T) {
205+
t.Parallel()
206+
207+
rt, err := runtime.New(t.Context())
208+
require.NoError(t, err)
209+
210+
_, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{
211+
JobIDs: []string{"job_does_not_exist"},
212+
})
213+
require.Error(t, err)
214+
})
215+
216+
t.Run("precondition — already-deleted job rejected", func(t *testing.T) {
217+
t.Parallel()
218+
219+
rt, err := runtime.New(t.Context())
220+
require.NoError(t, err)
221+
222+
nodeID := registerTestNode(t, rt)
223+
jobID := proposeJob(t, rt, nodeID)
224+
225+
_, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{
226+
JobIDs: []string{jobID},
227+
})
228+
require.NoError(t, err)
229+
230+
_, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{
231+
JobIDs: []string{jobID},
232+
})
233+
require.ErrorContains(t, err, "already deleted")
234+
})
235+
}

jd/changesets/jobs/propose.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package jobs
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
7+
cldf "github.com/smartcontractkit/chainlink-deployments-framework/deployment"
8+
fwops "github.com/smartcontractkit/chainlink-deployments-framework/operations"
9+
10+
jdops "github.com/smartcontractkit/cld-changesets/jd/operations"
11+
)
12+
13+
var _ cldf.ChangeSetV2[ProposeJobsInput] = ProposeJobsChangeset{}
14+
15+
// JobSpec is the per-job input for ProposeJobsInput.
16+
type JobSpec = jdops.JobSpec
17+
18+
// ProposeJobsInput is the serializable input of ProposeJobsChangeset.
19+
type ProposeJobsInput = jdops.ProposeJobsInput
20+
21+
// ProposeJobsChangeset proposes a batch of job specs to nodes matched by label.
22+
type ProposeJobsChangeset struct{}
23+
24+
func (ProposeJobsChangeset) VerifyPreconditions(_ cldf.Environment, cfg ProposeJobsInput) error {
25+
if cfg.Domain == "" {
26+
return errors.New("domain is required")
27+
}
28+
if len(cfg.Jobs) == 0 {
29+
return errors.New("no jobs provided")
30+
}
31+
for i, j := range cfg.Jobs {
32+
if j.JobspecTOML == "" {
33+
return fmt.Errorf("job[%d]: jobspec_toml is required", i)
34+
}
35+
if len(j.NodeLabels) == 0 {
36+
return fmt.Errorf("job[%d]: node_labels is required", i)
37+
}
38+
for k, v := range j.NodeLabels {
39+
if k == "" || v == "" {
40+
return fmt.Errorf("job[%d]: node_labels key and value must be non-empty (key=%q value=%q)", i, k, v)
41+
}
42+
}
43+
}
44+
45+
return nil
46+
}
47+
48+
func (ProposeJobsChangeset) Apply(e cldf.Environment, input ProposeJobsInput) (cldf.ChangesetOutput, error) {
49+
deps := jdops.JDOpDeps{Offchain: e.Offchain, EnvName: e.Name}
50+
_, err := fwops.ExecuteSequence(e.OperationsBundle, jdops.SeqJDProposeJobs, deps, input)
51+
52+
return cldf.ChangesetOutput{}, err
53+
}

0 commit comments

Comments
 (0)