Skip to content

Commit d6804e9

Browse files
committed
feat: update memory client propose and revoke functionality
This updates the memory client to correctly handle the propose and revoke functionality to mimic the Job Distributor API. This also introduces a new ID generator for the memory client to differentiate from the UUIDs that are set on the job.
1 parent 35d9189 commit d6804e9

5 files changed

Lines changed: 549 additions & 104 deletions

File tree

.changeset/mean-otters-sink.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"chainlink-deployments-framework": minor
3+
---
4+
5+
Aligns MemoryJobDistributor `ProposeJob` and `RevokeJob` to have the same functionality as the JobDistributor service

offchain/jd/memory/id.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package memory
2+
3+
import "github.com/segmentio/ksuid"
4+
5+
// newProposalID generates a new proposal ID.
6+
func newProposalID() string {
7+
return newID("prop")
8+
}
9+
10+
// newJobID generates a new job ID.
11+
func newJobID() string {
12+
return newID("job")
13+
}
14+
15+
// newNodeID generates a new node ID.
16+
func newNodeID() string {
17+
return newID("node")
18+
}
19+
20+
// newID generates a new ID with a given prefix.
21+
//
22+
// This uses ksuid to generate a unique ID which differs from the Job Distributor ID format, to
23+
// better differentiate from the UUID that are set on the job. Each ID should be prefixed with a
24+
// string to identify the type of ID.
25+
func newID(prefix string) string {
26+
return prefix + "_" + ksuid.New().String()
27+
}

offchain/jd/memory/id_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package memory
2+
3+
import (
4+
"strings"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestNewProposalID(t *testing.T) {
11+
t.Parallel()
12+
13+
id := newProposalID()
14+
15+
assert.NotEmpty(t, id)
16+
assert.True(t, strings.HasPrefix(id, "prop_"))
17+
}
18+
19+
func TestNewJobID(t *testing.T) {
20+
t.Parallel()
21+
22+
id := newJobID()
23+
24+
assert.NotEmpty(t, id)
25+
assert.True(t, strings.HasPrefix(id, "job_"))
26+
}
27+
28+
func TestNewNodeID(t *testing.T) {
29+
t.Parallel()
30+
31+
id := newNodeID()
32+
33+
assert.NotEmpty(t, id)
34+
assert.True(t, strings.HasPrefix(id, "node_"))
35+
}
36+
37+
func TestNewID(t *testing.T) {
38+
t.Parallel()
39+
40+
tests := []struct {
41+
name string
42+
prefix string
43+
wantPrefix string
44+
}{
45+
{
46+
name: "empty prefix",
47+
prefix: "",
48+
wantPrefix: "_",
49+
},
50+
{
51+
name: "single character prefix",
52+
prefix: "a",
53+
wantPrefix: "a_",
54+
},
55+
{
56+
name: "multi character prefix",
57+
prefix: "test",
58+
wantPrefix: "test_",
59+
},
60+
{
61+
name: "prefix with numbers",
62+
prefix: "test123",
63+
wantPrefix: "test123_",
64+
},
65+
{
66+
name: "prefix with special characters",
67+
prefix: "test-123",
68+
wantPrefix: "test-123_",
69+
},
70+
}
71+
72+
for _, tt := range tests {
73+
t.Run(tt.name, func(t *testing.T) {
74+
t.Parallel()
75+
76+
id := newID(tt.prefix)
77+
78+
assert.NotEmpty(t, id)
79+
assert.True(t, strings.HasPrefix(id, tt.wantPrefix))
80+
})
81+
}
82+
}

offchain/jd/memory/memory_client.go

Lines changed: 148 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,19 @@ package memory
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
7+
"slices"
8+
"strings"
69
"sync"
710
"time"
811

912
"github.com/google/uuid"
13+
"github.com/pelletier/go-toml/v2"
1014
csav1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/csa"
1115
jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"
1216
nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node"
17+
"github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes"
1318
"google.golang.org/grpc"
1419
"google.golang.org/grpc/codes"
1520
"google.golang.org/grpc/status"
@@ -48,36 +53,21 @@ func NewMemoryJobDistributor() *MemoryJobDistributor {
4853
// Job Service Methods
4954

5055
// ProposeJob creates a new job proposal and stores it in memory.
51-
func (m *MemoryJobDistributor) ProposeJob(ctx context.Context, in *jobv1.ProposeJobRequest, opts ...grpc.CallOption) (*jobv1.ProposeJobResponse, error) {
56+
func (m *MemoryJobDistributor) ProposeJob(
57+
ctx context.Context, in *jobv1.ProposeJobRequest, opts ...grpc.CallOption,
58+
) (*jobv1.ProposeJobResponse, error) {
5259
if in == nil {
5360
return nil, status.Error(codes.InvalidArgument, "request cannot be nil")
5461
}
5562

56-
// Generate unique IDs
57-
proposalID := uuid.New().String()
58-
jobID := uuid.New().String()
59-
60-
// Create the proposal
61-
proposal := &jobv1.Proposal{
62-
Id: proposalID,
63-
JobId: jobID,
64-
Spec: in.Spec,
65-
Status: jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED,
66-
}
63+
m.mu.Lock()
64+
defer m.mu.Unlock()
6765

68-
// Also create a job based on the proposal
69-
job := &jobv1.Job{
70-
Id: jobID,
71-
Uuid: uuid.New().String(),
72-
NodeId: in.NodeId,
73-
Labels: in.Labels,
66+
proposal, err := m.upsertProposal(in.NodeId, in.Spec, in.Labels)
67+
if err != nil {
68+
return nil, err
7469
}
7570

76-
m.mu.Lock()
77-
m.proposals[proposalID] = proposal
78-
m.jobs[jobID] = job
79-
m.mu.Unlock()
80-
8171
return &jobv1.ProposeJobResponse{
8272
Proposal: proposal,
8373
}, nil
@@ -179,29 +169,11 @@ func (m *MemoryJobDistributor) BatchProposeJob(ctx context.Context, in *jobv1.Ba
179169
m.mu.Lock()
180170
// Create a proposal for each node
181171
for _, nodeID := range in.GetNodeIds() {
182-
// Generate unique IDs
183-
proposalID := uuid.New().String()
184-
jobID := uuid.New().String()
185-
186-
// Create the proposal
187-
proposal := &jobv1.Proposal{
188-
Id: proposalID,
189-
JobId: jobID,
190-
Spec: in.GetSpec(),
191-
Status: jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED,
172+
proposal, err := m.upsertProposal(nodeID, in.GetSpec(), in.GetLabels())
173+
if err != nil {
174+
return nil, err
192175
}
193176

194-
// Store the proposal
195-
m.proposals[proposalID] = proposal
196-
197-
// Also create and store a job based on the proposal
198-
job := &jobv1.Job{
199-
Id: jobID,
200-
NodeId: nodeID,
201-
Labels: in.GetLabels(),
202-
}
203-
m.jobs[jobID] = job
204-
205177
// Add to success responses
206178
successResponses[nodeID] = &jobv1.ProposeJobResponse{
207179
Proposal: proposal,
@@ -220,38 +192,39 @@ func (m *MemoryJobDistributor) RevokeJob(ctx context.Context, in *jobv1.RevokeJo
220192
return nil, status.Error(codes.InvalidArgument, "request cannot be nil")
221193
}
222194

223-
// Use GetId() method to access the oneof field
224-
jobID := in.GetId()
195+
var jobID string
196+
switch idOneof := in.GetIdOneof().(type) {
197+
case *jobv1.RevokeJobRequest_Id:
198+
if idOneof.Id == "" {
199+
return nil, status.Error(codes.InvalidArgument, "job id must be provided")
200+
}
225201

226-
if jobID == "" {
227-
return nil, status.Error(codes.InvalidArgument, "job id must be provided")
202+
jobID = idOneof.Id
203+
case *jobv1.RevokeJobRequest_Uuid:
204+
return nil, status.Error(codes.InvalidArgument, "uuid is not supported")
228205
}
229206

230207
m.mu.Lock()
231-
// Find the proposal associated with this job
232-
var foundProposal *jobv1.Proposal
233-
for _, proposal := range m.proposals {
234-
if proposal.JobId == jobID {
235-
foundProposal = proposal
236-
break
237-
}
238-
}
208+
defer m.mu.Unlock()
239209

240-
if foundProposal != nil {
241-
foundProposal.Status = jobv1.ProposalStatus_PROPOSAL_STATUS_REVOKED
210+
// Find the proposal with the highest revision number associated with this job
211+
prop := m.getHighestRevisionProposalByJobID(jobID)
212+
if prop == nil {
213+
return nil, status.Errorf(codes.NotFound, "proposal with job id %s not found", jobID)
242214
}
243-
m.mu.Unlock()
244215

245-
if foundProposal != nil {
246-
return &jobv1.RevokeJobResponse{
247-
Proposal: foundProposal,
248-
}, nil
216+
if !slices.Contains([]jobv1.ProposalStatus{jobv1.ProposalStatus_PROPOSAL_STATUS_PROPOSED, jobv1.ProposalStatus_PROPOSAL_STATUS_CANCELLED}, prop.Status) {
217+
return nil, errors.New("job cannot be revoked")
249218
}
250219

251-
return &jobv1.RevokeJobResponse{}, nil
220+
prop.Status = jobv1.ProposalStatus_PROPOSAL_STATUS_REVOKED
221+
222+
return &jobv1.RevokeJobResponse{
223+
Proposal: prop,
224+
}, nil
252225
}
253226

254-
// DeleteJob removes a job from memory.
227+
// DeleteJob soft deletes a job, setting the DeletedAt field to the current time.
255228
func (m *MemoryJobDistributor) DeleteJob(ctx context.Context, in *jobv1.DeleteJobRequest, opts ...grpc.CallOption) (*jobv1.DeleteJobResponse, error) {
256229
if in == nil {
257230
return nil, status.Error(codes.InvalidArgument, "request cannot be nil")
@@ -318,7 +291,7 @@ func (m *MemoryJobDistributor) RegisterNode(ctx context.Context, in *nodev1.Regi
318291
}
319292

320293
// Generate a new ID for the node
321-
nodeID := uuid.New().String()
294+
nodeID := newNodeID()
322295

323296
// Create the node
324297
node := &nodev1.Node{
@@ -536,3 +509,112 @@ func (m *MemoryJobDistributor) AddKeypair(keypair *csav1.Keypair) {
536509
m.mu.Unlock()
537510
}
538511
}
512+
513+
// getJobByUUID retrieves a job by its UUID and node ID.
514+
func (m *MemoryJobDistributor) getJobByUUIDAndNodeID(uuid string, nodeID string) (*jobv1.Job, error) {
515+
for _, job := range m.jobs {
516+
if job.Uuid == uuid && job.NodeId == nodeID {
517+
return job, nil
518+
}
519+
}
520+
521+
return nil, fmt.Errorf("job with uuid %s not found", uuid)
522+
}
523+
524+
// getNextRevisionNum returns the next revision number for a given job ID.
525+
func (m *MemoryJobDistributor) getNextRevisionNum(jobID string) int64 {
526+
return m.proposalsByJobCount(jobID) + 1
527+
}
528+
529+
// getHighestRevisionProposalByJobID returns the proposal with the highest revision number for a
530+
// given job ID.
531+
func (m *MemoryJobDistributor) getHighestRevisionProposalByJobID(jobID string) *jobv1.Proposal {
532+
var prop *jobv1.Proposal
533+
var maxRevision int64 = -1
534+
535+
for _, p := range m.proposals {
536+
if p.JobId == jobID && p.Revision > maxRevision {
537+
prop = p
538+
maxRevision = prop.Revision
539+
}
540+
}
541+
542+
return prop
543+
}
544+
545+
// proposalsByJobCount returns the number of proposals for a given job ID.
546+
func (m *MemoryJobDistributor) proposalsByJobCount(jobID string) int64 {
547+
count := 0
548+
for _, proposal := range m.proposals {
549+
if proposal.JobId == jobID {
550+
count++
551+
}
552+
}
553+
554+
return int64(count)
555+
}
556+
557+
// upsertProposal upserts a proposal for a given spec
558+
func (m *MemoryJobDistributor) upsertProposal(
559+
nodeID string, spec string, labels []*ptypes.Label,
560+
) (*jobv1.Proposal, error) {
561+
specUUID, err := getSpecUUID(spec)
562+
if err != nil {
563+
return nil, status.Errorf(codes.InvalidArgument, "invalid spec: %v", err)
564+
}
565+
566+
// Generate unique IDs
567+
proposalID := newProposalID()
568+
jobID := newJobID()
569+
now := timestamppb.Now()
570+
571+
// If the job already exists, we use the existing job ID, otherwise we create a new job
572+
if job, _ := m.getJobByUUIDAndNodeID(specUUID.String(), nodeID); job != nil {
573+
jobID = job.Id
574+
575+
m.jobs[jobID].ProposalIds = append(m.jobs[jobID].ProposalIds, proposalID)
576+
} else {
577+
m.jobs[jobID] = &jobv1.Job{
578+
Id: jobID,
579+
Uuid: specUUID.String(),
580+
NodeId: nodeID,
581+
ProposalIds: []string{proposalID},
582+
Labels: labels,
583+
CreatedAt: now,
584+
UpdatedAt: now,
585+
}
586+
}
587+
588+
// Insert the proposal
589+
m.proposals[proposalID] = &jobv1.Proposal{
590+
Id: proposalID,
591+
Revision: m.getNextRevisionNum(jobID),
592+
Status: jobv1.ProposalStatus_PROPOSAL_STATUS_PROPOSED,
593+
DeliveryStatus: jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED,
594+
JobId: jobID,
595+
Spec: spec,
596+
CreatedAt: now,
597+
UpdatedAt: now,
598+
}
599+
600+
return m.proposals[proposalID], nil
601+
}
602+
603+
// getSpecUUID extracts the UUID from a spec
604+
func getSpecUUID(spec string) (uuid.UUID, error) {
605+
s := struct {
606+
ExternalJobID *uuid.UUID `toml:"externalJobID,omitempty"`
607+
}{}
608+
609+
d := toml.NewDecoder(strings.NewReader(spec))
610+
611+
if err := d.Decode(&s); err != nil {
612+
return uuid.Nil, err
613+
}
614+
615+
if s.ExternalJobID == nil {
616+
return uuid.Nil, errors.New("externalJobID field not found in spec")
617+
}
618+
619+
return *s.ExternalJobID, nil
620+
}

0 commit comments

Comments
 (0)