diff --git a/go.mod b/go.mod index dd2ccbb..18ef3a1 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( golang.org/x/crypto v0.53.0 golang.org/x/mod v0.37.0 golang.org/x/sync v0.21.0 + google.golang.org/grpc v1.81.1 gopkg.in/yaml.v3 v3.0.1 ) @@ -320,7 +321,6 @@ require ( golang.org/x/tools v0.45.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260526163538-3dc84a4a5aaa // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260526163538-3dc84a4a5aaa // indirect - google.golang.org/grpc v1.81.1 // indirect google.golang.org/protobuf v1.36.11 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect diff --git a/jd/changesets/jobs/delete.go b/jd/changesets/jobs/delete.go new file mode 100644 index 0000000..57a3057 --- /dev/null +++ b/jd/changesets/jobs/delete.go @@ -0,0 +1,66 @@ +package jobs + +import ( + "errors" + "fmt" + + cldf "github.com/smartcontractkit/chainlink-deployments-framework/deployment" + fwops "github.com/smartcontractkit/chainlink-deployments-framework/operations" + jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job" + + jdops "github.com/smartcontractkit/cld-changesets/jd/operations" +) + +var _ cldf.ChangeSetV2[DeleteJobsInput] = DeleteJobsChangeset{} + +// DeleteJobsInput is the serializable input of DeleteJobsChangeset. +type DeleteJobsInput = jdops.DeleteJobsInput + +// DeleteJobsChangeset deletes jobs. +// Only jobs with proposals in PROPOSED, APPROVED, or PENDING state are eligible. +type DeleteJobsChangeset struct{} + +func (DeleteJobsChangeset) VerifyPreconditions(env cldf.Environment, cfg DeleteJobsInput) error { + if len(cfg.JobIDs) == 0 { + return errors.New("no job_ids provided") + } + seen := make(map[string]struct{}, len(cfg.JobIDs)) + for _, id := range cfg.JobIDs { + if id == "" { + return errors.New("job id cannot be empty") + } + if _, ok := seen[id]; ok { + return fmt.Errorf("duplicate job id %q", id) + } + seen[id] = struct{}{} + } + + jobs, err := env.Offchain.ListJobs(env.GetContext(), &jobv1.ListJobsRequest{ + Filter: &jobv1.ListJobsRequest_Filter{Ids: cfg.JobIDs, IncludeDeleted: true}, + }) + if err != nil { + return fmt.Errorf("failed to list jobs: %w", err) + } + for _, j := range jobs.Jobs { + if j.DeletedAt != nil { + return fmt.Errorf("job %q is already deleted", j.Id) + } + } + if len(jobs.Jobs) != len(cfg.JobIDs) { + found := make([]string, 0, len(jobs.Jobs)) + for _, j := range jobs.Jobs { + found = append(found, j.Id) + } + + return fmt.Errorf("not all jobs found: requested %v, found %v", cfg.JobIDs, found) + } + + return nil +} + +func (DeleteJobsChangeset) Apply(e cldf.Environment, input DeleteJobsInput) (cldf.ChangesetOutput, error) { + deps := jdops.JDOpDeps{Offchain: e.Offchain, EnvName: e.Name} + _, err := fwops.ExecuteSequence(e.OperationsBundle, jdops.SeqJDDeleteJobs, deps, input) + + return cldf.ChangesetOutput{}, err +} diff --git a/jd/changesets/jobs/delete_test.go b/jd/changesets/jobs/delete_test.go new file mode 100644 index 0000000..38b1571 --- /dev/null +++ b/jd/changesets/jobs/delete_test.go @@ -0,0 +1,82 @@ +package jobs + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-deployments-framework/engine/test/runtime" +) + +func TestDeleteJobsChangeset(t *testing.T) { + t.Parallel() + + t.Run("deletes eligible jobs", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + nodeID := registerTestNode(t, rt) + jobID := proposeJob(t, rt, nodeID) + + _, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{ + JobIDs: []string{jobID}, + }) + require.NoError(t, err) + }) + + t.Run("precondition — empty job list rejected", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{}) + require.ErrorContains(t, err, "no job_ids provided") + }) + + t.Run("precondition — duplicate job id rejected", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{ + JobIDs: []string{"job_1", "job_1"}, + }) + require.ErrorContains(t, err, "duplicate job id") + }) + + t.Run("precondition — non-existent job rejected", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{ + JobIDs: []string{"job_does_not_exist"}, + }) + require.Error(t, err) + }) + + t.Run("precondition — already-deleted job rejected", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + nodeID := registerTestNode(t, rt) + jobID := proposeJob(t, rt, nodeID) + + _, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{ + JobIDs: []string{jobID}, + }) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, DeleteJobsChangeset{}, DeleteJobsInput{ + JobIDs: []string{jobID}, + }) + require.ErrorContains(t, err, "already deleted") + }) +} diff --git a/jd/changesets/jobs/doc.go b/jd/changesets/jobs/doc.go new file mode 100644 index 0000000..f4d5553 --- /dev/null +++ b/jd/changesets/jobs/doc.go @@ -0,0 +1,25 @@ +// Package jobs provides changesets for managing Job Distributor jobs. +// +// # Usage +// +// import "github.com/smartcontractkit/cld-changesets/jd/changesets/jobs" +// +// // Propose jobs to nodes matched by domain and label: +// _, err := runtime.ExecChangeset(rt, jobs.ProposeJobsChangeset{}, jobs.ProposeJobsInput{ +// Domain: "keystone", +// Jobs: []jobs.JobSpec{{ +// NodeLabels: map[string]string{"target": "oracle-1"}, +// JobspecTOML: "...", +// }}, +// }) +// +// // Revoke jobs by ID (idempotent — already-absent IDs are not an error): +// _, err = runtime.ExecChangeset(rt, jobs.RevokeJobsChangeset{}, jobs.RevokeJobsInput{ +// JobIDs: []string{"job-id-1"}, +// }) +// +// // Delete jobs by ID (preconditions verify existence and eligibility): +// _, err = runtime.ExecChangeset(rt, jobs.DeleteJobsChangeset{}, jobs.DeleteJobsInput{ +// JobIDs: []string{"job-id-1"}, +// }) +package jobs diff --git a/jd/changesets/jobs/jobs_test.go b/jd/changesets/jobs/jobs_test.go new file mode 100644 index 0000000..c4eb795 --- /dev/null +++ b/jd/changesets/jobs/jobs_test.go @@ -0,0 +1,49 @@ +package jobs + +import ( + "testing" + + jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-deployments-framework/engine/test/runtime" + + jdnodes "github.com/smartcontractkit/cld-changesets/jd/changesets/nodes" + jdops "github.com/smartcontractkit/cld-changesets/jd/operations" +) + +const testJobTOML = ` +type = "offchainreporting2" +name = "test-job" +contractID = "0x0000000000000000000000000000000000000001" +externalJobID = "00000000-0000-0000-0000-000000000001" +` + +func registerTestNode(t *testing.T, rt *runtime.Runtime) string { + t.Helper() + _, err := runtime.ExecChangeset(rt, jdnodes.RegisterNodesChangeset{}, jdnodes.RegisterNodesInput{ + Domain: "keystone", + Nodes: []jdnodes.NodeToRegister{{ + Name: "oracle-1", + CSAKey: "csa-key-1", + Labels: map[string]string{"target": "oracle-1"}, + }}, + }) + require.NoError(t, err) + node, err := jdops.ListNodeByPublicKey(t.Context(), rt.Environment().Offchain, "csa-key-1") + require.NoError(t, err) + require.NotNil(t, node, "node not found after registration") + + return node.GetId() +} + +func proposeJob(t *testing.T, rt *runtime.Runtime, nodeID string) string { + t.Helper() + resp, err := rt.Environment().Offchain.ProposeJob(t.Context(), &jobv1.ProposeJobRequest{ + NodeId: nodeID, + Spec: testJobTOML, + }) + require.NoError(t, err) + + return resp.GetProposal().GetJobId() +} diff --git a/jd/changesets/jobs/propose.go b/jd/changesets/jobs/propose.go new file mode 100644 index 0000000..d97e5c1 --- /dev/null +++ b/jd/changesets/jobs/propose.go @@ -0,0 +1,53 @@ +package jobs + +import ( + "errors" + "fmt" + + cldf "github.com/smartcontractkit/chainlink-deployments-framework/deployment" + fwops "github.com/smartcontractkit/chainlink-deployments-framework/operations" + + jdops "github.com/smartcontractkit/cld-changesets/jd/operations" +) + +var _ cldf.ChangeSetV2[ProposeJobsInput] = ProposeJobsChangeset{} + +// JobSpec is the per-job input for ProposeJobsInput. +type JobSpec = jdops.JobSpec + +// ProposeJobsInput is the serializable input of ProposeJobsChangeset. +type ProposeJobsInput = jdops.ProposeJobsInput + +// ProposeJobsChangeset proposes a batch of job specs to nodes matched by label. +type ProposeJobsChangeset struct{} + +func (ProposeJobsChangeset) VerifyPreconditions(_ cldf.Environment, cfg ProposeJobsInput) error { + if cfg.Domain == "" { + return errors.New("domain is required") + } + if len(cfg.Jobs) == 0 { + return errors.New("no jobs provided") + } + for i, j := range cfg.Jobs { + if j.JobspecTOML == "" { + return fmt.Errorf("job[%d]: jobspec_toml is required", i) + } + if len(j.NodeLabels) == 0 { + return fmt.Errorf("job[%d]: node_labels is required", i) + } + for k, v := range j.NodeLabels { + if k == "" || v == "" { + return fmt.Errorf("job[%d]: node_labels key and value must be non-empty (key=%q value=%q)", i, k, v) + } + } + } + + return nil +} + +func (ProposeJobsChangeset) Apply(e cldf.Environment, input ProposeJobsInput) (cldf.ChangesetOutput, error) { + deps := jdops.JDOpDeps{Offchain: e.Offchain, EnvName: e.Name} + _, err := fwops.ExecuteSequence(e.OperationsBundle, jdops.SeqJDProposeJobs, deps, input) + + return cldf.ChangesetOutput{}, err +} diff --git a/jd/changesets/jobs/propose_test.go b/jd/changesets/jobs/propose_test.go new file mode 100644 index 0000000..34a87bb --- /dev/null +++ b/jd/changesets/jobs/propose_test.go @@ -0,0 +1,69 @@ +package jobs + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-deployments-framework/engine/test/runtime" +) + +func TestProposeJobsChangeset(t *testing.T) { + t.Parallel() + + t.Run("proposes job from inline TOML", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + registerTestNode(t, rt) + + _, err = runtime.ExecChangeset(rt, ProposeJobsChangeset{}, ProposeJobsInput{ + Domain: "keystone", + Jobs: []JobSpec{{ + NodeLabels: map[string]string{"target": "oracle-1"}, + JobspecTOML: testJobTOML, + }}, + }) + require.NoError(t, err) + }) + + t.Run("precondition — missing domain rejected", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, ProposeJobsChangeset{}, ProposeJobsInput{ + Jobs: []JobSpec{{NodeLabels: map[string]string{"k": "v"}, JobspecTOML: testJobTOML}}, + }) + require.ErrorContains(t, err, "domain is required") + }) + + t.Run("precondition — missing node_labels rejected", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, ProposeJobsChangeset{}, ProposeJobsInput{ + Domain: "keystone", + Jobs: []JobSpec{{JobspecTOML: testJobTOML}}, + }) + require.ErrorContains(t, err, "node_labels is required") + }) + + t.Run("precondition — missing jobspec rejected", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, ProposeJobsChangeset{}, ProposeJobsInput{ + Domain: "keystone", + Jobs: []JobSpec{{NodeLabels: map[string]string{"k": "v"}}}, + }) + require.ErrorContains(t, err, "jobspec_toml is required") + }) +} diff --git a/jd/changesets/jobs/revoke.go b/jd/changesets/jobs/revoke.go new file mode 100644 index 0000000..6bb4dab --- /dev/null +++ b/jd/changesets/jobs/revoke.go @@ -0,0 +1,44 @@ +package jobs + +import ( + "errors" + "fmt" + + cldf "github.com/smartcontractkit/chainlink-deployments-framework/deployment" + fwops "github.com/smartcontractkit/chainlink-deployments-framework/operations" + + jdops "github.com/smartcontractkit/cld-changesets/jd/operations" +) + +var _ cldf.ChangeSetV2[RevokeJobsInput] = RevokeJobsChangeset{} + +// RevokeJobsInput is the serializable input of RevokeJobsChangeset. +type RevokeJobsInput = jdops.RevokeJobsInput + +// RevokeJobsChangeset revokes jobs. +type RevokeJobsChangeset struct{} + +func (RevokeJobsChangeset) VerifyPreconditions(_ cldf.Environment, cfg RevokeJobsInput) error { + if len(cfg.JobIDs) == 0 { + return errors.New("no job_ids provided") + } + seen := make(map[string]struct{}, len(cfg.JobIDs)) + for _, id := range cfg.JobIDs { + if id == "" { + return errors.New("job id cannot be empty") + } + if _, ok := seen[id]; ok { + return fmt.Errorf("duplicate job id %q", id) + } + seen[id] = struct{}{} + } + + return nil +} + +func (RevokeJobsChangeset) Apply(e cldf.Environment, input RevokeJobsInput) (cldf.ChangesetOutput, error) { + deps := jdops.JDOpDeps{Offchain: e.Offchain, EnvName: e.Name} + _, err := fwops.ExecuteSequence(e.OperationsBundle, jdops.SeqJDRevokeJobs, deps, input) + + return cldf.ChangesetOutput{}, err +} diff --git a/jd/changesets/jobs/revoke_test.go b/jd/changesets/jobs/revoke_test.go new file mode 100644 index 0000000..6580d70 --- /dev/null +++ b/jd/changesets/jobs/revoke_test.go @@ -0,0 +1,62 @@ +package jobs + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-deployments-framework/engine/test/runtime" +) + +func TestRevokeJobsChangeset(t *testing.T) { + t.Parallel() + + t.Run("revokes a proposed job", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + nodeID := registerTestNode(t, rt) + jobID := proposeJob(t, rt, nodeID) + + _, err = runtime.ExecChangeset(rt, RevokeJobsChangeset{}, RevokeJobsInput{ + JobIDs: []string{jobID}, + }) + require.NoError(t, err) + }) + + t.Run("non-existent job treated as already absent — no error", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, RevokeJobsChangeset{}, RevokeJobsInput{ + JobIDs: []string{"job_nonexistent"}, + }) + require.NoError(t, err) + }) + + t.Run("precondition — empty job list rejected", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, RevokeJobsChangeset{}, RevokeJobsInput{}) + require.ErrorContains(t, err, "no job_ids provided") + }) + + t.Run("precondition — duplicate job id rejected", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, RevokeJobsChangeset{}, RevokeJobsInput{ + JobIDs: []string{"job_1", "job_1"}, + }) + require.ErrorContains(t, err, "duplicate job id") + }) +} diff --git a/jd/changesets/jobspec.go b/jd/changesets/jobspec.go deleted file mode 100644 index 720a92d..0000000 --- a/jd/changesets/jobspec.go +++ /dev/null @@ -1,150 +0,0 @@ -package changesets - -import ( - "errors" - "fmt" - - jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job" - - cldf "github.com/smartcontractkit/chainlink-deployments-framework/deployment" -) - -var ( - // RevokeJobsChangeset revokes job proposals with the given jobIDs through JD. It can only be used when - // each proposal is in the proposed or cancelled state in JD. - RevokeJobsChangeset = cldf.CreateChangeSet(revokeJobsLogic, revokeJobsPrecondition) - - // DeleteJobsChangeset sends a delete request to the node where each job is running and marks them as deleted in Job Distributor. - // If the node is not connected or the delete request fails, the deletion process is halted. - // Nodes are expected to cancel the job once the request is sent by JD. - DeleteJobsChangeset = cldf.CreateChangeSet(deleteJobsLogic, deleteJobsPrecondition) -) - -func revokeJobsPrecondition(env cldf.Environment, jobIDs []string) error { - proposals, err := env.Offchain.ListProposals(env.GetContext(), &jobv1.ListProposalsRequest{ - Filter: &jobv1.ListProposalsRequest_Filter{ - JobIds: jobIDs, - }, - }) - if err != nil { - return fmt.Errorf("failed to list proposals for jobIDs %v: %w", jobIDs, err) - } - found := make(map[string]struct{}, len(proposals.Proposals)) - for _, proposal := range proposals.Proposals { - if proposal.Status != jobv1.ProposalStatus_PROPOSAL_STATUS_PROPOSED && proposal.Status != jobv1.ProposalStatus_PROPOSAL_STATUS_CANCELLED { - return fmt.Errorf("proposal %s is not in PROPOSED or CANCELLED state", proposal.Id) - } - found[proposal.JobId] = struct{}{} - } - for _, jobID := range jobIDs { - if _, ok := found[jobID]; !ok { - return fmt.Errorf("no proposal found for jobID %s", jobID) - } - } - - return nil -} - -func revokeJobsLogic(env cldf.Environment, jobIDs []string) (cldf.ChangesetOutput, error) { - var successfullyRevoked []string - for _, jobID := range jobIDs { - res, err := env.Offchain.RevokeJob(env.GetContext(), &jobv1.RevokeJobRequest{ - IdOneof: &jobv1.RevokeJobRequest_Id{Id: jobID}, - }) - if err != nil { - return cldf.ChangesetOutput{}, fmt.Errorf("failed to revoke job %s: %w", jobID, err) - } - if res == nil { - return cldf.ChangesetOutput{}, fmt.Errorf("revoke job response is nil for job %s", jobID) - } - if res.Proposal == nil || res.Proposal.Status != jobv1.ProposalStatus_PROPOSAL_STATUS_REVOKED { - return cldf.ChangesetOutput{}, fmt.Errorf("revoke job %s response is not in revoked state, got %s", jobID, res.Proposal.GetStatus()) - } - successfullyRevoked = append(successfullyRevoked, jobID) - } - env.Logger.Infof("successfully revoked jobs %v", successfullyRevoked) - - return cldf.ChangesetOutput{}, nil -} - -func deleteJobsPrecondition(env cldf.Environment, jobIDs []string) error { - jobs, err := env.Offchain.ListJobs(env.GetContext(), &jobv1.ListJobsRequest{ - Filter: &jobv1.ListJobsRequest_Filter{ - Ids: jobIDs, - }, - }) - if err != nil { - return fmt.Errorf("failed to list jobs for jobIDs %v: %w", jobIDs, err) - } - for _, job := range jobs.Jobs { - if job.DeletedAt != nil { - return fmt.Errorf("job %s is already deleted", job.Id) - } - } - if len(jobs.Jobs) != len(jobIDs) { - found := make([]string, 0, len(jobs.Jobs)) - for _, job := range jobs.Jobs { - found = append(found, job.Id) - } - - return fmt.Errorf("not all jobs found in listJobs response, returned job ids %v, expected %v", found, jobIDs) - } - - return nil -} - -func deleteJobsLogic(env cldf.Environment, jobIDs []string) (cldf.ChangesetOutput, error) { - jobIDsToDelete, err := jobsToDelete(env, jobIDs) - if err != nil { - return cldf.ChangesetOutput{}, fmt.Errorf("failed to get jobIDs to delete: %w", err) - } - if len(jobIDsToDelete) == 0 { - return cldf.ChangesetOutput{}, errors.New("no jobs to delete: no proposals in PROPOSED, APPROVED, or PENDING state for the given job ids") - } - for _, jobID := range jobIDsToDelete { - res, err := env.Offchain.DeleteJob(env.GetContext(), &jobv1.DeleteJobRequest{ - IdOneof: &jobv1.DeleteJobRequest_Id{Id: jobID}, - }) - if err != nil { - return cldf.ChangesetOutput{}, fmt.Errorf("failed to delete job %s: %w", jobID, err) - } - if res == nil { - return cldf.ChangesetOutput{}, fmt.Errorf("delete job response is nil for job %s", jobID) - } - if res.Job == nil || res.Job.DeletedAt == nil { - return cldf.ChangesetOutput{}, fmt.Errorf("delete job response is not in deleted state for job %s", jobID) - } - } - env.Logger.Infof("successfully deleted jobs %v", jobIDsToDelete) - - return cldf.ChangesetOutput{}, nil -} - -func jobsToDelete(env cldf.Environment, jobIDs []string) ([]string, error) { - proposalsResp, err := env.Offchain.ListProposals(env.GetContext(), &jobv1.ListProposalsRequest{ - Filter: &jobv1.ListProposalsRequest_Filter{ - JobIds: jobIDs, - }, - }) - if err != nil { - return nil, fmt.Errorf("failed to list proposals for jobIDs %v: %w", jobIDs, err) - } - if len(proposalsResp.Proposals) == 0 { - return nil, fmt.Errorf("no proposals found for jobIDs %v", jobIDs) - } - seen := make(map[string]struct{}) - var jobIDsToDelete []string - for _, proposal := range proposalsResp.Proposals { - if proposal.Status == jobv1.ProposalStatus_PROPOSAL_STATUS_PROPOSED || - proposal.Status == jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED || - proposal.Status == jobv1.ProposalStatus_PROPOSAL_STATUS_PENDING { - if _, ok := seen[proposal.JobId]; ok { - continue - } - seen[proposal.JobId] = struct{}{} - jobIDsToDelete = append(jobIDsToDelete, proposal.JobId) - } - } - - return jobIDsToDelete, nil -} diff --git a/jd/changesets/nodes/disable.go b/jd/changesets/nodes/disable.go new file mode 100644 index 0000000..913042e --- /dev/null +++ b/jd/changesets/nodes/disable.go @@ -0,0 +1,44 @@ +package nodes + +import ( + "errors" + "fmt" + + cldf "github.com/smartcontractkit/chainlink-deployments-framework/deployment" + fwops "github.com/smartcontractkit/chainlink-deployments-framework/operations" + + jdops "github.com/smartcontractkit/cld-changesets/jd/operations" +) + +var _ cldf.ChangeSetV2[DisableNodesInput] = DisableNodesChangeset{} + +// DisableNodesInput is the serializable input of DisableNodesChangeset. +type DisableNodesInput = jdops.DisableNodesInput + +// DisableNodesChangeset disables nodes by CSA key. +type DisableNodesChangeset struct{} + +func (DisableNodesChangeset) VerifyPreconditions(_ cldf.Environment, cfg DisableNodesInput) error { + if len(cfg.CSAKeys) == 0 { + return errors.New("no csa_keys provided") + } + seen := make(map[string]struct{}, len(cfg.CSAKeys)) + for _, k := range cfg.CSAKeys { + if k == "" { + return errors.New("csa_key cannot be empty") + } + if _, ok := seen[k]; ok { + return fmt.Errorf("duplicate csa_key %q", k) + } + seen[k] = struct{}{} + } + + return nil +} + +func (DisableNodesChangeset) Apply(e cldf.Environment, input DisableNodesInput) (cldf.ChangesetOutput, error) { + deps := jdops.JDOpDeps{Offchain: e.Offchain, EnvName: e.Name} + _, err := fwops.ExecuteSequence(e.OperationsBundle, jdops.SeqJDDisableNodes, deps, input) + + return cldf.ChangesetOutput{}, err +} diff --git a/jd/changesets/nodes/disable_test.go b/jd/changesets/nodes/disable_test.go new file mode 100644 index 0000000..683b1b2 --- /dev/null +++ b/jd/changesets/nodes/disable_test.go @@ -0,0 +1,65 @@ +package nodes + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-deployments-framework/engine/test/runtime" +) + +func TestDisableNodesChangeset(t *testing.T) { + t.Parallel() + + t.Run("disables a registered node", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, RegisterNodesChangeset{}, RegisterNodesInput{ + Domain: "keystone", + Nodes: []NodeToRegister{{Name: "oracle-1", CSAKey: "csa-key-1"}}, + }) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, DisableNodesChangeset{}, DisableNodesInput{ + CSAKeys: []string{"csa-key-1"}, + }) + require.NoError(t, err) + }) + + t.Run("skips nodes not found — Skipped=true, no error", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, DisableNodesChangeset{}, DisableNodesInput{ + CSAKeys: []string{"nonexistent-key"}, + }) + require.NoError(t, err) + }) + + t.Run("precondition — empty CSA keys rejected", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, DisableNodesChangeset{}, DisableNodesInput{}) + require.ErrorContains(t, err, "no csa_keys provided") + }) + + t.Run("precondition — duplicate CSA key rejected", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, DisableNodesChangeset{}, DisableNodesInput{ + CSAKeys: []string{"csa-key-1", "csa-key-1"}, + }) + require.ErrorContains(t, err, "duplicate csa_key") + }) +} diff --git a/jd/changesets/nodes/doc.go b/jd/changesets/nodes/doc.go new file mode 100644 index 0000000..0e1ea84 --- /dev/null +++ b/jd/changesets/nodes/doc.go @@ -0,0 +1,29 @@ +// Package nodes provides changesets for managing Job Distributor nodes. +// +// # Usage +// +// import "github.com/smartcontractkit/cld-changesets/jd/changesets/nodes" +// +// // Register nodes (idempotent — already-registered nodes are skipped): +// _, err := runtime.ExecChangeset(rt, nodes.RegisterNodesChangeset{}, nodes.RegisterNodesInput{ +// Domain: "keystone", +// Nodes: []nodes.NodeToRegister{{ +// Name: "oracle-1", +// CSAKey: "csa-key-1", +// }}, +// }) +// +// // Update node name and/or labels: +// _, err = runtime.ExecChangeset(rt, nodes.UpdateNodesChangeset{}, nodes.UpdateNodesInput{ +// Nodes: []nodes.NodeToUpdate{{ +// ID: "node-id-1", +// CSAKey: "csa-key-1", +// Name: "oracle-1-updated", +// }}, +// }) +// +// // Disable nodes by CSA key (idempotent — not-found and already-disabled are skipped): +// _, err = runtime.ExecChangeset(rt, nodes.DisableNodesChangeset{}, nodes.DisableNodesInput{ +// CSAKeys: []string{"csa-key-1"}, +// }) +package nodes diff --git a/jd/changesets/nodes/nodes_test.go b/jd/changesets/nodes/nodes_test.go new file mode 100644 index 0000000..462f0e1 --- /dev/null +++ b/jd/changesets/nodes/nodes_test.go @@ -0,0 +1,20 @@ +package nodes + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-deployments-framework/engine/test/runtime" + + jdops "github.com/smartcontractkit/cld-changesets/jd/operations" +) + +func nodeIDByCSAKey(t *testing.T, rt *runtime.Runtime, csaKey string) string { + t.Helper() + node, err := jdops.ListNodeByPublicKey(t.Context(), rt.Environment().Offchain, csaKey) + require.NoError(t, err) + require.NotNil(t, node, "no node found with csa_key %q", csaKey) + + return node.GetId() +} diff --git a/jd/changesets/nodes/register.go b/jd/changesets/nodes/register.go new file mode 100644 index 0000000..d807294 --- /dev/null +++ b/jd/changesets/nodes/register.go @@ -0,0 +1,58 @@ +package nodes + +import ( + "errors" + "fmt" + + cldf "github.com/smartcontractkit/chainlink-deployments-framework/deployment" + fwops "github.com/smartcontractkit/chainlink-deployments-framework/operations" + + jdops "github.com/smartcontractkit/cld-changesets/jd/operations" +) + +var _ cldf.ChangeSetV2[RegisterNodesInput] = RegisterNodesChangeset{} + +// NodeToRegister is the per-node input for RegisterNodesInput. +type NodeToRegister = jdops.NodeToRegister + +// RegisterNodesInput is the serializable input of RegisterNodesChangeset. +type RegisterNodesInput = jdops.RegisterNodesInput + +// RegisterNodesChangeset upserts nodes (already-registered nodes are skipped). +type RegisterNodesChangeset struct{} + +func (RegisterNodesChangeset) VerifyPreconditions(_ cldf.Environment, cfg RegisterNodesInput) error { + if cfg.Domain == "" { + return errors.New("domain is required") + } + if len(cfg.Nodes) == 0 { + return errors.New("no nodes provided") + } + seenNames := make(map[string]struct{}, len(cfg.Nodes)) + seenCSAKeys := make(map[string]struct{}, len(cfg.Nodes)) + for _, n := range cfg.Nodes { + if n.Name == "" { + return errors.New("node name cannot be empty") + } + if n.CSAKey == "" { + return fmt.Errorf("node CSA key cannot be empty for node %q", n.Name) + } + if _, ok := seenNames[n.Name]; ok { + return fmt.Errorf("duplicate node name %q", n.Name) + } + seenNames[n.Name] = struct{}{} + if _, ok := seenCSAKeys[n.CSAKey]; ok { + return fmt.Errorf("duplicate csa_key for node %q", n.Name) + } + seenCSAKeys[n.CSAKey] = struct{}{} + } + + return nil +} + +func (RegisterNodesChangeset) Apply(e cldf.Environment, input RegisterNodesInput) (cldf.ChangesetOutput, error) { + deps := jdops.JDOpDeps{Offchain: e.Offchain, EnvName: e.Name} + _, err := fwops.ExecuteSequence(e.OperationsBundle, jdops.SeqJDRegisterNodes, deps, input) + + return cldf.ChangesetOutput{}, err +} diff --git a/jd/changesets/nodes/register_test.go b/jd/changesets/nodes/register_test.go new file mode 100644 index 0000000..d79a675 --- /dev/null +++ b/jd/changesets/nodes/register_test.go @@ -0,0 +1,94 @@ +package nodes + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-deployments-framework/engine/test/runtime" +) + +func TestRegisterNodesChangeset(t *testing.T) { + t.Parallel() + + t.Run("registers new nodes", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, RegisterNodesChangeset{}, RegisterNodesInput{ + Domain: "keystone", + Nodes: []NodeToRegister{ + {Name: "oracle-1", CSAKey: "csa-key-1"}, + {Name: "oracle-2", CSAKey: "csa-key-2", IsBootstrap: true}, + }, + }) + require.NoError(t, err) + + for _, csaKey := range []string{"csa-key-1", "csa-key-2"} { + require.NotEmpty(t, nodeIDByCSAKey(t, rt, csaKey)) + } + }) + + t.Run("idempotent — skips already registered nodes", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + input := RegisterNodesInput{ + Domain: "keystone", + Nodes: []NodeToRegister{{Name: "oracle-1", CSAKey: "csa-key-1"}}, + } + _, err = runtime.ExecChangeset(rt, RegisterNodesChangeset{}, input) + require.NoError(t, err) + _, err = runtime.ExecChangeset(rt, RegisterNodesChangeset{}, input) + require.NoError(t, err) + require.NotEmpty(t, nodeIDByCSAKey(t, rt, "csa-key-1")) + }) + + t.Run("precondition — empty domain rejected", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, RegisterNodesChangeset{}, RegisterNodesInput{ + Nodes: []NodeToRegister{{Name: "oracle-1", CSAKey: "csa-key-1"}}, + }) + require.ErrorContains(t, err, "domain is required") + }) + + t.Run("precondition — duplicate node name rejected", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, RegisterNodesChangeset{}, RegisterNodesInput{ + Domain: "keystone", + Nodes: []NodeToRegister{ + {Name: "oracle-1", CSAKey: "csa-key-1"}, + {Name: "oracle-1", CSAKey: "csa-key-2"}, + }, + }) + require.ErrorContains(t, err, "duplicate node name") + }) + + t.Run("precondition — duplicate CSA key rejected", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, RegisterNodesChangeset{}, RegisterNodesInput{ + Domain: "keystone", + Nodes: []NodeToRegister{ + {Name: "oracle-1", CSAKey: "csa-key-1"}, + {Name: "oracle-2", CSAKey: "csa-key-1"}, + }, + }) + require.ErrorContains(t, err, "duplicate csa_key") + }) +} diff --git a/jd/changesets/nodes/update.go b/jd/changesets/nodes/update.go new file mode 100644 index 0000000..c79dd47 --- /dev/null +++ b/jd/changesets/nodes/update.go @@ -0,0 +1,55 @@ +package nodes + +import ( + "errors" + "fmt" + + cldf "github.com/smartcontractkit/chainlink-deployments-framework/deployment" + fwops "github.com/smartcontractkit/chainlink-deployments-framework/operations" + + jdops "github.com/smartcontractkit/cld-changesets/jd/operations" +) + +var _ cldf.ChangeSetV2[UpdateNodesInput] = UpdateNodesChangeset{} + +// NodeToUpdate is the per-node input for UpdateNodesInput. +type NodeToUpdate = jdops.NodeToUpdate + +// UpdateNodesInput is the serializable input of UpdateNodesChangeset. +type UpdateNodesInput = jdops.UpdateNodesInput + +// UpdateNodesChangeset updates node name and/or labels. +type UpdateNodesChangeset struct{} + +func (UpdateNodesChangeset) VerifyPreconditions(_ cldf.Environment, cfg UpdateNodesInput) error { + if len(cfg.Nodes) == 0 { + return errors.New("no nodes provided") + } + seenIDs := make(map[string]struct{}, len(cfg.Nodes)) + seenCSAKeys := make(map[string]struct{}, len(cfg.Nodes)) + for _, n := range cfg.Nodes { + if n.ID == "" { + return errors.New("node id cannot be empty") + } + if n.CSAKey == "" { + return fmt.Errorf("node CSA key cannot be empty for node id %q", n.ID) + } + if _, ok := seenIDs[n.ID]; ok { + return fmt.Errorf("duplicate node id %q", n.ID) + } + seenIDs[n.ID] = struct{}{} + if _, ok := seenCSAKeys[n.CSAKey]; ok { + return fmt.Errorf("duplicate csa_key for node id %q", n.ID) + } + seenCSAKeys[n.CSAKey] = struct{}{} + } + + return nil +} + +func (UpdateNodesChangeset) Apply(e cldf.Environment, input UpdateNodesInput) (cldf.ChangesetOutput, error) { + deps := jdops.JDOpDeps{Offchain: e.Offchain, EnvName: e.Name} + _, err := fwops.ExecuteSequence(e.OperationsBundle, jdops.SeqJDUpdateNodes, deps, input) + + return cldf.ChangesetOutput{}, err +} diff --git a/jd/changesets/nodes/update_test.go b/jd/changesets/nodes/update_test.go new file mode 100644 index 0000000..ecb4666 --- /dev/null +++ b/jd/changesets/nodes/update_test.go @@ -0,0 +1,75 @@ +package nodes + +import ( + "testing" + + nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-deployments-framework/engine/test/environment" + "github.com/smartcontractkit/chainlink-deployments-framework/engine/test/runtime" +) + +func TestUpdateNodesChangeset(t *testing.T) { + t.Parallel() + + registerNode := func(t *testing.T, rt *runtime.Runtime, name, csaKey string) string { + t.Helper() + _, err := runtime.ExecChangeset(rt, RegisterNodesChangeset{}, RegisterNodesInput{ + Domain: "keystone", + Nodes: []NodeToRegister{{Name: name, CSAKey: csaKey}}, + }) + require.NoError(t, err) + + return nodeIDByCSAKey(t, rt, csaKey) + } + + t.Run("updates node name and labels", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + nodeID := registerNode(t, rt, "oracle-1", "csa-key-1") + + _, err = runtime.ExecChangeset(rt, UpdateNodesChangeset{}, UpdateNodesInput{ + Nodes: []NodeToUpdate{{ + ID: nodeID, + CSAKey: "csa-key-1", + Name: "oracle-1-updated", + Labels: map[string]string{"env": "staging"}, + }}, + }) + require.NoError(t, err) + + resp, err := rt.Environment().Offchain.GetNode(t.Context(), &nodev1.GetNodeRequest{Id: nodeID}) + require.NoError(t, err) + require.Equal(t, "oracle-1-updated", resp.GetNode().GetName()) + }) + + t.Run("CSA key conflict — rejects reassignment to another node's key", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context(), runtime.WithEnvOpts(environment.WithName("test"))) + require.NoError(t, err) + + node1ID := registerNode(t, rt, "oracle-1", "csa-key-1") + registerNode(t, rt, "oracle-2", "csa-key-2") + + _, err = runtime.ExecChangeset(rt, UpdateNodesChangeset{}, UpdateNodesInput{ + Nodes: []NodeToUpdate{{ID: node1ID, CSAKey: "csa-key-2"}}, + }) + require.Error(t, err) + require.ErrorContains(t, err, "CSA key") + }) + + t.Run("precondition — empty node list rejected", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + _, err = runtime.ExecChangeset(rt, UpdateNodesChangeset{}, UpdateNodesInput{}) + require.ErrorContains(t, err, "no nodes provided") + }) +} diff --git a/jd/operations/jobs.go b/jd/operations/jobs.go new file mode 100644 index 0000000..35a4b02 --- /dev/null +++ b/jd/operations/jobs.go @@ -0,0 +1,302 @@ +package operations + +import ( + "errors" + "fmt" + + "github.com/Masterminds/semver/v3" + jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job" + nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node" + "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes" + + "github.com/smartcontractkit/chainlink-deployments-framework/engine/cld/domain" + fwops "github.com/smartcontractkit/chainlink-deployments-framework/operations" +) + +// JobSpec is the per-job input for ProposeJobsInput. +type JobSpec struct { + NodeLabels map[string]string `json:"node_labels" yaml:"node_labels"` + JobLabels map[string]string `json:"job_labels,omitempty" yaml:"job_labels,omitempty"` + JobspecTOML string `json:"jobspec_toml" yaml:"jobspec_toml"` +} + +// ProposeJobsInput is the serializable input of SeqJDProposeJobs. +type ProposeJobsInput struct { + Domain string `json:"domain" yaml:"domain"` + Jobs []JobSpec `json:"jobs" yaml:"jobs"` +} + +// RevokeJobsInput is the serializable input of SeqJDRevokeJobs. +type RevokeJobsInput struct { + JobIDs []string `json:"job_ids" yaml:"job_ids"` +} + +// DeleteJobsInput is the serializable input of SeqJDDeleteJobs. +type DeleteJobsInput struct { + JobIDs []string `json:"job_ids" yaml:"job_ids"` +} + +// ProposeJobInput is the serializable input of OpJDProposeJob. +type ProposeJobInput struct { + Domain string `json:"domain"` + NodeLabels map[string]string `json:"node_labels"` + JobLabels map[string]string `json:"job_labels,omitempty"` + JobspecTOML string `json:"jobspec_toml"` +} + +// ProposeJobOutput is the serializable output of OpJDProposeJob. +// Empty: ProposeJob targets nodes by label and returns no proposal IDs. +type ProposeJobOutput struct{} + +// RevokeJobInput is the serializable input of OpJDRevokeJob. +type RevokeJobInput struct { + JobID string `json:"job_id"` +} + +// RevokeJobOutput is the serializable output of OpJDRevokeJob. +type RevokeJobOutput struct { + AlreadyAbsent bool `json:"already_absent"` +} + +// DeleteJobInput is the serializable input of OpJDDeleteJob. +type DeleteJobInput struct { + JobID string `json:"job_id"` +} + +// DeleteJobOutput is the serializable output of OpJDDeleteJob. +type DeleteJobOutput struct{} + +// ProposeJobsOutput is the serializable output of SeqJDProposeJobs. +type ProposeJobsOutput struct{} + +// RevokeJobsOutput is the serializable output of SeqJDRevokeJobs. +type RevokeJobsOutput struct { + RevokedJobIDs []string `json:"revoked_job_ids"` + AlreadyAbsentJobIDs []string `json:"already_absent_job_ids"` +} + +// DeleteJobsOutput is the serializable output of SeqJDDeleteJobs. +type DeleteJobsOutput struct { + DeletedJobIDs []string `json:"deleted_job_ids"` +} + +// OpJDProposeJob proposes a single job spec to nodes matched by label. +// Fails if no nodes match the domain/environment/label selectors. +var OpJDProposeJob = fwops.NewOperation( + "jd-propose-job", + semver.MustParse("1.0.0"), + "Propose a job spec to nodes matched by label", + func(b fwops.Bundle, deps JDOpDeps, in ProposeJobInput) (ProposeJobOutput, error) { + ctx := b.GetContext() + d := domain.NewDomain("", in.Domain) + domainKey := d.Key() + envName := deps.EnvName + + selectors := make([]*ptypes.Selector, 0, 2+len(in.NodeLabels)) + selectors = append(selectors, + &ptypes.Selector{Key: "product", Op: ptypes.SelectorOp_EQ, Value: &domainKey}, + &ptypes.Selector{Key: "environment", Op: ptypes.SelectorOp_EQ, Value: &envName}, + ) + for k, v := range in.NodeLabels { + selectors = append(selectors, &ptypes.Selector{Key: k, Op: ptypes.SelectorOp_EQ, Value: &v}) + } + + resp, err := deps.Offchain.ListNodes(ctx, &nodev1.ListNodesRequest{ + Filter: &nodev1.ListNodesRequest_Filter{Enabled: 1, Selectors: selectors}, + }) + if err != nil { + return ProposeJobOutput{}, fmt.Errorf("failed to list nodes: %w", err) + } + nodes := resp.GetNodes() + if len(nodes) == 0 { + return ProposeJobOutput{}, fmt.Errorf("no nodes matched domain=%q env=%q labels=%v", in.Domain, deps.EnvName, in.NodeLabels) + } + + var merr error + for _, node := range nodes { + if _, err = deps.Offchain.ProposeJob(ctx, &jobv1.ProposeJobRequest{ + NodeId: node.GetId(), + Spec: in.JobspecTOML, + Labels: labelsFromMap(in.JobLabels), + }); err != nil { + merr = errors.Join(merr, fmt.Errorf("failed to propose job to node %q: %w", node.GetId(), err)) + } + } + if merr != nil { + return ProposeJobOutput{}, merr + } + b.Logger.Infow("proposed job", "node_labels", in.NodeLabels, "node_count", len(nodes)) + + return ProposeJobOutput{}, nil + }, +) + +// OpJDRevokeJob revokes a single job. +var OpJDRevokeJob = fwops.NewOperation( + "jd-revoke-job", + semver.MustParse("1.0.0"), + "Revoke a job", + func(b fwops.Bundle, deps JDOpDeps, in RevokeJobInput) (RevokeJobOutput, error) { + _, err := deps.Offchain.RevokeJob(b.GetContext(), &jobv1.RevokeJobRequest{ + IdOneof: &jobv1.RevokeJobRequest_Id{Id: in.JobID}, + }) + if err != nil { + if isJDNotFound(err) { + b.Logger.Infow("job already absent or revoked", "job_id", in.JobID) + + return RevokeJobOutput{AlreadyAbsent: true}, nil + } + + return RevokeJobOutput{}, fmt.Errorf("failed to revoke job %q: %w", in.JobID, err) + } + b.Logger.Infow("revoked job", "job_id", in.JobID) + + return RevokeJobOutput{}, nil + }, +) + +// OpJDDeleteJob deletes a single job. +var OpJDDeleteJob = fwops.NewOperation( + "jd-delete-job", + semver.MustParse("1.0.0"), + "Delete a job", + func(b fwops.Bundle, deps JDOpDeps, in DeleteJobInput) (DeleteJobOutput, error) { + res, err := deps.Offchain.DeleteJob(b.GetContext(), &jobv1.DeleteJobRequest{ + IdOneof: &jobv1.DeleteJobRequest_Id{Id: in.JobID}, + }) + if err != nil { + return DeleteJobOutput{}, fmt.Errorf("failed to delete job %q: %w", in.JobID, err) + } + if res.GetJob().GetDeletedAt() == nil { + return DeleteJobOutput{}, fmt.Errorf("delete job %q response did not confirm deletion", in.JobID) + } + b.Logger.Infow("deleted job", "job_id", in.JobID) + + return DeleteJobOutput{}, nil + }, +) + +// SeqJDProposeJobs proposes multiple job specs. Failures are collected. +var SeqJDProposeJobs = fwops.NewSequence( + "seq-jd-propose-jobs", + semver.MustParse("1.0.0"), + "Propose multiple job specs", + func(b fwops.Bundle, deps JDOpDeps, in ProposeJobsInput) (ProposeJobsOutput, error) { + var failed int + var merr error + for i, j := range in.Jobs { + _, err := fwops.ExecuteOperation(b, OpJDProposeJob, deps, ProposeJobInput{ + Domain: in.Domain, + NodeLabels: j.NodeLabels, + JobLabels: j.JobLabels, + JobspecTOML: j.JobspecTOML, + }) + if err != nil { + failed++ + merr = errors.Join(merr, fmt.Errorf("job[%d]: propose failed: %w", i, err)) + b.Logger.Errorw("failed to propose job", "index", i, "node_labels", j.NodeLabels, "error", err) + } + } + b.Logger.Infow("propose_jobs complete", + "total", len(in.Jobs), + "succeeded", len(in.Jobs)-failed, + "failed", failed) + + return ProposeJobsOutput{}, merr + }, +) + +// SeqJDRevokeJobs revokes multiple jobs. Failures are collected. +var SeqJDRevokeJobs = fwops.NewSequence( + "seq-jd-revoke-jobs", + semver.MustParse("1.0.0"), + "Revoke multiple jobs", + func(b fwops.Bundle, deps JDOpDeps, in RevokeJobsInput) (RevokeJobsOutput, error) { + var revokedIDs, alreadyAbsentIDs []string + var failed int + var merr error + for _, jobID := range in.JobIDs { + report, err := fwops.ExecuteOperation(b, OpJDRevokeJob, deps, RevokeJobInput{JobID: jobID}) + if err != nil { + failed++ + merr = errors.Join(merr, err) + b.Logger.Errorw("failed to revoke job", "job_id", jobID, "error", err) + + continue + } + if report.Output.AlreadyAbsent { + alreadyAbsentIDs = append(alreadyAbsentIDs, jobID) + } else { + revokedIDs = append(revokedIDs, jobID) + } + } + b.Logger.Infow("revoke_jobs complete", + "total", len(in.JobIDs), + "revoked", len(revokedIDs), + "already_absent", len(alreadyAbsentIDs), + "failed", failed) + + return RevokeJobsOutput{RevokedJobIDs: revokedIDs, AlreadyAbsentJobIDs: alreadyAbsentIDs}, merr + }, +) + +// SeqJDDeleteJobs deletes multiple jobs. +var SeqJDDeleteJobs = fwops.NewSequence( + "seq-jd-delete-jobs", + semver.MustParse("1.0.0"), + "Delete multiple jobs", + func(b fwops.Bundle, deps JDOpDeps, in DeleteJobsInput) (DeleteJobsOutput, error) { + resp, err := deps.Offchain.ListProposals(b.GetContext(), &jobv1.ListProposalsRequest{ + Filter: &jobv1.ListProposalsRequest_Filter{JobIds: in.JobIDs}, + }) + if err != nil { + return DeleteJobsOutput{}, fmt.Errorf("failed to list proposals: %w", err) + } + if len(resp.Proposals) == 0 { + return DeleteJobsOutput{}, fmt.Errorf("no proposals found for job ids %v", in.JobIDs) + } + + eligible := eligibleJobIDsForDeletion(resp.Proposals) + if len(eligible) == 0 { + return DeleteJobsOutput{}, errors.New("no jobs eligible for deletion: no proposals in PROPOSED, APPROVED, or PENDING state") + } + + var deletedIDs []string + var failed int + var merr error + for _, jobID := range eligible { + _, err := fwops.ExecuteOperation(b, OpJDDeleteJob, deps, DeleteJobInput{JobID: jobID}) + if err != nil { + failed++ + merr = errors.Join(merr, err) + b.Logger.Errorw("failed to delete job", "job_id", jobID, "error", err) + + continue + } + deletedIDs = append(deletedIDs, jobID) + } + b.Logger.Infow("delete_jobs complete", + "total", len(eligible), + "deleted", len(deletedIDs), + "failed", failed) + + return DeleteJobsOutput{DeletedJobIDs: deletedIDs}, merr + }, +) + +func eligibleJobIDsForDeletion(proposals []*jobv1.Proposal) []string { + seen := make(map[string]struct{}) + var eligible []string + for _, p := range proposals { + if p.Status == jobv1.ProposalStatus_PROPOSAL_STATUS_PROPOSED || + p.Status == jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED || + p.Status == jobv1.ProposalStatus_PROPOSAL_STATUS_PENDING { + if _, ok := seen[p.JobId]; !ok { + seen[p.JobId] = struct{}{} + eligible = append(eligible, p.JobId) + } + } + } + + return eligible +} diff --git a/jd/operations/jobs_test.go b/jd/operations/jobs_test.go new file mode 100644 index 0000000..b05f3f1 --- /dev/null +++ b/jd/operations/jobs_test.go @@ -0,0 +1,60 @@ +package operations + +import ( + "testing" + + jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job" + "github.com/stretchr/testify/require" + + fwops "github.com/smartcontractkit/chainlink-deployments-framework/operations" + + "github.com/smartcontractkit/chainlink-deployments-framework/engine/test/runtime" +) + +func TestOpJDRevokeJob(t *testing.T) { + t.Parallel() + + t.Run("revokes a proposed job", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + offchain := rt.Environment().Offchain + deps := JDOpDeps{Offchain: offchain, EnvName: "test"} + + reg, err := fwops.ExecuteOperation(newBundle(t), OpJDRegisterNode, deps, RegisterNodeInput{ + Domain: "keystone", + Name: "oracle-1", + CSAKey: "csa-key-1", + }) + require.NoError(t, err) + proposal, err := offchain.ProposeJob(t.Context(), &jobv1.ProposeJobRequest{ + NodeId: reg.Output.NodeID, + Spec: ` +type = "offchainreporting2" +name = "test-job" +contractID = "0x0000000000000000000000000000000000000001" +externalJobID = "00000000-0000-0000-0000-000000000001" +`, + }) + require.NoError(t, err) + jobID := proposal.GetProposal().GetJobId() + + report, err := fwops.ExecuteOperation(newBundle(t), OpJDRevokeJob, deps, RevokeJobInput{JobID: jobID}) + require.NoError(t, err) + require.False(t, report.Output.AlreadyAbsent) + }) + + t.Run("job not found — AlreadyAbsent=true, no error", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + deps := JDOpDeps{Offchain: rt.Environment().Offchain, EnvName: "test"} + report, err := fwops.ExecuteOperation(newBundle(t), OpJDRevokeJob, deps, RevokeJobInput{JobID: "job_does_not_exist"}) + require.NoError(t, err) + require.True(t, report.Output.AlreadyAbsent, "expected not found revoke to be marked already-absent") + }) +} diff --git a/jd/operations/nodes.go b/jd/operations/nodes.go new file mode 100644 index 0000000..21045b3 --- /dev/null +++ b/jd/operations/nodes.go @@ -0,0 +1,313 @@ +package operations + +import ( + "errors" + "fmt" + + "github.com/Masterminds/semver/v3" + nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node" + + "github.com/smartcontractkit/chainlink-deployments-framework/engine/cld/domain" + cldf_engine_offchain "github.com/smartcontractkit/chainlink-deployments-framework/engine/cld/offchain" + fwops "github.com/smartcontractkit/chainlink-deployments-framework/operations" +) + +// NodeToRegister is the per-node input for RegisterNodesInput. +type NodeToRegister struct { + Name string `json:"name" yaml:"name"` + CSAKey string `json:"csa_key" yaml:"csa_key"` + IsBootstrap bool `json:"is_bootstrap" yaml:"is_bootstrap"` + Labels map[string]string `json:"labels" yaml:"labels"` +} + +// RegisterNodesInput is the serializable input of SeqJDRegisterNodes. +type RegisterNodesInput struct { + Domain string `json:"domain" yaml:"domain"` + Nodes []NodeToRegister `json:"nodes" yaml:"nodes"` +} + +// NodeToUpdate is the per-node input for UpdateNodesInput. +type NodeToUpdate struct { + ID string `json:"id" yaml:"id"` + CSAKey string `json:"csa_key" yaml:"csa_key"` + Name string `json:"name,omitempty" yaml:"name,omitempty"` + Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"` +} + +// UpdateNodesInput is the serializable input of SeqJDUpdateNodes. +type UpdateNodesInput struct { + Nodes []NodeToUpdate `json:"nodes" yaml:"nodes"` +} + +// DisableNodesInput is the serializable input of SeqJDDisableNodes. +type DisableNodesInput struct { + CSAKeys []string `json:"csa_keys" yaml:"csa_keys"` +} + +// RegisterNodeInput is the serializable input of OpJDRegisterNode. +type RegisterNodeInput struct { + Domain string `json:"domain"` + Name string `json:"name"` + CSAKey string `json:"csa_key"` + IsBootstrap bool `json:"is_bootstrap"` + Labels map[string]string `json:"labels"` +} + +// RegisterNodeOutput is the serializable output of OpJDRegisterNode. +type RegisterNodeOutput struct { + NodeID string `json:"node_id"` + Skipped bool `json:"skipped"` +} + +// UpdateNodeInput is the serializable input of OpJDUpdateNode. +type UpdateNodeInput struct { + ID string `json:"id"` + CSAKey string `json:"csa_key"` + Name string `json:"name,omitempty"` + Labels map[string]string `json:"labels,omitempty"` +} + +// UpdateNodeOutput is the serializable output of OpJDUpdateNode. +type UpdateNodeOutput struct { + NodeID string `json:"node_id"` +} + +// DisableNodeInput is the serializable input of OpJDDisableNode. +type DisableNodeInput struct { + CSAKey string `json:"csa_key"` +} + +// DisableNodeOutput is the serializable output of OpJDDisableNode. +type DisableNodeOutput struct { + NodeID string `json:"node_id"` + Skipped bool `json:"skipped"` +} + +// RegisterNodesOutput is the serializable output of SeqJDRegisterNodes. +type RegisterNodesOutput struct { + RegisteredNodeIDs []string `json:"registered_node_ids"` + SkippedNodeIDs []string `json:"skipped_node_ids"` +} + +// UpdateNodesOutput is the serializable output of SeqJDUpdateNodes. +type UpdateNodesOutput struct { + UpdatedNodeIDs []string `json:"updated_node_ids"` +} + +// DisableNodesOutput is the serializable output of SeqJDDisableNodes. +type DisableNodesOutput struct { + DisabledNodeIDs []string `json:"disabled_node_ids"` + SkippedCSAKeys []string `json:"skipped_csa_keys"` +} + +// OpJDRegisterNode registers a single node. +var OpJDRegisterNode = fwops.NewOperation( + "jd-register-node", + semver.MustParse("1.0.0"), + "Register a node", + func(b fwops.Bundle, deps JDOpDeps, in RegisterNodeInput) (RegisterNodeOutput, error) { + ctx := b.GetContext() + + existing, err := ListNodeByPublicKey(ctx, deps.Offchain, in.CSAKey) + if err != nil { + return RegisterNodeOutput{}, fmt.Errorf("failed to check if node is already registered: %w", err) + } + if existing != nil { + b.Logger.Infow("node already registered, skipping", "name", in.Name, "csa_key", in.CSAKey) + + return RegisterNodeOutput{NodeID: existing.GetId(), Skipped: true}, nil + } + labels := in.Labels + if labels == nil { + labels = make(map[string]string) + } + d := domain.NewDomain("", in.Domain) + nodeID, err := cldf_engine_offchain.RegisterNode(ctx, deps.Offchain, in.Name, in.CSAKey, in.IsBootstrap, d, deps.EnvName, labels) + if err != nil { + return RegisterNodeOutput{}, fmt.Errorf("failed to register node %q (csa_key: %s): %w", in.Name, in.CSAKey, err) + } + b.Logger.Infow("registered node", "name", in.Name, "id", nodeID) + + return RegisterNodeOutput{NodeID: nodeID}, nil + }, +) + +// OpJDUpdateNode updates a node's name and/or labels. +var OpJDUpdateNode = fwops.NewOperation( + "jd-update-node", + semver.MustParse("1.0.0"), + "Update a node's name and/or labels", + func(b fwops.Bundle, deps JDOpDeps, in UpdateNodeInput) (UpdateNodeOutput, error) { + ctx := b.GetContext() + resp, err := deps.Offchain.GetNode(ctx, &nodev1.GetNodeRequest{Id: in.ID}) + if err != nil { + return UpdateNodeOutput{}, fmt.Errorf("failed to get node %q: %w", in.ID, err) + } + nodeInfo := resp.GetNode() + + existing, lookupErr := ListNodeByPublicKey(ctx, deps.Offchain, in.CSAKey) + if lookupErr != nil { + b.Logger.Warnw("could not verify CSA key conflict, skipping check", "csa_key", in.CSAKey, "error", lookupErr) + } else if existing != nil && existing.GetId() != in.ID { + return UpdateNodeOutput{}, fmt.Errorf( + "CSA key %s is already registered to node %q (%s), cannot reassign to %q", + in.CSAKey, existing.GetId(), existing.GetName(), in.ID, + ) + } + + name := nodeInfo.GetName() + if in.Name != "" { + name = in.Name + } + labels := nodeInfo.GetLabels() + if in.Labels != nil { + labels = labelsFromMap(in.Labels) + } + + if _, err = deps.Offchain.UpdateNode(ctx, &nodev1.UpdateNodeRequest{ + Id: in.ID, + Name: name, + PublicKey: in.CSAKey, + Labels: labels, + }); err != nil { + return UpdateNodeOutput{}, fmt.Errorf("failed to update node %q: %w", in.ID, err) + } + b.Logger.Infow("updated node", "id", in.ID, "name", name, + "old_csa_key", nodeInfo.GetPublicKey(), "new_csa_key", in.CSAKey) + + return UpdateNodeOutput{NodeID: in.ID}, nil + }, +) + +// OpJDDisableNode disables a node by CSA key. +var OpJDDisableNode = fwops.NewOperation( + "jd-disable-node", + semver.MustParse("1.0.0"), + "Disable a node by CSA key", + func(b fwops.Bundle, deps JDOpDeps, in DisableNodeInput) (DisableNodeOutput, error) { + ctx := b.GetContext() + nodeInfo, err := ListNodeByPublicKey(ctx, deps.Offchain, in.CSAKey) + if err != nil { + return DisableNodeOutput{}, fmt.Errorf("failed to look up node with csa_key %s: %w", in.CSAKey, err) + } + if nodeInfo == nil { + b.Logger.Infow("node not found, skipping disable", "csa_key", in.CSAKey) + + return DisableNodeOutput{Skipped: true}, nil + } + if !nodeInfo.GetIsEnabled() { + b.Logger.Infow("node already disabled, skipping", "node_id", nodeInfo.GetId()) + + return DisableNodeOutput{NodeID: nodeInfo.GetId(), Skipped: true}, nil + } + if _, err = deps.Offchain.DisableNode(ctx, &nodev1.DisableNodeRequest{Id: nodeInfo.GetId()}); err != nil { + return DisableNodeOutput{}, fmt.Errorf("failed to disable node %q (%s): %w", nodeInfo.GetId(), nodeInfo.GetName(), err) + } + b.Logger.Infow("disabled node", "node_id", nodeInfo.GetId(), "name", nodeInfo.GetName()) + + return DisableNodeOutput{NodeID: nodeInfo.GetId()}, nil + }, +) + +// SeqJDRegisterNodes registers multiple nodes. +var SeqJDRegisterNodes = fwops.NewSequence( + "seq-jd-register-nodes", + semver.MustParse("1.0.0"), + "Register multiple nodes", + func(b fwops.Bundle, deps JDOpDeps, in RegisterNodesInput) (RegisterNodesOutput, error) { + var registeredIDs, skippedIDs []string + var failed int + var merr error + for _, n := range in.Nodes { + report, err := fwops.ExecuteOperation(b, OpJDRegisterNode, deps, RegisterNodeInput{ + Domain: in.Domain, + Name: n.Name, + CSAKey: n.CSAKey, + IsBootstrap: n.IsBootstrap, + Labels: n.Labels, + }) + if err != nil { + failed++ + merr = errors.Join(merr, err) + b.Logger.Errorw("failed to register node", "name", n.Name, "error", err) + + continue + } + if report.Output.Skipped { + skippedIDs = append(skippedIDs, report.Output.NodeID) + } else { + registeredIDs = append(registeredIDs, report.Output.NodeID) + } + } + b.Logger.Infow("register_nodes complete", + "total", len(in.Nodes), + "registered", len(registeredIDs), + "skipped", len(skippedIDs), + "failed", failed) + + return RegisterNodesOutput{RegisteredNodeIDs: registeredIDs, SkippedNodeIDs: skippedIDs}, merr + }, +) + +// SeqJDUpdateNodes updates multiple nodes. +var SeqJDUpdateNodes = fwops.NewSequence( + "seq-jd-update-nodes", + semver.MustParse("1.0.0"), + "Update multiple nodes", + func(b fwops.Bundle, deps JDOpDeps, in UpdateNodesInput) (UpdateNodesOutput, error) { + var updatedIDs []string + var failed int + var merr error + for _, n := range in.Nodes { + _, err := fwops.ExecuteOperation(b, OpJDUpdateNode, deps, UpdateNodeInput(n)) + if err != nil { + failed++ + merr = errors.Join(merr, err) + b.Logger.Errorw("failed to update node", "id", n.ID, "error", err) + + continue + } + updatedIDs = append(updatedIDs, n.ID) + } + b.Logger.Infow("update_nodes complete", + "total", len(in.Nodes), + "updated", len(updatedIDs), + "failed", failed) + + return UpdateNodesOutput{UpdatedNodeIDs: updatedIDs}, merr + }, +) + +// SeqJDDisableNodes disables multiple nodes by CSA key. +var SeqJDDisableNodes = fwops.NewSequence( + "seq-jd-disable-nodes", + semver.MustParse("1.0.0"), + "Disable multiple nodes by CSA key", + func(b fwops.Bundle, deps JDOpDeps, in DisableNodesInput) (DisableNodesOutput, error) { + var disabledIDs, skippedCSAKeys []string + var failed int + var merr error + for _, csaKey := range in.CSAKeys { + report, err := fwops.ExecuteOperation(b, OpJDDisableNode, deps, DisableNodeInput{CSAKey: csaKey}) + if err != nil { + failed++ + merr = errors.Join(merr, err) + b.Logger.Errorw("failed to disable node", "csa_key", csaKey, "error", err) + + continue + } + if report.Output.Skipped { + skippedCSAKeys = append(skippedCSAKeys, csaKey) + } else { + disabledIDs = append(disabledIDs, report.Output.NodeID) + } + } + b.Logger.Infow("disable_nodes complete", + "total", len(in.CSAKeys), + "disabled", len(disabledIDs), + "skipped", len(skippedCSAKeys), + "failed", failed) + + return DisableNodesOutput{DisabledNodeIDs: disabledIDs, SkippedCSAKeys: skippedCSAKeys}, merr + }, +) diff --git a/jd/operations/nodes_test.go b/jd/operations/nodes_test.go new file mode 100644 index 0000000..0f44a0b --- /dev/null +++ b/jd/operations/nodes_test.go @@ -0,0 +1,96 @@ +package operations + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + fwops "github.com/smartcontractkit/chainlink-deployments-framework/operations" + "github.com/smartcontractkit/chainlink-deployments-framework/pkg/logger" + + "github.com/smartcontractkit/chainlink-deployments-framework/engine/test/runtime" +) + +func newBundle(t *testing.T) fwops.Bundle { + t.Helper() + + return fwops.NewBundle(func() context.Context { return t.Context() }, logger.Test(t), fwops.NewMemoryReporter()) +} + +func TestOpJDRegisterNode(t *testing.T) { + t.Parallel() + + t.Run("registers new node and returns its ID", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + deps := JDOpDeps{Offchain: rt.Environment().Offchain, EnvName: "test"} + report, err := fwops.ExecuteOperation(newBundle(t), OpJDRegisterNode, deps, RegisterNodeInput{ + Domain: "keystone", + Name: "oracle-1", + CSAKey: "csa-key-1", + }) + require.NoError(t, err) + require.NotEmpty(t, report.Output.NodeID) + require.False(t, report.Output.Skipped) + }) + + t.Run("second call with same CSA key is a no-op, returns same ID with Skipped=true", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + deps := JDOpDeps{Offchain: rt.Environment().Offchain, EnvName: "test"} + input := RegisterNodeInput{Domain: "keystone", Name: "oracle-1", CSAKey: "csa-key-1"} + + first, err := fwops.ExecuteOperation(newBundle(t), OpJDRegisterNode, deps, input) + require.NoError(t, err) + require.False(t, first.Output.Skipped) + + second, err := fwops.ExecuteOperation(newBundle(t), OpJDRegisterNode, deps, input) + require.NoError(t, err) + require.True(t, second.Output.Skipped, "expected second registration to be skipped") + require.Equal(t, first.Output.NodeID, second.Output.NodeID, "skipped op must return the existing node ID") + }) +} + +func TestOpJDDisableNode(t *testing.T) { + t.Parallel() + + t.Run("disables an existing node and returns its ID", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + deps := JDOpDeps{Offchain: rt.Environment().Offchain, EnvName: "test"} + reg, err := fwops.ExecuteOperation(newBundle(t), OpJDRegisterNode, deps, RegisterNodeInput{ + Domain: "keystone", + Name: "oracle-1", + CSAKey: "csa-key-1", + }) + require.NoError(t, err) + + dis, err := fwops.ExecuteOperation(newBundle(t), OpJDDisableNode, deps, DisableNodeInput{CSAKey: "csa-key-1"}) + require.NoError(t, err) + require.False(t, dis.Output.Skipped) + require.Equal(t, reg.Output.NodeID, dis.Output.NodeID) + }) + + t.Run("node not found — Skipped=true, no error", func(t *testing.T) { + t.Parallel() + + rt, err := runtime.New(t.Context()) + require.NoError(t, err) + + deps := JDOpDeps{Offchain: rt.Environment().Offchain, EnvName: "test"} + report, err := fwops.ExecuteOperation(newBundle(t), OpJDDisableNode, deps, DisableNodeInput{CSAKey: "nonexistent-key"}) + require.NoError(t, err) + require.True(t, report.Output.Skipped, "expected not found disable to be marked skipped") + require.Empty(t, report.Output.NodeID) + }) +} diff --git a/jd/operations/util.go b/jd/operations/util.go new file mode 100644 index 0000000..b1ec999 --- /dev/null +++ b/jd/operations/util.go @@ -0,0 +1,51 @@ +package operations + +import ( + "context" + "fmt" + + nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node" + "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + foffchain "github.com/smartcontractkit/chainlink-deployments-framework/offchain" +) + +// JDOpDeps is the shared dependency surface for all JD operations and sequences. +type JDOpDeps struct { + Offchain foffchain.Client + EnvName string +} + +func labelsFromMap(m map[string]string) []*ptypes.Label { + labels := make([]*ptypes.Label, 0, len(m)) + for k, v := range m { + labels = append(labels, &ptypes.Label{Key: k, Value: &v}) + } + + return labels +} + +func isJDNotFound(err error) bool { + st, ok := status.FromError(err) + return ok && st.Code() == codes.NotFound +} + +// ListNodeByPublicKey returns the node whose public key matches csaKey, +// or (nil, nil). +func ListNodeByPublicKey(ctx context.Context, svc nodev1.NodeServiceClient, csaKey string) (*nodev1.Node, error) { + resp, err := svc.ListNodes(ctx, &nodev1.ListNodesRequest{ + Filter: &nodev1.ListNodesRequest_Filter{PublicKeys: []string{csaKey}}, + }) + if err != nil { + return nil, fmt.Errorf("failed to list nodes: %w", err) + } + for _, n := range resp.Nodes { + if n.GetPublicKey() == csaKey { + return n, nil + } + } + + return nil, nil //nolint:nilnil // nil means "not found". +}