@@ -2,14 +2,19 @@ package memory
22
33import (
44 "context"
5+ "errors"
56 "fmt"
7+ "slices"
8+ "strings"
69 "sync"
710 "time"
811
912 "github.com/google/uuid"
13+ "github.com/pelletier/go-toml/v2"
1014 csav1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/csa"
1115 jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"
1216 nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node"
17+ "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes"
1318 "google.golang.org/grpc"
1419 "google.golang.org/grpc/codes"
1520 "google.golang.org/grpc/status"
@@ -48,36 +53,21 @@ func NewMemoryJobDistributor() *MemoryJobDistributor {
4853// Job Service Methods
4954
5055// ProposeJob creates a new job proposal and stores it in memory.
51- func (m * MemoryJobDistributor ) ProposeJob (ctx context.Context , in * jobv1.ProposeJobRequest , opts ... grpc.CallOption ) (* jobv1.ProposeJobResponse , error ) {
56+ func (m * MemoryJobDistributor ) ProposeJob (
57+ ctx context.Context , in * jobv1.ProposeJobRequest , opts ... grpc.CallOption ,
58+ ) (* jobv1.ProposeJobResponse , error ) {
5259 if in == nil {
5360 return nil , status .Error (codes .InvalidArgument , "request cannot be nil" )
5461 }
5562
56- // Generate unique IDs
57- proposalID := uuid .New ().String ()
58- jobID := uuid .New ().String ()
59-
60- // Create the proposal
61- proposal := & jobv1.Proposal {
62- Id : proposalID ,
63- JobId : jobID ,
64- Spec : in .Spec ,
65- Status : jobv1 .ProposalStatus_PROPOSAL_STATUS_APPROVED ,
66- }
63+ m .mu .Lock ()
64+ defer m .mu .Unlock ()
6765
68- // Also create a job based on the proposal
69- job := & jobv1.Job {
70- Id : jobID ,
71- Uuid : uuid .New ().String (),
72- NodeId : in .NodeId ,
73- Labels : in .Labels ,
66+ proposal , err := m .upsertProposal (in .NodeId , in .Spec , in .Labels )
67+ if err != nil {
68+ return nil , err
7469 }
7570
76- m .mu .Lock ()
77- m .proposals [proposalID ] = proposal
78- m .jobs [jobID ] = job
79- m .mu .Unlock ()
80-
8171 return & jobv1.ProposeJobResponse {
8272 Proposal : proposal ,
8373 }, nil
@@ -179,29 +169,11 @@ func (m *MemoryJobDistributor) BatchProposeJob(ctx context.Context, in *jobv1.Ba
179169 m .mu .Lock ()
180170 // Create a proposal for each node
181171 for _ , nodeID := range in .GetNodeIds () {
182- // Generate unique IDs
183- proposalID := uuid .New ().String ()
184- jobID := uuid .New ().String ()
185-
186- // Create the proposal
187- proposal := & jobv1.Proposal {
188- Id : proposalID ,
189- JobId : jobID ,
190- Spec : in .GetSpec (),
191- Status : jobv1 .ProposalStatus_PROPOSAL_STATUS_APPROVED ,
172+ proposal , err := m .upsertProposal (nodeID , in .GetSpec (), in .GetLabels ())
173+ if err != nil {
174+ return nil , err
192175 }
193176
194- // Store the proposal
195- m .proposals [proposalID ] = proposal
196-
197- // Also create and store a job based on the proposal
198- job := & jobv1.Job {
199- Id : jobID ,
200- NodeId : nodeID ,
201- Labels : in .GetLabels (),
202- }
203- m .jobs [jobID ] = job
204-
205177 // Add to success responses
206178 successResponses [nodeID ] = & jobv1.ProposeJobResponse {
207179 Proposal : proposal ,
@@ -220,38 +192,39 @@ func (m *MemoryJobDistributor) RevokeJob(ctx context.Context, in *jobv1.RevokeJo
220192 return nil , status .Error (codes .InvalidArgument , "request cannot be nil" )
221193 }
222194
223- // Use GetId() method to access the oneof field
224- jobID := in .GetId ()
195+ var jobID string
196+ switch idOneof := in .GetIdOneof ().(type ) {
197+ case * jobv1.RevokeJobRequest_Id :
198+ if idOneof .Id == "" {
199+ return nil , status .Error (codes .InvalidArgument , "job id must be provided" )
200+ }
225201
226- if jobID == "" {
227- return nil , status .Error (codes .InvalidArgument , "job id must be provided" )
202+ jobID = idOneof .Id
203+ case * jobv1.RevokeJobRequest_Uuid :
204+ return nil , status .Error (codes .InvalidArgument , "uuid is not supported" )
228205 }
229206
230207 m .mu .Lock ()
231- // Find the proposal associated with this job
232- var foundProposal * jobv1.Proposal
233- for _ , proposal := range m .proposals {
234- if proposal .JobId == jobID {
235- foundProposal = proposal
236- break
237- }
238- }
208+ defer m .mu .Unlock ()
239209
240- if foundProposal != nil {
241- foundProposal .Status = jobv1 .ProposalStatus_PROPOSAL_STATUS_REVOKED
210+ // Find the proposal with the highest revision number associated with this job
211+ prop := m .getHighestRevisionProposalByJobID (jobID )
212+ if prop == nil {
213+ return nil , status .Errorf (codes .NotFound , "proposal with job id %s not found" , jobID )
242214 }
243- m .mu .Unlock ()
244215
245- if foundProposal != nil {
246- return & jobv1.RevokeJobResponse {
247- Proposal : foundProposal ,
248- }, nil
216+ if ! slices .Contains ([]jobv1.ProposalStatus {jobv1 .ProposalStatus_PROPOSAL_STATUS_PROPOSED , jobv1 .ProposalStatus_PROPOSAL_STATUS_CANCELLED }, prop .Status ) {
217+ return nil , errors .New ("job cannot be revoked" )
249218 }
250219
251- return & jobv1.RevokeJobResponse {}, nil
220+ prop .Status = jobv1 .ProposalStatus_PROPOSAL_STATUS_REVOKED
221+
222+ return & jobv1.RevokeJobResponse {
223+ Proposal : prop ,
224+ }, nil
252225}
253226
254- // DeleteJob removes a job from memory .
227+ // DeleteJob soft deletes a job, setting the DeletedAt field to the current time .
255228func (m * MemoryJobDistributor ) DeleteJob (ctx context.Context , in * jobv1.DeleteJobRequest , opts ... grpc.CallOption ) (* jobv1.DeleteJobResponse , error ) {
256229 if in == nil {
257230 return nil , status .Error (codes .InvalidArgument , "request cannot be nil" )
@@ -318,7 +291,7 @@ func (m *MemoryJobDistributor) RegisterNode(ctx context.Context, in *nodev1.Regi
318291 }
319292
320293 // Generate a new ID for the node
321- nodeID := uuid . New (). String ()
294+ nodeID := newNodeID ()
322295
323296 // Create the node
324297 node := & nodev1.Node {
@@ -536,3 +509,112 @@ func (m *MemoryJobDistributor) AddKeypair(keypair *csav1.Keypair) {
536509 m .mu .Unlock ()
537510 }
538511}
512+
513+ // getJobByUUID retrieves a job by its UUID and node ID.
514+ func (m * MemoryJobDistributor ) getJobByUUIDAndNodeID (uuid string , nodeID string ) (* jobv1.Job , error ) {
515+ for _ , job := range m .jobs {
516+ if job .Uuid == uuid && job .NodeId == nodeID {
517+ return job , nil
518+ }
519+ }
520+
521+ return nil , fmt .Errorf ("job with uuid %s not found" , uuid )
522+ }
523+
524+ // getNextRevisionNum returns the next revision number for a given job ID.
525+ func (m * MemoryJobDistributor ) getNextRevisionNum (jobID string ) int64 {
526+ return m .proposalsByJobCount (jobID ) + 1
527+ }
528+
529+ // getHighestRevisionProposalByJobID returns the proposal with the highest revision number for a
530+ // given job ID.
531+ func (m * MemoryJobDistributor ) getHighestRevisionProposalByJobID (jobID string ) * jobv1.Proposal {
532+ var prop * jobv1.Proposal
533+ var maxRevision int64 = - 1
534+
535+ for _ , p := range m .proposals {
536+ if p .JobId == jobID && p .Revision > maxRevision {
537+ prop = p
538+ maxRevision = prop .Revision
539+ }
540+ }
541+
542+ return prop
543+ }
544+
545+ // proposalsByJobCount returns the number of proposals for a given job ID.
546+ func (m * MemoryJobDistributor ) proposalsByJobCount (jobID string ) int64 {
547+ count := 0
548+ for _ , proposal := range m .proposals {
549+ if proposal .JobId == jobID {
550+ count ++
551+ }
552+ }
553+
554+ return int64 (count )
555+ }
556+
557+ // upsertProposal upserts a proposal for a given spec
558+ func (m * MemoryJobDistributor ) upsertProposal (
559+ nodeID string , spec string , labels []* ptypes.Label ,
560+ ) (* jobv1.Proposal , error ) {
561+ specUUID , err := getSpecUUID (spec )
562+ if err != nil {
563+ return nil , status .Errorf (codes .InvalidArgument , "invalid spec: %v" , err )
564+ }
565+
566+ // Generate unique IDs
567+ proposalID := newProposalID ()
568+ jobID := newJobID ()
569+ now := timestamppb .Now ()
570+
571+ // If the job already exists, we use the existing job ID, otherwise we create a new job
572+ if job , _ := m .getJobByUUIDAndNodeID (specUUID .String (), nodeID ); job != nil {
573+ jobID = job .Id
574+
575+ m .jobs [jobID ].ProposalIds = append (m .jobs [jobID ].ProposalIds , proposalID )
576+ } else {
577+ m .jobs [jobID ] = & jobv1.Job {
578+ Id : jobID ,
579+ Uuid : specUUID .String (),
580+ NodeId : nodeID ,
581+ ProposalIds : []string {proposalID },
582+ Labels : labels ,
583+ CreatedAt : now ,
584+ UpdatedAt : now ,
585+ }
586+ }
587+
588+ // Insert the proposal
589+ m .proposals [proposalID ] = & jobv1.Proposal {
590+ Id : proposalID ,
591+ Revision : m .getNextRevisionNum (jobID ),
592+ Status : jobv1 .ProposalStatus_PROPOSAL_STATUS_PROPOSED ,
593+ DeliveryStatus : jobv1 .ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED ,
594+ JobId : jobID ,
595+ Spec : spec ,
596+ CreatedAt : now ,
597+ UpdatedAt : now ,
598+ }
599+
600+ return m .proposals [proposalID ], nil
601+ }
602+
603+ // getSpecUUID extracts the UUID from a spec
604+ func getSpecUUID (spec string ) (uuid.UUID , error ) {
605+ s := struct {
606+ ExternalJobID * uuid.UUID `toml:"externalJobID,omitempty"`
607+ }{}
608+
609+ d := toml .NewDecoder (strings .NewReader (spec ))
610+
611+ if err := d .Decode (& s ); err != nil {
612+ return uuid .Nil , err
613+ }
614+
615+ if s .ExternalJobID == nil {
616+ return uuid .Nil , errors .New ("externalJobID field not found in spec" )
617+ }
618+
619+ return * s .ExternalJobID , nil
620+ }
0 commit comments