Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 6 additions & 16 deletions core/services/feeds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,29 +1053,19 @@ func (s *service) ApproveSpec(ctx context.Context, id int64, force bool) error {
if txerr != nil && !errors.Is(txerr, sql.ErrNoRows) {
return fmt.Errorf("failed while checking for existing ccip job: %w", txerr)
}
case job.StandardCapabilities:
// Only possible to match standard capabilities by external job id
// no-op
case job.Gateway:
existingJobID, txerr = tx.jobORM.FindGatewayJobID(ctx, *j.GatewaySpec)
// Return an error if the repository errors. If there is a not found
// error we want to continue with approving the job.
if txerr != nil && !errors.Is(txerr, sql.ErrNoRows) {
return fmt.Errorf("failed while checking for existing gateway job: %w", txerr)
}
case job.Stream:
existingJobID, txerr = tx.jobORM.FindJobIDByStreamID(ctx, *j.StreamID)
// Return an error if the repository errors. If there is a not found
// error we want to continue with approving the job.
if txerr != nil && !errors.Is(txerr, sql.ErrNoRows) {
return fmt.Errorf("failed while checking for existing stream job: %w", txerr)
}
case job.CRESettings:
// Only possible to match CRE Setting by external job id
// no-op
case job.CCVCommitteeVerifier, job.CCVExecutor:
// Only possible to match CCV jobs by external job id
// no-op
case job.CRESettings,
job.Stream,
Comment thread
ChrisAmora marked this conversation as resolved.
job.CCVCommitteeVerifier,
job.CCVExecutor,
job.StandardCapabilities:
// NOOP: These jobs are only matched by external job ID, so do nothing
default:
return errors.Errorf("unsupported job type when approving job proposal specs: %s", j.Type)
}
Expand Down
195 changes: 39 additions & 156 deletions core/services/feeds/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3890,10 +3890,6 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) {
Version: 1,
Definition: fmt.Sprintf(StreamTestSpecTemplate, streamName, externalJobID.String(), streamID),
}
j = job.Job{
ID: 1,
ExternalJobID: externalJobID,
}
)

testCases := []struct {
Expand All @@ -3913,7 +3909,6 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) {
svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil)
svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil)
svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows)
svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows)

svc.spawner.
On("CreateJob",
Expand Down Expand Up @@ -3945,25 +3940,17 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) {
force: false,
},
{
name: "cancelled spec success when no other spec is approved",
// Stream jobs are matched only by external job ID; a collision
// on stream ID alone must not block approval at the feeds layer.
// The strict mock asserts FindJobIDByStreamID is never called.
name: "pending job success when another job has the same stream ID",
httpTimeout: commonconfig.MustNewDuration(1 * time.Minute),
before: func(svc *TestService) {
otherSpec := feeds.JobProposalSpec{
ID: 21,
Status: feeds.SpecStatusRevoked,
JobProposalID: jp.ID,
Version: 2,
Definition: fmt.Sprintf(StreamTestSpecTemplate, streamName, externalJobID.String(), streamID),
}

svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil)
svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID, mock.Anything).Return(cancelledSpec, nil)
svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil)
svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil)
svc.orm.On("ListSpecsByJobProposalIDs", mock.Anything, []int64{cancelledSpec.JobProposalID}).
Return([]feeds.JobProposalSpec{otherSpec, *cancelledSpec}, nil)
svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil)
svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil)
svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows)
svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows)

svc.spawner.
On("CreateJob",
Expand All @@ -3977,7 +3964,7 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) {
Return(nil)
svc.orm.On("ApproveSpec",
mock.Anything,
cancelledSpec.ID,
spec.ID,
externalJobID,
).Return(nil)
svc.fmsClient.On("ApprovedJob",
Expand All @@ -3991,68 +3978,28 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) {
svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm))
svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM))
},
id: cancelledSpec.ID,
id: spec.ID,
force: false,
},
{
name: "cancelled spec failed when another spec is approved",
name: "cancelled spec success when no other spec is approved",
httpTimeout: commonconfig.MustNewDuration(1 * time.Minute),
before: func(svc *TestService) {
otherSpec := feeds.JobProposalSpec{
ID: 21,
Status: feeds.SpecStatusApproved,
Status: feeds.SpecStatusRevoked,
JobProposalID: jp.ID,
Version: 2,
Definition: fmt.Sprintf(StreamTestSpecTemplate, streamName, externalJobID.String(), streamID),
}

svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID, mock.Anything).Return(cancelledSpec, nil)
svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil)
svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID).Return(cancelledSpec, nil)
svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil)
svc.orm.On("ListSpecsByJobProposalIDs", mock.Anything, []int64{cancelledSpec.JobProposalID}).
Return([]feeds.JobProposalSpec{otherSpec, *cancelledSpec}, nil)
},
id: cancelledSpec.ID,
force: false,
wantErr: "the job spec with version 2 is already approved",
},
{
name: "rejected spec failed cannot be approved",
before: func(svc *TestService) {
svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID, mock.Anything).Return(rejectedSpec, nil)
svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil)
},
id: rejectedSpec.ID,
force: false,
wantErr: "cannot approve a rejected spec",
},
{
name: "already existing job replacement error",
httpTimeout: commonconfig.MustNewDuration(1 * time.Minute),
before: func(svc *TestService) {
svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil)
svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil)
svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil)
svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil)
svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows)
svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(j.ID, nil)
svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm))
svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM))
},
id: spec.ID,
force: false,
wantErr: "could not approve job proposal: a job for this contract address already exists - please use the 'force' option to replace it",
},
{
name: "already existing self managed job replacement success if forced without feedID",
httpTimeout: commonconfig.MustNewDuration(1 * time.Minute),
before: func(svc *TestService) {
svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil)
svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil)
svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil)
svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil)
svc.orm.EXPECT().GetApprovedSpec(mock.Anything, jp.ID).Return(nil, sql.ErrNoRows)
svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows)
svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(j.ID, nil)
svc.spawner.On("DeleteJob", mock.Anything, mock.Anything, j.ID).Return(nil)

svc.spawner.
On("CreateJob",
Expand All @@ -4066,7 +4013,7 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) {
Return(nil)
svc.orm.On("ApproveSpec",
mock.Anything,
spec.ID,
cancelledSpec.ID,
externalJobID,
).Return(nil)
svc.fmsClient.On("ApprovedJob",
Expand All @@ -4080,99 +4027,38 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) {
svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm))
svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM))
},
id: spec.ID,
force: true,
id: cancelledSpec.ID,
force: false,
},
{
name: "already existing self managed job replacement success if forced with feedID",
httpTimeout: commonconfig.MustNewDuration(1 * time.Minute),
name: "cancelled spec failed when another spec is approved",
before: func(svc *TestService) {
svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil)
svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(&feeds.JobProposalSpec{
ID: 20,
Status: feeds.SpecStatusPending,
otherSpec := feeds.JobProposalSpec{
ID: 21,
Status: feeds.SpecStatusApproved,
JobProposalID: jp.ID,
Version: 1,
Version: 2,
Definition: fmt.Sprintf(StreamTestSpecTemplate, streamName, externalJobID.String(), streamID),
}, nil)
svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil)
svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil)
svc.orm.EXPECT().GetApprovedSpec(mock.Anything, jp.ID).Return(nil, sql.ErrNoRows)
svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows)
svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(j.ID, nil)
svc.spawner.On("DeleteJob", mock.Anything, mock.Anything, j.ID).Return(nil)
}

svc.spawner.
On("CreateJob",
mock.Anything,
mock.Anything,
mock.MatchedBy(func(j *job.Job) bool {
return j.Name.String == streamName
}),
).
Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }).
Return(nil)
svc.orm.On("ApproveSpec",
mock.Anything,
spec.ID,
externalJobID,
).Return(nil)
svc.fmsClient.On("ApprovedJob",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
&proto.ApprovedJobRequest{
Uuid: jp.RemoteUUID.String(),
Version: int64(spec.Version),
},
).Return(&proto.ApprovedJobResponse{}, nil)
svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil)
svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm))
svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM))
svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID).Return(cancelledSpec, nil)
svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil)
svc.orm.On("ListSpecsByJobProposalIDs", mock.Anything, []int64{cancelledSpec.JobProposalID}).
Return([]feeds.JobProposalSpec{otherSpec, *cancelledSpec}, nil)
},
id: spec.ID,
force: true,
id: cancelledSpec.ID,
force: false,
wantErr: "the job spec with version 2 is already approved",
},
{
name: "already existing FMS managed job replacement success if forced",
httpTimeout: commonconfig.MustNewDuration(1 * time.Minute),
name: "rejected spec failed cannot be approved",
before: func(svc *TestService) {
svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil)
svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil)
svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID).Return(rejectedSpec, nil)
svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil)
svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil)
svc.orm.EXPECT().GetApprovedSpec(mock.Anything, jp.ID).Return(&feeds.JobProposalSpec{ID: 100}, nil)
svc.orm.EXPECT().CancelSpec(mock.Anything, int64(100)).Return(nil)
svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows)
svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(j.ID, nil)
svc.spawner.On("DeleteJob", mock.Anything, mock.Anything, j.ID).Return(nil)

svc.spawner.
On("CreateJob",
mock.Anything,
mock.Anything,
mock.MatchedBy(func(j *job.Job) bool {
return j.Name.String == streamName
}),
).
Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }).
Return(nil)
svc.orm.On("ApproveSpec",
mock.Anything,
spec.ID,
externalJobID,
).Return(nil)
svc.fmsClient.On("ApprovedJob",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
&proto.ApprovedJobRequest{
Uuid: jp.RemoteUUID.String(),
Version: int64(spec.Version),
},
).Return(&proto.ApprovedJobResponse{}, nil)
svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil)
svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm))
svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM))
},
id: spec.ID,
force: true,
id: rejectedSpec.ID,
force: false,
wantErr: "cannot approve a rejected spec",
},
{
name: "spec does not exist",
Expand Down Expand Up @@ -4206,7 +4092,7 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) {
JobProposalID: jp.ID,
Status: feeds.SpecStatusRejected,
}
svc.orm.On("GetSpec", mock.Anything, rspec.ID, mock.Anything).Return(rspec, nil)
svc.orm.On("GetSpec", mock.Anything, rspec.ID).Return(rspec, nil)
svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil)
},
id: spec.ID,
Expand Down Expand Up @@ -4255,7 +4141,6 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) {
svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil)

svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows)
svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows)

svc.spawner.
On("CreateJob",
Expand Down Expand Up @@ -4283,7 +4168,6 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) {
svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil)

svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows)
svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows)

svc.spawner.
On("CreateJob",
Expand Down Expand Up @@ -4317,7 +4201,6 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) {
svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil)

svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows)
svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows)

svc.spawner.
On("CreateJob",
Expand Down Expand Up @@ -5081,7 +4964,7 @@ func Test_Service_UpdateSpecDefinition(t *testing.T) {
name: "success",
before: func(svc *TestService) {
svc.orm.
On("GetSpec", mock.Anything, specID, mock.Anything).
On("GetSpec", mock.Anything, specID).
Return(spec, nil)
svc.orm.On("UpdateSpecDefinition", mock.Anything,
specID,
Expand All @@ -5095,7 +4978,7 @@ func Test_Service_UpdateSpecDefinition(t *testing.T) {
name: "does not exist",
before: func(svc *TestService) {
svc.orm.
On("GetSpec", mock.Anything, specID, mock.Anything).
On("GetSpec", mock.Anything, specID).
Return(nil, sql.ErrNoRows)
},
specID: specID,
Expand All @@ -5105,7 +4988,7 @@ func Test_Service_UpdateSpecDefinition(t *testing.T) {
name: "other get errors",
before: func(svc *TestService) {
svc.orm.
On("GetSpec", mock.Anything, specID, mock.Anything).
On("GetSpec", mock.Anything, specID).
Return(nil, errors.New("other db error"))
},
specID: specID,
Expand All @@ -5120,7 +5003,7 @@ func Test_Service_UpdateSpecDefinition(t *testing.T) {
}

svc.orm.
On("GetSpec", mock.Anything, specID, mock.Anything).
On("GetSpec", mock.Anything, specID).
Return(spec, nil)
},
specID: specID,
Expand Down
Loading
Loading