diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index 54e999159b3..61a2830bc96 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -1053,9 +1053,6 @@ func (s *service) ApproveSpec(ctx context.Context, id int64, force bool) error { if txerr != nil && !errors.Is(txerr, sql.ErrNoRows) { return fmt.Errorf("failed while checking for existing ccip job: %w", txerr) } - case job.StandardCapabilities: - // Only possible to match standard capabilities by external job id - // no-op case job.Gateway: existingJobID, txerr = tx.jobORM.FindGatewayJobID(ctx, *j.GatewaySpec) // Return an error if the repository errors. If there is a not found @@ -1063,19 +1060,12 @@ func (s *service) ApproveSpec(ctx context.Context, id int64, force bool) error { if txerr != nil && !errors.Is(txerr, sql.ErrNoRows) { return fmt.Errorf("failed while checking for existing gateway job: %w", txerr) } - case job.Stream: - existingJobID, txerr = tx.jobORM.FindJobIDByStreamID(ctx, *j.StreamID) - // Return an error if the repository errors. If there is a not found - // error we want to continue with approving the job. - if txerr != nil && !errors.Is(txerr, sql.ErrNoRows) { - return fmt.Errorf("failed while checking for existing stream job: %w", txerr) - } - case job.CRESettings: - // Only possible to match CRE Setting by external job id - // no-op - case job.CCVCommitteeVerifier, job.CCVExecutor: - // Only possible to match CCV jobs by external job id - // no-op + case job.CRESettings, + job.Stream, + job.CCVCommitteeVerifier, + job.CCVExecutor, + job.StandardCapabilities: + // NOOP: These jobs are only matched by external job ID, so do nothing default: return errors.Errorf("unsupported job type when approving job proposal specs: %s", j.Type) } diff --git a/core/services/feeds/service_test.go b/core/services/feeds/service_test.go index 4486deda640..7e3d527a747 100644 --- a/core/services/feeds/service_test.go +++ b/core/services/feeds/service_test.go @@ -3890,10 +3890,6 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { Version: 1, Definition: fmt.Sprintf(StreamTestSpecTemplate, streamName, externalJobID.String(), streamID), } - j = job.Job{ - ID: 1, - ExternalJobID: externalJobID, - } ) testCases := []struct { @@ -3913,7 +3909,6 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) - svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows) svc.spawner. On("CreateJob", @@ -3945,25 +3940,17 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { force: false, }, { - name: "cancelled spec success when no other spec is approved", + // Stream jobs are matched only by external job ID; a collision + // on stream ID alone must not block approval at the feeds layer. + // The strict mock asserts FindJobIDByStreamID is never called. + name: "pending job success when another job has the same stream ID", httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), before: func(svc *TestService) { - otherSpec := feeds.JobProposalSpec{ - ID: 21, - Status: feeds.SpecStatusRevoked, - JobProposalID: jp.ID, - Version: 2, - Definition: fmt.Sprintf(StreamTestSpecTemplate, streamName, externalJobID.String(), streamID), - } - - svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) - svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID, mock.Anything).Return(cancelledSpec, nil) + svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil) svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) - svc.orm.On("ListSpecsByJobProposalIDs", mock.Anything, []int64{cancelledSpec.JobProposalID}). - Return([]feeds.JobProposalSpec{otherSpec, *cancelledSpec}, nil) + svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) - svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows) svc.spawner. On("CreateJob", @@ -3977,7 +3964,7 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { Return(nil) svc.orm.On("ApproveSpec", mock.Anything, - cancelledSpec.ID, + spec.ID, externalJobID, ).Return(nil) svc.fmsClient.On("ApprovedJob", @@ -3991,68 +3978,28 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) }, - id: cancelledSpec.ID, + id: spec.ID, force: false, }, { - name: "cancelled spec failed when another spec is approved", + name: "cancelled spec success when no other spec is approved", + httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), before: func(svc *TestService) { otherSpec := feeds.JobProposalSpec{ ID: 21, - Status: feeds.SpecStatusApproved, + Status: feeds.SpecStatusRevoked, JobProposalID: jp.ID, Version: 2, Definition: fmt.Sprintf(StreamTestSpecTemplate, streamName, externalJobID.String(), streamID), } - svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID, mock.Anything).Return(cancelledSpec, nil) + svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) + svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID).Return(cancelledSpec, nil) svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) svc.orm.On("ListSpecsByJobProposalIDs", mock.Anything, []int64{cancelledSpec.JobProposalID}). Return([]feeds.JobProposalSpec{otherSpec, *cancelledSpec}, nil) - }, - id: cancelledSpec.ID, - force: false, - wantErr: "the job spec with version 2 is already approved", - }, - { - name: "rejected spec failed cannot be approved", - before: func(svc *TestService) { - svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID, mock.Anything).Return(rejectedSpec, nil) - svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) - }, - id: rejectedSpec.ID, - force: false, - wantErr: "cannot approve a rejected spec", - }, - { - name: "already existing job replacement error", - httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), - before: func(svc *TestService) { - svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) - svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil) - svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) - svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(j.ID, nil) - svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) - svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) - }, - id: spec.ID, - force: false, - wantErr: "could not approve job proposal: a job for this contract address already exists - please use the 'force' option to replace it", - }, - { - name: "already existing self managed job replacement success if forced without feedID", - httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), - before: func(svc *TestService) { - svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) - svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil) - svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) - svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) - svc.orm.EXPECT().GetApprovedSpec(mock.Anything, jp.ID).Return(nil, sql.ErrNoRows) - svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) - svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(j.ID, nil) - svc.spawner.On("DeleteJob", mock.Anything, mock.Anything, j.ID).Return(nil) svc.spawner. On("CreateJob", @@ -4066,7 +4013,7 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { Return(nil) svc.orm.On("ApproveSpec", mock.Anything, - spec.ID, + cancelledSpec.ID, externalJobID, ).Return(nil) svc.fmsClient.On("ApprovedJob", @@ -4080,99 +4027,38 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) }, - id: spec.ID, - force: true, + id: cancelledSpec.ID, + force: false, }, { - name: "already existing self managed job replacement success if forced with feedID", - httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), + name: "cancelled spec failed when another spec is approved", before: func(svc *TestService) { - svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) - svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(&feeds.JobProposalSpec{ - ID: 20, - Status: feeds.SpecStatusPending, + otherSpec := feeds.JobProposalSpec{ + ID: 21, + Status: feeds.SpecStatusApproved, JobProposalID: jp.ID, - Version: 1, + Version: 2, Definition: fmt.Sprintf(StreamTestSpecTemplate, streamName, externalJobID.String(), streamID), - }, nil) - svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) - svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) - svc.orm.EXPECT().GetApprovedSpec(mock.Anything, jp.ID).Return(nil, sql.ErrNoRows) - svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) - svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(j.ID, nil) - svc.spawner.On("DeleteJob", mock.Anything, mock.Anything, j.ID).Return(nil) + } - svc.spawner. - On("CreateJob", - mock.Anything, - mock.Anything, - mock.MatchedBy(func(j *job.Job) bool { - return j.Name.String == streamName - }), - ). - Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }). - Return(nil) - svc.orm.On("ApproveSpec", - mock.Anything, - spec.ID, - externalJobID, - ).Return(nil) - svc.fmsClient.On("ApprovedJob", - mock.MatchedBy(func(ctx context.Context) bool { return true }), - &proto.ApprovedJobRequest{ - Uuid: jp.RemoteUUID.String(), - Version: int64(spec.Version), - }, - ).Return(&proto.ApprovedJobResponse{}, nil) - svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) - svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) - svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) + svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID).Return(cancelledSpec, nil) + svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) + svc.orm.On("ListSpecsByJobProposalIDs", mock.Anything, []int64{cancelledSpec.JobProposalID}). + Return([]feeds.JobProposalSpec{otherSpec, *cancelledSpec}, nil) }, - id: spec.ID, - force: true, + id: cancelledSpec.ID, + force: false, + wantErr: "the job spec with version 2 is already approved", }, { - name: "already existing FMS managed job replacement success if forced", - httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), + name: "rejected spec failed cannot be approved", before: func(svc *TestService) { - svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) - svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil) + svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID).Return(rejectedSpec, nil) svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) - svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) - svc.orm.EXPECT().GetApprovedSpec(mock.Anything, jp.ID).Return(&feeds.JobProposalSpec{ID: 100}, nil) - svc.orm.EXPECT().CancelSpec(mock.Anything, int64(100)).Return(nil) - svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) - svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(j.ID, nil) - svc.spawner.On("DeleteJob", mock.Anything, mock.Anything, j.ID).Return(nil) - - svc.spawner. - On("CreateJob", - mock.Anything, - mock.Anything, - mock.MatchedBy(func(j *job.Job) bool { - return j.Name.String == streamName - }), - ). - Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }). - Return(nil) - svc.orm.On("ApproveSpec", - mock.Anything, - spec.ID, - externalJobID, - ).Return(nil) - svc.fmsClient.On("ApprovedJob", - mock.MatchedBy(func(ctx context.Context) bool { return true }), - &proto.ApprovedJobRequest{ - Uuid: jp.RemoteUUID.String(), - Version: int64(spec.Version), - }, - ).Return(&proto.ApprovedJobResponse{}, nil) - svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) - svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) - svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) }, - id: spec.ID, - force: true, + id: rejectedSpec.ID, + force: false, + wantErr: "cannot approve a rejected spec", }, { name: "spec does not exist", @@ -4206,7 +4092,7 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { JobProposalID: jp.ID, Status: feeds.SpecStatusRejected, } - svc.orm.On("GetSpec", mock.Anything, rspec.ID, mock.Anything).Return(rspec, nil) + svc.orm.On("GetSpec", mock.Anything, rspec.ID).Return(rspec, nil) svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) }, id: spec.ID, @@ -4255,7 +4141,6 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) - svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows) svc.spawner. On("CreateJob", @@ -4283,7 +4168,6 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) - svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows) svc.spawner. On("CreateJob", @@ -4317,7 +4201,6 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) - svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows) svc.spawner. On("CreateJob", @@ -5081,7 +4964,7 @@ func Test_Service_UpdateSpecDefinition(t *testing.T) { name: "success", before: func(svc *TestService) { svc.orm. - On("GetSpec", mock.Anything, specID, mock.Anything). + On("GetSpec", mock.Anything, specID). Return(spec, nil) svc.orm.On("UpdateSpecDefinition", mock.Anything, specID, @@ -5095,7 +4978,7 @@ func Test_Service_UpdateSpecDefinition(t *testing.T) { name: "does not exist", before: func(svc *TestService) { svc.orm. - On("GetSpec", mock.Anything, specID, mock.Anything). + On("GetSpec", mock.Anything, specID). Return(nil, sql.ErrNoRows) }, specID: specID, @@ -5105,7 +4988,7 @@ func Test_Service_UpdateSpecDefinition(t *testing.T) { name: "other get errors", before: func(svc *TestService) { svc.orm. - On("GetSpec", mock.Anything, specID, mock.Anything). + On("GetSpec", mock.Anything, specID). Return(nil, errors.New("other db error")) }, specID: specID, @@ -5120,7 +5003,7 @@ func Test_Service_UpdateSpecDefinition(t *testing.T) { } svc.orm. - On("GetSpec", mock.Anything, specID, mock.Anything). + On("GetSpec", mock.Anything, specID). Return(spec, nil) }, specID: specID, diff --git a/core/services/job/mocks/orm.go b/core/services/job/mocks/orm.go index d677c053914..3a246fb223c 100644 --- a/core/services/job/mocks/orm.go +++ b/core/services/job/mocks/orm.go @@ -651,63 +651,6 @@ func (_c *ORM_FindJobIDByCapabilityNameAndVersion_Call) RunAndReturn(run func(co return _c } -// FindJobIDByStreamID provides a mock function with given fields: ctx, streamID -func (_m *ORM) FindJobIDByStreamID(ctx context.Context, streamID uint32) (int32, error) { - ret := _m.Called(ctx, streamID) - - if len(ret) == 0 { - panic("no return value specified for FindJobIDByStreamID") - } - - var r0 int32 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, uint32) (int32, error)); ok { - return rf(ctx, streamID) - } - if rf, ok := ret.Get(0).(func(context.Context, uint32) int32); ok { - r0 = rf(ctx, streamID) - } else { - r0 = ret.Get(0).(int32) - } - - if rf, ok := ret.Get(1).(func(context.Context, uint32) error); ok { - r1 = rf(ctx, streamID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_FindJobIDByStreamID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FindJobIDByStreamID' -type ORM_FindJobIDByStreamID_Call struct { - *mock.Call -} - -// FindJobIDByStreamID is a helper method to define mock.On call -// - ctx context.Context -// - streamID uint32 -func (_e *ORM_Expecter) FindJobIDByStreamID(ctx interface{}, streamID interface{}) *ORM_FindJobIDByStreamID_Call { - return &ORM_FindJobIDByStreamID_Call{Call: _e.mock.On("FindJobIDByStreamID", ctx, streamID)} -} - -func (_c *ORM_FindJobIDByStreamID_Call) Run(run func(ctx context.Context, streamID uint32)) *ORM_FindJobIDByStreamID_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint32)) - }) - return _c -} - -func (_c *ORM_FindJobIDByStreamID_Call) Return(_a0 int32, _a1 error) *ORM_FindJobIDByStreamID_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_FindJobIDByStreamID_Call) RunAndReturn(run func(context.Context, uint32) (int32, error)) *ORM_FindJobIDByStreamID_Call { - _c.Call.Return(run) - return _c -} - // FindJobIDByWorkflow provides a mock function with given fields: ctx, spec func (_m *ORM) FindJobIDByWorkflow(ctx context.Context, spec job.WorkflowSpec) (int32, error) { ret := _m.Called(ctx, spec) diff --git a/core/services/job/orm.go b/core/services/job/orm.go index 6f11f3f90c6..b0b91f2f1a8 100644 --- a/core/services/job/orm.go +++ b/core/services/job/orm.go @@ -80,8 +80,6 @@ type ORM interface { FindJobIDByCapabilityNameAndVersion(ctx context.Context, spec CCIPSpec) (int32, error) FindStandardCapabilityJobID(ctx context.Context, spec StandardCapabilitiesSpec) (int32, error) FindGatewayJobID(ctx context.Context, spec GatewaySpec) (int32, error) - - FindJobIDByStreamID(ctx context.Context, streamID uint32) (int32, error) } type ORMConfig interface { @@ -1089,15 +1087,15 @@ func (o *orm) FindOCR2JobIDByAddress(ctx context.Context, relay string, chainID stmt := ` SELECT jobs.id FROM jobs -LEFT JOIN ocr2_oracle_specs ocr2spec on - ocr2spec.contract_id = $1 AND - ocr2spec.feed_id IS NOT DISTINCT FROM $2 AND +LEFT JOIN ocr2_oracle_specs ocr2spec on + ocr2spec.contract_id = $1 AND + ocr2spec.feed_id IS NOT DISTINCT FROM $2 AND ocr2spec.id = jobs.ocr2_oracle_spec_id AND ocr2spec.relay = $3 AND ocr2spec.relay_config->'chainID' = $4 -LEFT JOIN bootstrap_specs bs on - bs.contract_id = $1 AND - bs.feed_id IS NOT DISTINCT FROM $2 AND +LEFT JOIN bootstrap_specs bs on + bs.contract_id = $1 AND + bs.feed_id IS NOT DISTINCT FROM $2 AND bs.id = jobs.bootstrap_spec_id AND bs.relay = $3 AND bs.relay_config->'chainID' = $4 @@ -1436,20 +1434,6 @@ func (o *orm) FindJobsByPipelineSpecIDs(ctx context.Context, ids []int32) ([]Job return jbs, errors.Wrap(err, "FindJobsByPipelineSpecIDs failed") } -func (o *orm) FindJobIDByStreamID(ctx context.Context, streamID uint32) (jobID int32, err error) { - stmt := `SELECT id FROM jobs WHERE type = 'stream' AND stream_id = $1` - err = o.ds.GetContext(ctx, &jobID, stmt, streamID) - if err != nil { - if !errors.Is(err, sql.ErrNoRows) { - err = errors.Wrap(err, "error searching for job by stream id") - } - err = errors.Wrap(err, "FindJobIDByStreamID failed") - return - } - - return -} - // PipelineRuns returns pipeline runs for a job, with spec and taskruns loaded, latest first // If jobID is nil, returns all pipeline runs func (o *orm) PipelineRuns(ctx context.Context, jobID *int32, offset, size int) (runs []pipeline.Run, count int, err error) {