diff --git a/.changeset/mean-otters-sink.md b/.changeset/mean-otters-sink.md new file mode 100644 index 000000000..711c8a6a6 --- /dev/null +++ b/.changeset/mean-otters-sink.md @@ -0,0 +1,5 @@ +--- +"chainlink-deployments-framework": minor +--- + +Aligns MemoryJobDistributor `ProposeJob` and `RevokeJob` to have the same functionality as the JobDistributor service diff --git a/offchain/jd/memory/id.go b/offchain/jd/memory/id.go new file mode 100644 index 000000000..728cfe7c2 --- /dev/null +++ b/offchain/jd/memory/id.go @@ -0,0 +1,27 @@ +package memory + +import "github.com/segmentio/ksuid" + +// newProposalID generates a new proposal ID. +func newProposalID() string { + return newID("prop") +} + +// newJobID generates a new job ID. +func newJobID() string { + return newID("job") +} + +// newNodeID generates a new node ID. +func newNodeID() string { + return newID("node") +} + +// newID generates a new ID with a given prefix. +// +// This uses ksuid to generate a unique ID which differs from the Job Distributor ID format, to +// better differentiate from the UUID that are set on the job. Each ID should be prefixed with a +// string to identify the type of ID. +func newID(prefix string) string { + return prefix + "_" + ksuid.New().String() +} diff --git a/offchain/jd/memory/id_test.go b/offchain/jd/memory/id_test.go new file mode 100644 index 000000000..8faa97514 --- /dev/null +++ b/offchain/jd/memory/id_test.go @@ -0,0 +1,82 @@ +package memory + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewProposalID(t *testing.T) { + t.Parallel() + + id := newProposalID() + + assert.NotEmpty(t, id) + assert.True(t, strings.HasPrefix(id, "prop_")) +} + +func TestNewJobID(t *testing.T) { + t.Parallel() + + id := newJobID() + + assert.NotEmpty(t, id) + assert.True(t, strings.HasPrefix(id, "job_")) +} + +func TestNewNodeID(t *testing.T) { + t.Parallel() + + id := newNodeID() + + assert.NotEmpty(t, id) + assert.True(t, strings.HasPrefix(id, "node_")) +} + +func TestNewID(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + prefix string + wantPrefix string + }{ + { + name: "empty prefix", + prefix: "", + wantPrefix: "_", + }, + { + name: "single character prefix", + prefix: "a", + wantPrefix: "a_", + }, + { + name: "multi character prefix", + prefix: "test", + wantPrefix: "test_", + }, + { + name: "prefix with numbers", + prefix: "test123", + wantPrefix: "test123_", + }, + { + name: "prefix with special characters", + prefix: "test-123", + wantPrefix: "test-123_", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + id := newID(tt.prefix) + + assert.NotEmpty(t, id) + assert.True(t, strings.HasPrefix(id, tt.wantPrefix)) + }) + } +} diff --git a/offchain/jd/memory/memory_client.go b/offchain/jd/memory/memory_client.go index 093a35ef6..49f7ea29e 100644 --- a/offchain/jd/memory/memory_client.go +++ b/offchain/jd/memory/memory_client.go @@ -2,14 +2,19 @@ package memory import ( "context" + "errors" "fmt" + "slices" + "strings" "sync" "time" "github.com/google/uuid" + "github.com/pelletier/go-toml/v2" csav1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/csa" jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job" nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node" + "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -48,36 +53,21 @@ func NewMemoryJobDistributor() *MemoryJobDistributor { // Job Service Methods // ProposeJob creates a new job proposal and stores it in memory. -func (m *MemoryJobDistributor) ProposeJob(ctx context.Context, in *jobv1.ProposeJobRequest, opts ...grpc.CallOption) (*jobv1.ProposeJobResponse, error) { +func (m *MemoryJobDistributor) ProposeJob( + ctx context.Context, in *jobv1.ProposeJobRequest, opts ...grpc.CallOption, +) (*jobv1.ProposeJobResponse, error) { if in == nil { return nil, status.Error(codes.InvalidArgument, "request cannot be nil") } - // Generate unique IDs - proposalID := uuid.New().String() - jobID := uuid.New().String() - - // Create the proposal - proposal := &jobv1.Proposal{ - Id: proposalID, - JobId: jobID, - Spec: in.Spec, - Status: jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, - } + m.mu.Lock() + defer m.mu.Unlock() - // Also create a job based on the proposal - job := &jobv1.Job{ - Id: jobID, - Uuid: uuid.New().String(), - NodeId: in.NodeId, - Labels: in.Labels, + proposal, err := m.upsertProposal(in.NodeId, in.Spec, in.Labels) + if err != nil { + return nil, err } - m.mu.Lock() - m.proposals[proposalID] = proposal - m.jobs[jobID] = job - m.mu.Unlock() - return &jobv1.ProposeJobResponse{ Proposal: proposal, }, nil @@ -179,29 +169,11 @@ func (m *MemoryJobDistributor) BatchProposeJob(ctx context.Context, in *jobv1.Ba m.mu.Lock() // Create a proposal for each node for _, nodeID := range in.GetNodeIds() { - // Generate unique IDs - proposalID := uuid.New().String() - jobID := uuid.New().String() - - // Create the proposal - proposal := &jobv1.Proposal{ - Id: proposalID, - JobId: jobID, - Spec: in.GetSpec(), - Status: jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, + proposal, err := m.upsertProposal(nodeID, in.GetSpec(), in.GetLabels()) + if err != nil { + return nil, err } - // Store the proposal - m.proposals[proposalID] = proposal - - // Also create and store a job based on the proposal - job := &jobv1.Job{ - Id: jobID, - NodeId: nodeID, - Labels: in.GetLabels(), - } - m.jobs[jobID] = job - // Add to success responses successResponses[nodeID] = &jobv1.ProposeJobResponse{ Proposal: proposal, @@ -220,38 +192,39 @@ func (m *MemoryJobDistributor) RevokeJob(ctx context.Context, in *jobv1.RevokeJo return nil, status.Error(codes.InvalidArgument, "request cannot be nil") } - // Use GetId() method to access the oneof field - jobID := in.GetId() + var jobID string + switch idOneof := in.GetIdOneof().(type) { + case *jobv1.RevokeJobRequest_Id: + if idOneof.Id == "" { + return nil, status.Error(codes.InvalidArgument, "job id must be provided") + } - if jobID == "" { - return nil, status.Error(codes.InvalidArgument, "job id must be provided") + jobID = idOneof.Id + case *jobv1.RevokeJobRequest_Uuid: + return nil, status.Error(codes.InvalidArgument, "uuid is not supported") } m.mu.Lock() - // Find the proposal associated with this job - var foundProposal *jobv1.Proposal - for _, proposal := range m.proposals { - if proposal.JobId == jobID { - foundProposal = proposal - break - } - } + defer m.mu.Unlock() - if foundProposal != nil { - foundProposal.Status = jobv1.ProposalStatus_PROPOSAL_STATUS_REVOKED + // Find the proposal with the highest revision number associated with this job + prop := m.getHighestRevisionProposalByJobID(jobID) + if prop == nil { + return nil, status.Errorf(codes.NotFound, "proposal with job id %s not found", jobID) } - m.mu.Unlock() - if foundProposal != nil { - return &jobv1.RevokeJobResponse{ - Proposal: foundProposal, - }, nil + if !slices.Contains([]jobv1.ProposalStatus{jobv1.ProposalStatus_PROPOSAL_STATUS_PROPOSED, jobv1.ProposalStatus_PROPOSAL_STATUS_CANCELLED}, prop.Status) { + return nil, errors.New("job cannot be revoked") } - return &jobv1.RevokeJobResponse{}, nil + prop.Status = jobv1.ProposalStatus_PROPOSAL_STATUS_REVOKED + + return &jobv1.RevokeJobResponse{ + Proposal: prop, + }, nil } -// DeleteJob removes a job from memory. +// DeleteJob soft deletes a job, setting the DeletedAt field to the current time. func (m *MemoryJobDistributor) DeleteJob(ctx context.Context, in *jobv1.DeleteJobRequest, opts ...grpc.CallOption) (*jobv1.DeleteJobResponse, error) { if in == nil { return nil, status.Error(codes.InvalidArgument, "request cannot be nil") @@ -318,7 +291,7 @@ func (m *MemoryJobDistributor) RegisterNode(ctx context.Context, in *nodev1.Regi } // Generate a new ID for the node - nodeID := uuid.New().String() + nodeID := newNodeID() // Create the node node := &nodev1.Node{ @@ -536,3 +509,112 @@ func (m *MemoryJobDistributor) AddKeypair(keypair *csav1.Keypair) { m.mu.Unlock() } } + +// getJobByUUID retrieves a job by its UUID and node ID. +func (m *MemoryJobDistributor) getJobByUUIDAndNodeID(uuid string, nodeID string) (*jobv1.Job, error) { + for _, job := range m.jobs { + if job.Uuid == uuid && job.NodeId == nodeID { + return job, nil + } + } + + return nil, fmt.Errorf("job with uuid %s not found", uuid) +} + +// getNextRevisionNum returns the next revision number for a given job ID. +func (m *MemoryJobDistributor) getNextRevisionNum(jobID string) int64 { + return m.proposalsByJobCount(jobID) + 1 +} + +// getHighestRevisionProposalByJobID returns the proposal with the highest revision number for a +// given job ID. +func (m *MemoryJobDistributor) getHighestRevisionProposalByJobID(jobID string) *jobv1.Proposal { + var prop *jobv1.Proposal + var maxRevision int64 = -1 + + for _, p := range m.proposals { + if p.JobId == jobID && p.Revision > maxRevision { + prop = p + maxRevision = prop.Revision + } + } + + return prop +} + +// proposalsByJobCount returns the number of proposals for a given job ID. +func (m *MemoryJobDistributor) proposalsByJobCount(jobID string) int64 { + count := 0 + for _, proposal := range m.proposals { + if proposal.JobId == jobID { + count++ + } + } + + return int64(count) +} + +// upsertProposal upserts a proposal for a given spec +func (m *MemoryJobDistributor) upsertProposal( + nodeID string, spec string, labels []*ptypes.Label, +) (*jobv1.Proposal, error) { + specUUID, err := getSpecUUID(spec) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "invalid spec: %v", err) + } + + // Generate unique IDs + proposalID := newProposalID() + jobID := newJobID() + now := timestamppb.Now() + + // If the job already exists, we use the existing job ID, otherwise we create a new job + if job, _ := m.getJobByUUIDAndNodeID(specUUID.String(), nodeID); job != nil { + jobID = job.Id + + m.jobs[jobID].ProposalIds = append(m.jobs[jobID].ProposalIds, proposalID) + } else { + m.jobs[jobID] = &jobv1.Job{ + Id: jobID, + Uuid: specUUID.String(), + NodeId: nodeID, + ProposalIds: []string{proposalID}, + Labels: labels, + CreatedAt: now, + UpdatedAt: now, + } + } + + // Insert the proposal + m.proposals[proposalID] = &jobv1.Proposal{ + Id: proposalID, + Revision: m.getNextRevisionNum(jobID), + Status: jobv1.ProposalStatus_PROPOSAL_STATUS_PROPOSED, + DeliveryStatus: jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, + JobId: jobID, + Spec: spec, + CreatedAt: now, + UpdatedAt: now, + } + + return m.proposals[proposalID], nil +} + +// getSpecUUID extracts the UUID from a spec +func getSpecUUID(spec string) (uuid.UUID, error) { + s := struct { + ExternalJobID *uuid.UUID `toml:"externalJobID,omitempty"` + }{} + + d := toml.NewDecoder(strings.NewReader(spec)) + + if err := d.Decode(&s); err != nil { + return uuid.Nil, err + } + + if s.ExternalJobID == nil { + return uuid.Nil, errors.New("externalJobID field not found in spec") + } + + return *s.ExternalJobID, nil +} diff --git a/offchain/jd/memory/memory_client_test.go b/offchain/jd/memory/memory_client_test.go index 81d5e6ca2..a4de758f5 100644 --- a/offchain/jd/memory/memory_client_test.go +++ b/offchain/jd/memory/memory_client_test.go @@ -1,8 +1,11 @@ package memory import ( + "fmt" + "strings" "testing" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -14,34 +17,132 @@ import ( "github.com/smartcontractkit/chainlink-deployments-framework/internal/pointer" ) +var ( + // A unique spec external job id for testing + spec1UUID = uuid.New().String() + // A minimally viable spec1 for testing with an external job id + spec1 = fmt.Sprintf("externalJobID = '%s'", spec1UUID) + + // An additional spec with a different external job id for testing + spec2UUID = uuid.New().String() + // A minimally viable spec2 for testing with an external job id + spec2 = fmt.Sprintf("externalJobID = '%s'", spec2UUID) +) + func TestMemoryJobDistributor_ProposeJob(t *testing.T) { t.Parallel() + var ( + labels = []*ptypes.Label{ + {Key: "environment", Value: pointer.To("prod")}, + } + ) + t.Run("successfully propose a job", func(t *testing.T) { t.Parallel() + client := NewMemoryJobDistributor() - ctx := t.Context() req := &jobv1.ProposeJobRequest{ NodeId: "test-node-1", - Spec: "test job spec", + Spec: spec1, + Labels: labels, } - resp, err := client.ProposeJob(ctx, req) + resp, err := client.ProposeJob(t.Context(), req) require.NoError(t, err) require.NotNil(t, resp) require.NotNil(t, resp.Proposal) + assertProposalEquals(t, &jobv1.Proposal{ + Revision: 1, + Status: jobv1.ProposalStatus_PROPOSAL_STATUS_PROPOSED, + DeliveryStatus: jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, + JobId: resp.Proposal.JobId, + Spec: spec1, + }, resp.Proposal) + + // Check the job + getResp, err := client.GetJob(t.Context(), &jobv1.GetJobRequest{ + IdOneof: &jobv1.GetJobRequest_Id{Id: resp.Proposal.JobId}, + }) + require.NoError(t, err) + require.NotNil(t, getResp) + require.NotNil(t, getResp.Job) + assertJobEquals(t, &jobv1.Job{ + Id: resp.Proposal.JobId, + Uuid: spec1UUID, + NodeId: req.NodeId, + ProposalIds: []string{resp.Proposal.Id}, + Labels: labels, + }, getResp.Job) + }) - assert.NotEmpty(t, resp.Proposal.Id) - assert.NotEmpty(t, resp.Proposal.JobId) - assert.Equal(t, "test job spec", resp.Proposal.Spec) - assert.Equal(t, jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, resp.Proposal.Status) + t.Run("propose job with same external job id returns same job id", func(t *testing.T) { + t.Parallel() + + client := NewMemoryJobDistributor() + ctx := t.Context() + req := &jobv1.ProposeJobRequest{ + NodeId: "test-node-1", + Spec: spec1, + Labels: labels, + } + + resp1, err := client.ProposeJob(ctx, req) + require.NoError(t, err) + require.NotNil(t, resp1) + assertProposalEquals(t, &jobv1.Proposal{ + Revision: 1, + Status: jobv1.ProposalStatus_PROPOSAL_STATUS_PROPOSED, + DeliveryStatus: jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, + JobId: resp1.Proposal.JobId, + Spec: spec1, + }, resp1.Proposal) + + resp2, err := client.ProposeJob(ctx, req) + require.NoError(t, err) + require.NotNil(t, resp2) + assertProposalEquals(t, &jobv1.Proposal{ + Revision: 2, + Status: jobv1.ProposalStatus_PROPOSAL_STATUS_PROPOSED, + DeliveryStatus: jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, + JobId: resp1.Proposal.JobId, + Spec: spec1, + }, resp2.Proposal) + + // Check the job + getResp, err := client.GetJob(t.Context(), &jobv1.GetJobRequest{ + IdOneof: &jobv1.GetJobRequest_Id{Id: resp1.Proposal.JobId}, + }) + require.NoError(t, err) + require.NotNil(t, getResp) + require.NotNil(t, getResp.Job) + assertJobEquals(t, &jobv1.Job{ + Id: resp1.Proposal.JobId, + Uuid: spec1UUID, + NodeId: req.NodeId, + ProposalIds: []string{resp1.Proposal.Id, resp2.Proposal.Id}, + Labels: labels, + }, getResp.Job) }) - t.Run("nil request returns error", func(t *testing.T) { + t.Run("spec is invalid", func(t *testing.T) { t.Parallel() + client := NewMemoryJobDistributor() ctx := t.Context() - resp, err := client.ProposeJob(ctx, nil) + _, err := client.ProposeJob(ctx, &jobv1.ProposeJobRequest{ + NodeId: "test-node-1", + Spec: "invalid spec", + }) + + require.ErrorContains(t, err, "invalid spec") + }) + + t.Run("nil request returns error", func(t *testing.T) { + t.Parallel() + + client := NewMemoryJobDistributor() + resp, err := client.ProposeJob(t.Context(), nil) require.ErrorContains(t, err, "request cannot be nil") assert.Nil(t, resp) }) @@ -57,7 +158,7 @@ func TestMemoryJobDistributor_GetJob(t *testing.T) { // First create a job via proposal proposeResp, err := client.ProposeJob(ctx, &jobv1.ProposeJobRequest{ NodeId: "test-node-1", - Spec: "test job spec", + Spec: spec1, }) require.NoError(t, err) jobID := proposeResp.Proposal.JobId @@ -68,9 +169,13 @@ func TestMemoryJobDistributor_GetJob(t *testing.T) { }) require.NoError(t, err) require.NotNil(t, getResp) - require.NotNil(t, getResp.Job) - assert.Equal(t, jobID, getResp.Job.Id) + assertJobEquals(t, &jobv1.Job{ + Id: jobID, + Uuid: spec1UUID, + NodeId: "test-node-1", + ProposalIds: []string{proposeResp.Proposal.Id}, + }, getResp.Job) }) t.Run("get non-existent job returns error", func(t *testing.T) { @@ -115,7 +220,7 @@ func TestMemoryJobDistributor_ListJobs(t *testing.T) { // Create multiple jobs _, err := client.ProposeJob(ctx, &jobv1.ProposeJobRequest{ NodeId: "test-node-1", - Spec: "job spec 1", + Spec: spec1, Labels: []*ptypes.Label{ {Key: "environment", Value: pointer.To("prod")}, }, @@ -124,7 +229,7 @@ func TestMemoryJobDistributor_ListJobs(t *testing.T) { _, err = client.ProposeJob(ctx, &jobv1.ProposeJobRequest{ NodeId: "test-node-2", - Spec: "job spec 2", + Spec: spec2, }) require.NoError(t, err) @@ -161,7 +266,7 @@ func TestMemoryJobDistributor_ListJobs(t *testing.T) { // Create a job proposeResp, err := client.ProposeJob(ctx, &jobv1.ProposeJobRequest{ NodeId: "test-node-1", - Spec: "job spec 1", + Spec: spec1, }) require.NoError(t, err) jobID := proposeResp.Proposal.JobId @@ -207,12 +312,13 @@ func TestMemoryJobDistributor_GetProposal(t *testing.T) { t.Run("get existing proposal", func(t *testing.T) { t.Parallel() + client := NewMemoryJobDistributor() ctx := t.Context() // Create a proposal proposeResp, err := client.ProposeJob(ctx, &jobv1.ProposeJobRequest{ NodeId: "test-node-1", - Spec: "test job spec", + Spec: spec1, }) require.NoError(t, err) proposalID := proposeResp.Proposal.Id @@ -221,14 +327,19 @@ func TestMemoryJobDistributor_GetProposal(t *testing.T) { getResp, err := client.GetProposal(ctx, &jobv1.GetProposalRequest{Id: proposalID}) require.NoError(t, err) require.NotNil(t, getResp) - require.NotNil(t, getResp.Proposal) - - assert.Equal(t, proposalID, getResp.Proposal.Id) - assert.Equal(t, "test job spec", getResp.Proposal.Spec) + assertProposalEquals(t, &jobv1.Proposal{ + Id: proposalID, + Revision: 1, + Status: jobv1.ProposalStatus_PROPOSAL_STATUS_PROPOSED, + DeliveryStatus: jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, + JobId: proposeResp.Proposal.JobId, + Spec: spec1, + }, getResp.Proposal) }) t.Run("get non-existent proposal returns error", func(t *testing.T) { t.Parallel() + client := NewMemoryJobDistributor() ctx := t.Context() resp, err := client.GetProposal(ctx, &jobv1.GetProposalRequest{Id: "non-existent"}) @@ -242,18 +353,19 @@ func TestMemoryJobDistributor_ListProposals(t *testing.T) { t.Run("list proposals returns all proposals", func(t *testing.T) { t.Parallel() + client := NewMemoryJobDistributor() ctx := t.Context() // Create multiple proposals _, err := client.ProposeJob(ctx, &jobv1.ProposeJobRequest{ NodeId: "test-node-1", - Spec: "job spec 1", + Spec: spec1, }) require.NoError(t, err) _, err = client.ProposeJob(ctx, &jobv1.ProposeJobRequest{ NodeId: "test-node-2", - Spec: "job spec 2", + Spec: spec1, }) require.NoError(t, err) @@ -275,21 +387,26 @@ func TestMemoryJobDistributor_BatchProposeJob(t *testing.T) { ctx := t.Context() req := &jobv1.BatchProposeJobRequest{ NodeIds: []string{"node-1", "node-2", "node-3"}, - Spec: "shared spec", + Spec: spec1, } resp, err := client.BatchProposeJob(ctx, req) require.NoError(t, err) require.NotNil(t, resp) - assert.Len(t, resp.SuccessResponses, 3) + require.Len(t, resp.SuccessResponses, 3) for _, nodeID := range req.NodeIds { propResp, exists := resp.SuccessResponses[nodeID] assert.True(t, exists, "missing response for node %s", nodeID) - assert.NotNil(t, propResp.Proposal) - assert.NotEmpty(t, propResp.Proposal.Id) - assert.NotEmpty(t, propResp.Proposal.JobId) - assert.Equal(t, "shared spec", propResp.Proposal.Spec) + + assertProposalEquals(t, &jobv1.Proposal{ + Id: propResp.Proposal.Id, + Revision: 1, + Status: jobv1.ProposalStatus_PROPOSAL_STATUS_PROPOSED, + DeliveryStatus: jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, + JobId: propResp.Proposal.JobId, + Spec: spec1, + }, propResp.Proposal) } // Verify jobs were created @@ -300,6 +417,7 @@ func TestMemoryJobDistributor_BatchProposeJob(t *testing.T) { t.Run("nil request returns error", func(t *testing.T) { t.Parallel() + client := NewMemoryJobDistributor() ctx := t.Context() resp, err := client.BatchProposeJob(ctx, nil) @@ -311,18 +429,89 @@ func TestMemoryJobDistributor_BatchProposeJob(t *testing.T) { func TestMemoryJobDistributor_RevokeJob(t *testing.T) { t.Parallel() + // Setup a job and proposal and return the client, proposal id, and job id + setup := func(t *testing.T) (*MemoryJobDistributor, string, string) { + t.Helper() + + client := NewMemoryJobDistributor() + + proposeResp, err := client.ProposeJob(t.Context(), &jobv1.ProposeJobRequest{ + NodeId: "test-node-1", + Spec: spec1, + }) + require.NoError(t, err) + + jobID := proposeResp.Proposal.JobId + propID := proposeResp.Proposal.Id + + return client, propID, jobID + } + t.Run("revoke existing job", func(t *testing.T) { t.Parallel() - client := NewMemoryJobDistributor() + ctx := t.Context() - // Create a job + client, propID, jobID := setup(t) + + // Revoke the job + revokeResp, err := client.RevokeJob(ctx, &jobv1.RevokeJobRequest{ + IdOneof: &jobv1.RevokeJobRequest_Id{Id: jobID}, + }) + require.NoError(t, err) + require.NotNil(t, revokeResp) + + // Verify proposal status changed + getResp, err := client.GetProposal(ctx, &jobv1.GetProposalRequest{Id: propID}) + require.NoError(t, err) + assert.Equal(t, jobv1.ProposalStatus_PROPOSAL_STATUS_REVOKED, getResp.Proposal.Status) + }) + + t.Run("revoke with UUID is not supported", func(t *testing.T) { + t.Parallel() + + client, _, _ := setup(t) + ctx := t.Context() + _, err := client.RevokeJob(ctx, &jobv1.RevokeJobRequest{ + IdOneof: &jobv1.RevokeJobRequest_Uuid{Uuid: uuid.New().String()}, + }) + require.ErrorContains(t, err, "uuid is not supported") + }) + + t.Run("revoke non-existent job returns error", func(t *testing.T) { + t.Parallel() + + client, _, _ := setup(t) + ctx := t.Context() + _, err := client.RevokeJob(ctx, &jobv1.RevokeJobRequest{ + IdOneof: &jobv1.RevokeJobRequest_Id{Id: "non-existent"}, + }) + require.ErrorContains(t, err, "not found") + }) + + t.Run("revoke job with invalid id returns error", func(t *testing.T) { + t.Parallel() + + client, _, _ := setup(t) + ctx := t.Context() + _, err := client.RevokeJob(ctx, &jobv1.RevokeJobRequest{ + IdOneof: &jobv1.RevokeJobRequest_Id{Id: ""}, + }) + require.ErrorContains(t, err, "job id must be provided") + }) + + t.Run("highest revision proposal is revoked", func(t *testing.T) { + t.Parallel() + + client, prop1ID, jobID := setup(t) + ctx := t.Context() + + // Propose another job with the same job id to get a new proposal with a higher revision + // number proposeResp, err := client.ProposeJob(ctx, &jobv1.ProposeJobRequest{ NodeId: "test-node-1", - Spec: "test job spec", + Spec: spec1, }) require.NoError(t, err) - jobID := proposeResp.Proposal.JobId - proposalID := proposeResp.Proposal.Id // Revoke the job revokeResp, err := client.RevokeJob(ctx, &jobv1.RevokeJobRequest{ @@ -331,14 +520,50 @@ func TestMemoryJobDistributor_RevokeJob(t *testing.T) { require.NoError(t, err) require.NotNil(t, revokeResp) + // Verify original proposal status is not revoked + getResp, err := client.GetProposal(ctx, &jobv1.GetProposalRequest{Id: prop1ID}) + require.NoError(t, err) + assert.Equal(t, jobv1.ProposalStatus_PROPOSAL_STATUS_PROPOSED, getResp.Proposal.Status) + // Verify proposal status changed - getResp, err := client.GetProposal(ctx, &jobv1.GetProposalRequest{Id: proposalID}) + getResp2, err := client.GetProposal(ctx, &jobv1.GetProposalRequest{Id: proposeResp.Proposal.Id}) require.NoError(t, err) - assert.Equal(t, jobv1.ProposalStatus_PROPOSAL_STATUS_REVOKED, getResp.Proposal.Status) + assert.Equal(t, jobv1.ProposalStatus_PROPOSAL_STATUS_REVOKED, getResp2.Proposal.Status) + }) + + t.Run("job has no proposals (should never happen) returns error", func(t *testing.T) { + t.Parallel() + + client, propID, jobID := setup(t) + ctx := t.Context() + + // Delete the proposal by accessing the map directly + delete(client.proposals, propID) + + _, err := client.RevokeJob(ctx, &jobv1.RevokeJobRequest{ + IdOneof: &jobv1.RevokeJobRequest_Id{Id: jobID}, + }) + require.ErrorContains(t, err, "not found") + }) + + t.Run("job is not in the correct state to be revoked returns error", func(t *testing.T) { + t.Parallel() + + client, propID, jobID := setup(t) + ctx := t.Context() + + // Update the proposal status to a state that is not allowed to be revoked + client.proposals[propID].Status = jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED + + _, err := client.RevokeJob(ctx, &jobv1.RevokeJobRequest{ + IdOneof: &jobv1.RevokeJobRequest_Id{Id: jobID}, + }) + require.ErrorContains(t, err, "job cannot be revoked") }) t.Run("nil request returns error", func(t *testing.T) { t.Parallel() + client := NewMemoryJobDistributor() ctx := t.Context() resp, err := client.RevokeJob(ctx, nil) @@ -352,12 +577,13 @@ func TestMemoryJobDistributor_DeleteJob(t *testing.T) { t.Run("delete existing job", func(t *testing.T) { t.Parallel() + client := NewMemoryJobDistributor() ctx := t.Context() // Create a job proposeResp, err := client.ProposeJob(ctx, &jobv1.ProposeJobRequest{ NodeId: "test-node-1", - Spec: "test job spec", + Spec: spec1, }) require.NoError(t, err) jobID := proposeResp.Proposal.JobId @@ -374,12 +600,12 @@ func TestMemoryJobDistributor_DeleteJob(t *testing.T) { IdOneof: &jobv1.GetJobRequest_Id{Id: jobID}, }) require.NoError(t, err) - assert.NotNil(t, getResp.Job.DeletedAt) }) t.Run("delete non-existent job does errors", func(t *testing.T) { t.Parallel() + client := NewMemoryJobDistributor() ctx := t.Context() _, err := client.DeleteJob(ctx, &jobv1.DeleteJobRequest{ @@ -400,7 +626,7 @@ func TestMemoryJobDistributor_UpdateJob(t *testing.T) { value := "testvalue" proposeResp, err := client.ProposeJob(ctx, &jobv1.ProposeJobRequest{ NodeId: "test-node-1", - Spec: "original spec", + Spec: spec1, Labels: []*ptypes.Label{ {Key: "env", Value: &value}, }, @@ -808,3 +1034,30 @@ func TestMemoryJobDistributor_ListKeypairs(t *testing.T) { assert.Empty(t, listResp.Keypairs) }) } + +// assertJobEquals asserts that two jobv1.Job objects are functionally equal. +func assertJobEquals(t *testing.T, want *jobv1.Job, actual *jobv1.Job) { + t.Helper() + + assert.True(t, strings.HasPrefix(actual.Id, "job_")) + assert.ElementsMatch(t, want.ProposalIds, actual.ProposalIds) + assert.Equal(t, want.NodeId, actual.NodeId) + assert.Equal(t, want.Uuid, actual.Uuid) + assert.ElementsMatch(t, want.Labels, actual.Labels) + assert.NotNil(t, actual.CreatedAt) + assert.NotNil(t, actual.UpdatedAt) +} + +// assertProposalEquals asserts that two jobv1.Proposal objects are functionally equal. +func assertProposalEquals(t *testing.T, want *jobv1.Proposal, actual *jobv1.Proposal) { + t.Helper() + + assert.True(t, strings.HasPrefix(actual.Id, "prop_")) + assert.Equal(t, want.Revision, actual.Revision) + assert.Equal(t, want.Status, actual.Status) + assert.Equal(t, want.DeliveryStatus, actual.DeliveryStatus) + assert.Equal(t, want.JobId, actual.JobId) + assert.Equal(t, want.Spec, actual.Spec) + assert.NotNil(t, actual.CreatedAt) + assert.NotNil(t, actual.UpdatedAt) +}