Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/mean-otters-sink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink-deployments-framework": minor
---

Aligns MemoryJobDistributor `ProposeJob` and `RevokeJob` to have the same functionality as the JobDistributor service
27 changes: 27 additions & 0 deletions offchain/jd/memory/id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package memory

import "github.com/segmentio/ksuid"

// newProposalID generates a new proposal ID.
func newProposalID() string {
return newID("prop")
}

// newJobID generates a new job ID.
func newJobID() string {
return newID("job")
}

// newNodeID generates a new node ID.
func newNodeID() string {
return newID("node")
}

// newID generates a new ID with a given prefix.
//
// This uses ksuid to generate a unique ID which differs from the Job Distributor ID format, to
// better differentiate from the UUID that are set on the job. Each ID should be prefixed with a
// string to identify the type of ID.
func newID(prefix string) string {
return prefix + "_" + ksuid.New().String()
}
82 changes: 82 additions & 0 deletions offchain/jd/memory/id_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package memory

import (
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

func TestNewProposalID(t *testing.T) {
t.Parallel()

id := newProposalID()

assert.NotEmpty(t, id)
assert.True(t, strings.HasPrefix(id, "prop_"))
}

func TestNewJobID(t *testing.T) {
t.Parallel()

id := newJobID()

assert.NotEmpty(t, id)
assert.True(t, strings.HasPrefix(id, "job_"))
}

func TestNewNodeID(t *testing.T) {
t.Parallel()

id := newNodeID()

assert.NotEmpty(t, id)
assert.True(t, strings.HasPrefix(id, "node_"))
}

func TestNewID(t *testing.T) {
t.Parallel()

tests := []struct {
name string
prefix string
wantPrefix string
}{
{
name: "empty prefix",
prefix: "",
wantPrefix: "_",
},
{
name: "single character prefix",
prefix: "a",
wantPrefix: "a_",
},
{
name: "multi character prefix",
prefix: "test",
wantPrefix: "test_",
},
{
name: "prefix with numbers",
prefix: "test123",
wantPrefix: "test123_",
},
{
name: "prefix with special characters",
prefix: "test-123",
wantPrefix: "test-123_",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

id := newID(tt.prefix)

assert.NotEmpty(t, id)
assert.True(t, strings.HasPrefix(id, tt.wantPrefix))
})
}
}
214 changes: 148 additions & 66 deletions offchain/jd/memory/memory_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@ package memory

import (
"context"
"errors"
"fmt"
"slices"
"strings"
"sync"
"time"

"github.com/google/uuid"
"github.com/pelletier/go-toml/v2"
csav1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/csa"
jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"
nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node"
"github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -48,36 +53,21 @@ func NewMemoryJobDistributor() *MemoryJobDistributor {
// Job Service Methods

// ProposeJob creates a new job proposal and stores it in memory.
func (m *MemoryJobDistributor) ProposeJob(ctx context.Context, in *jobv1.ProposeJobRequest, opts ...grpc.CallOption) (*jobv1.ProposeJobResponse, error) {
func (m *MemoryJobDistributor) ProposeJob(
ctx context.Context, in *jobv1.ProposeJobRequest, opts ...grpc.CallOption,
) (*jobv1.ProposeJobResponse, error) {
if in == nil {
return nil, status.Error(codes.InvalidArgument, "request cannot be nil")
}

// Generate unique IDs
proposalID := uuid.New().String()
jobID := uuid.New().String()

// Create the proposal
proposal := &jobv1.Proposal{
Id: proposalID,
JobId: jobID,
Spec: in.Spec,
Status: jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED,
}
m.mu.Lock()
defer m.mu.Unlock()

// Also create a job based on the proposal
job := &jobv1.Job{
Id: jobID,
Uuid: uuid.New().String(),
NodeId: in.NodeId,
Labels: in.Labels,
proposal, err := m.upsertProposal(in.NodeId, in.Spec, in.Labels)
if err != nil {
return nil, err
}

m.mu.Lock()
m.proposals[proposalID] = proposal
m.jobs[jobID] = job
m.mu.Unlock()

return &jobv1.ProposeJobResponse{
Proposal: proposal,
}, nil
Expand Down Expand Up @@ -179,29 +169,11 @@ func (m *MemoryJobDistributor) BatchProposeJob(ctx context.Context, in *jobv1.Ba
m.mu.Lock()
// Create a proposal for each node
for _, nodeID := range in.GetNodeIds() {
// Generate unique IDs
proposalID := uuid.New().String()
jobID := uuid.New().String()

// Create the proposal
proposal := &jobv1.Proposal{
Id: proposalID,
JobId: jobID,
Spec: in.GetSpec(),
Status: jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED,
proposal, err := m.upsertProposal(nodeID, in.GetSpec(), in.GetLabels())
if err != nil {
return nil, err
}
Comment on lines -182 to 192

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is all moved into a shared function which can be used for batch proposals as well


// Store the proposal
m.proposals[proposalID] = proposal

// Also create and store a job based on the proposal
job := &jobv1.Job{
Id: jobID,
NodeId: nodeID,
Labels: in.GetLabels(),
}
m.jobs[jobID] = job

// Add to success responses
successResponses[nodeID] = &jobv1.ProposeJobResponse{
Proposal: proposal,
Expand All @@ -220,38 +192,39 @@ func (m *MemoryJobDistributor) RevokeJob(ctx context.Context, in *jobv1.RevokeJo
return nil, status.Error(codes.InvalidArgument, "request cannot be nil")
}

// Use GetId() method to access the oneof field
jobID := in.GetId()
var jobID string
switch idOneof := in.GetIdOneof().(type) {
case *jobv1.RevokeJobRequest_Id:
if idOneof.Id == "" {
return nil, status.Error(codes.InvalidArgument, "job id must be provided")
}

if jobID == "" {
return nil, status.Error(codes.InvalidArgument, "job id must be provided")
jobID = idOneof.Id
case *jobv1.RevokeJobRequest_Uuid:
return nil, status.Error(codes.InvalidArgument, "uuid is not supported")
}

m.mu.Lock()
// Find the proposal associated with this job
var foundProposal *jobv1.Proposal
for _, proposal := range m.proposals {
if proposal.JobId == jobID {
foundProposal = proposal
break
}
}
defer m.mu.Unlock()

if foundProposal != nil {
foundProposal.Status = jobv1.ProposalStatus_PROPOSAL_STATUS_REVOKED
// Find the proposal with the highest revision number associated with this job
prop := m.getHighestRevisionProposalByJobID(jobID)
if prop == nil {
return nil, status.Errorf(codes.NotFound, "proposal with job id %s not found", jobID)
}
m.mu.Unlock()

if foundProposal != nil {
return &jobv1.RevokeJobResponse{
Proposal: foundProposal,
}, nil
if !slices.Contains([]jobv1.ProposalStatus{jobv1.ProposalStatus_PROPOSAL_STATUS_PROPOSED, jobv1.ProposalStatus_PROPOSAL_STATUS_CANCELLED}, prop.Status) {
return nil, errors.New("job cannot be revoked")
}

return &jobv1.RevokeJobResponse{}, nil
prop.Status = jobv1.ProposalStatus_PROPOSAL_STATUS_REVOKED

return &jobv1.RevokeJobResponse{
Proposal: prop,
}, nil
}

// DeleteJob removes a job from memory.
// DeleteJob soft deletes a job, setting the DeletedAt field to the current time.
func (m *MemoryJobDistributor) DeleteJob(ctx context.Context, in *jobv1.DeleteJobRequest, opts ...grpc.CallOption) (*jobv1.DeleteJobResponse, error) {
if in == nil {
return nil, status.Error(codes.InvalidArgument, "request cannot be nil")
Expand Down Expand Up @@ -318,7 +291,7 @@ func (m *MemoryJobDistributor) RegisterNode(ctx context.Context, in *nodev1.Regi
}

// Generate a new ID for the node
nodeID := uuid.New().String()
nodeID := newNodeID()

// Create the node
node := &nodev1.Node{
Expand Down Expand Up @@ -536,3 +509,112 @@ func (m *MemoryJobDistributor) AddKeypair(keypair *csav1.Keypair) {
m.mu.Unlock()
}
}

// getJobByUUID retrieves a job by its UUID and node ID.
func (m *MemoryJobDistributor) getJobByUUIDAndNodeID(uuid string, nodeID string) (*jobv1.Job, error) {
for _, job := range m.jobs {
if job.Uuid == uuid && job.NodeId == nodeID {
return job, nil
}
}

return nil, fmt.Errorf("job with uuid %s not found", uuid)
}

// getNextRevisionNum returns the next revision number for a given job ID.
func (m *MemoryJobDistributor) getNextRevisionNum(jobID string) int64 {
return m.proposalsByJobCount(jobID) + 1
}

// getHighestRevisionProposalByJobID returns the proposal with the highest revision number for a
// given job ID.
func (m *MemoryJobDistributor) getHighestRevisionProposalByJobID(jobID string) *jobv1.Proposal {
var prop *jobv1.Proposal
var maxRevision int64 = -1

for _, p := range m.proposals {
if p.JobId == jobID && p.Revision > maxRevision {
prop = p
maxRevision = prop.Revision
}
}

return prop
}

// proposalsByJobCount returns the number of proposals for a given job ID.
func (m *MemoryJobDistributor) proposalsByJobCount(jobID string) int64 {
count := 0
for _, proposal := range m.proposals {
if proposal.JobId == jobID {
count++
}
}

return int64(count)
}

// upsertProposal upserts a proposal for a given spec
func (m *MemoryJobDistributor) upsertProposal(
nodeID string, spec string, labels []*ptypes.Label,
) (*jobv1.Proposal, error) {
specUUID, err := getSpecUUID(spec)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid spec: %v", err)
}

// Generate unique IDs
proposalID := newProposalID()
jobID := newJobID()
now := timestamppb.Now()

// If the job already exists, we use the existing job ID, otherwise we create a new job
if job, _ := m.getJobByUUIDAndNodeID(specUUID.String(), nodeID); job != nil {
jobID = job.Id

m.jobs[jobID].ProposalIds = append(m.jobs[jobID].ProposalIds, proposalID)
} else {
m.jobs[jobID] = &jobv1.Job{
Id: jobID,
Uuid: specUUID.String(),
NodeId: nodeID,
ProposalIds: []string{proposalID},
Labels: labels,
CreatedAt: now,
UpdatedAt: now,
}
}

// Insert the proposal
m.proposals[proposalID] = &jobv1.Proposal{
Id: proposalID,
Revision: m.getNextRevisionNum(jobID),
Status: jobv1.ProposalStatus_PROPOSAL_STATUS_PROPOSED,
DeliveryStatus: jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED,
JobId: jobID,
Spec: spec,
CreatedAt: now,
UpdatedAt: now,
}

return m.proposals[proposalID], nil
}

// getSpecUUID extracts the UUID from a spec
func getSpecUUID(spec string) (uuid.UUID, error) {
s := struct {
ExternalJobID *uuid.UUID `toml:"externalJobID,omitempty"`
}{}

d := toml.NewDecoder(strings.NewReader(spec))

if err := d.Decode(&s); err != nil {
return uuid.Nil, err
}

if s.ExternalJobID == nil {
return uuid.Nil, errors.New("externalJobID field not found in spec")
}

return *s.ExternalJobID, nil
}
Loading
Loading