From 12e3a5379f75c3b79c8c384399f3dd930982bac7 Mon Sep 17 00:00:00 2001 From: James Kong Date: Mon, 18 May 2026 13:08:55 +0800 Subject: [PATCH] feat(feeds): remove stream ID lookup when approving data streams jobs Match data streams (job.Stream) specs only by external job ID when approving job proposals, the same as CRESettings and CCV job types. Previously, a stream spec with a different external job ID but the same stream ID as a running job would trigger the force-replace path (delete + recreate). That auto-delete behavior is now removed; stream job uniqueness is determined exclusively by external job ID. OPDATA-5937 --- core/services/feeds/service.go | 22 +--- core/services/feeds/service_test.go | 195 ++++++---------------------- core/services/job/mocks/orm.go | 57 -------- core/services/job/orm.go | 28 +--- 4 files changed, 51 insertions(+), 251 deletions(-) diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index 54e999159b3..61a2830bc96 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -1053,9 +1053,6 @@ func (s *service) ApproveSpec(ctx context.Context, id int64, force bool) error { if txerr != nil && !errors.Is(txerr, sql.ErrNoRows) { return fmt.Errorf("failed while checking for existing ccip job: %w", txerr) } - case job.StandardCapabilities: - // Only possible to match standard capabilities by external job id - // no-op case job.Gateway: existingJobID, txerr = tx.jobORM.FindGatewayJobID(ctx, *j.GatewaySpec) // Return an error if the repository errors. If there is a not found @@ -1063,19 +1060,12 @@ func (s *service) ApproveSpec(ctx context.Context, id int64, force bool) error { if txerr != nil && !errors.Is(txerr, sql.ErrNoRows) { return fmt.Errorf("failed while checking for existing gateway job: %w", txerr) } - case job.Stream: - existingJobID, txerr = tx.jobORM.FindJobIDByStreamID(ctx, *j.StreamID) - // Return an error if the repository errors. If there is a not found - // error we want to continue with approving the job. - if txerr != nil && !errors.Is(txerr, sql.ErrNoRows) { - return fmt.Errorf("failed while checking for existing stream job: %w", txerr) - } - case job.CRESettings: - // Only possible to match CRE Setting by external job id - // no-op - case job.CCVCommitteeVerifier, job.CCVExecutor: - // Only possible to match CCV jobs by external job id - // no-op + case job.CRESettings, + job.Stream, + job.CCVCommitteeVerifier, + job.CCVExecutor, + job.StandardCapabilities: + // NOOP: These jobs are only matched by external job ID, so do nothing default: return errors.Errorf("unsupported job type when approving job proposal specs: %s", j.Type) } diff --git a/core/services/feeds/service_test.go b/core/services/feeds/service_test.go index 4486deda640..7e3d527a747 100644 --- a/core/services/feeds/service_test.go +++ b/core/services/feeds/service_test.go @@ -3890,10 +3890,6 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { Version: 1, Definition: fmt.Sprintf(StreamTestSpecTemplate, streamName, externalJobID.String(), streamID), } - j = job.Job{ - ID: 1, - ExternalJobID: externalJobID, - } ) testCases := []struct { @@ -3913,7 +3909,6 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) - svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows) svc.spawner. On("CreateJob", @@ -3945,25 +3940,17 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { force: false, }, { - name: "cancelled spec success when no other spec is approved", + // Stream jobs are matched only by external job ID; a collision + // on stream ID alone must not block approval at the feeds layer. + // The strict mock asserts FindJobIDByStreamID is never called. + name: "pending job success when another job has the same stream ID", httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), before: func(svc *TestService) { - otherSpec := feeds.JobProposalSpec{ - ID: 21, - Status: feeds.SpecStatusRevoked, - JobProposalID: jp.ID, - Version: 2, - Definition: fmt.Sprintf(StreamTestSpecTemplate, streamName, externalJobID.String(), streamID), - } - - svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) - svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID, mock.Anything).Return(cancelledSpec, nil) + svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil) svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) - svc.orm.On("ListSpecsByJobProposalIDs", mock.Anything, []int64{cancelledSpec.JobProposalID}). - Return([]feeds.JobProposalSpec{otherSpec, *cancelledSpec}, nil) + svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) - svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows) svc.spawner. On("CreateJob", @@ -3977,7 +3964,7 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { Return(nil) svc.orm.On("ApproveSpec", mock.Anything, - cancelledSpec.ID, + spec.ID, externalJobID, ).Return(nil) svc.fmsClient.On("ApprovedJob", @@ -3991,68 +3978,28 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) }, - id: cancelledSpec.ID, + id: spec.ID, force: false, }, { - name: "cancelled spec failed when another spec is approved", + name: "cancelled spec success when no other spec is approved", + httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), before: func(svc *TestService) { otherSpec := feeds.JobProposalSpec{ ID: 21, - Status: feeds.SpecStatusApproved, + Status: feeds.SpecStatusRevoked, JobProposalID: jp.ID, Version: 2, Definition: fmt.Sprintf(StreamTestSpecTemplate, streamName, externalJobID.String(), streamID), } - svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID, mock.Anything).Return(cancelledSpec, nil) + svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) + svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID).Return(cancelledSpec, nil) svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) svc.orm.On("ListSpecsByJobProposalIDs", mock.Anything, []int64{cancelledSpec.JobProposalID}). Return([]feeds.JobProposalSpec{otherSpec, *cancelledSpec}, nil) - }, - id: cancelledSpec.ID, - force: false, - wantErr: "the job spec with version 2 is already approved", - }, - { - name: "rejected spec failed cannot be approved", - before: func(svc *TestService) { - svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID, mock.Anything).Return(rejectedSpec, nil) - svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) - }, - id: rejectedSpec.ID, - force: false, - wantErr: "cannot approve a rejected spec", - }, - { - name: "already existing job replacement error", - httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), - before: func(svc *TestService) { - svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) - svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil) - svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) - svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(j.ID, nil) - svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) - svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) - }, - id: spec.ID, - force: false, - wantErr: "could not approve job proposal: a job for this contract address already exists - please use the 'force' option to replace it", - }, - { - name: "already existing self managed job replacement success if forced without feedID", - httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), - before: func(svc *TestService) { - svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) - svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil) - svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) - svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) - svc.orm.EXPECT().GetApprovedSpec(mock.Anything, jp.ID).Return(nil, sql.ErrNoRows) - svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) - svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(j.ID, nil) - svc.spawner.On("DeleteJob", mock.Anything, mock.Anything, j.ID).Return(nil) svc.spawner. On("CreateJob", @@ -4066,7 +4013,7 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { Return(nil) svc.orm.On("ApproveSpec", mock.Anything, - spec.ID, + cancelledSpec.ID, externalJobID, ).Return(nil) svc.fmsClient.On("ApprovedJob", @@ -4080,99 +4027,38 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) }, - id: spec.ID, - force: true, + id: cancelledSpec.ID, + force: false, }, { - name: "already existing self managed job replacement success if forced with feedID", - httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), + name: "cancelled spec failed when another spec is approved", before: func(svc *TestService) { - svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) - svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(&feeds.JobProposalSpec{ - ID: 20, - Status: feeds.SpecStatusPending, + otherSpec := feeds.JobProposalSpec{ + ID: 21, + Status: feeds.SpecStatusApproved, JobProposalID: jp.ID, - Version: 1, + Version: 2, Definition: fmt.Sprintf(StreamTestSpecTemplate, streamName, externalJobID.String(), streamID), - }, nil) - svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) - svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) - svc.orm.EXPECT().GetApprovedSpec(mock.Anything, jp.ID).Return(nil, sql.ErrNoRows) - svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) - svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(j.ID, nil) - svc.spawner.On("DeleteJob", mock.Anything, mock.Anything, j.ID).Return(nil) + } - svc.spawner. - On("CreateJob", - mock.Anything, - mock.Anything, - mock.MatchedBy(func(j *job.Job) bool { - return j.Name.String == streamName - }), - ). - Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }). - Return(nil) - svc.orm.On("ApproveSpec", - mock.Anything, - spec.ID, - externalJobID, - ).Return(nil) - svc.fmsClient.On("ApprovedJob", - mock.MatchedBy(func(ctx context.Context) bool { return true }), - &proto.ApprovedJobRequest{ - Uuid: jp.RemoteUUID.String(), - Version: int64(spec.Version), - }, - ).Return(&proto.ApprovedJobResponse{}, nil) - svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) - svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) - svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) + svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID).Return(cancelledSpec, nil) + svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) + svc.orm.On("ListSpecsByJobProposalIDs", mock.Anything, []int64{cancelledSpec.JobProposalID}). + Return([]feeds.JobProposalSpec{otherSpec, *cancelledSpec}, nil) }, - id: spec.ID, - force: true, + id: cancelledSpec.ID, + force: false, + wantErr: "the job spec with version 2 is already approved", }, { - name: "already existing FMS managed job replacement success if forced", - httpTimeout: commonconfig.MustNewDuration(1 * time.Minute), + name: "rejected spec failed cannot be approved", before: func(svc *TestService) { - svc.connMgr.On("GetClient", jp.FeedsManagerID).Return(svc.fmsClient, nil) - svc.orm.On("GetSpec", mock.Anything, spec.ID).Return(spec, nil) + svc.orm.On("GetSpec", mock.Anything, cancelledSpec.ID).Return(rejectedSpec, nil) svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) - svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) - svc.orm.EXPECT().GetApprovedSpec(mock.Anything, jp.ID).Return(&feeds.JobProposalSpec{ID: 100}, nil) - svc.orm.EXPECT().CancelSpec(mock.Anything, int64(100)).Return(nil) - svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) - svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(j.ID, nil) - svc.spawner.On("DeleteJob", mock.Anything, mock.Anything, j.ID).Return(nil) - - svc.spawner. - On("CreateJob", - mock.Anything, - mock.Anything, - mock.MatchedBy(func(j *job.Job) bool { - return j.Name.String == streamName - }), - ). - Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }). - Return(nil) - svc.orm.On("ApproveSpec", - mock.Anything, - spec.ID, - externalJobID, - ).Return(nil) - svc.fmsClient.On("ApprovedJob", - mock.MatchedBy(func(ctx context.Context) bool { return true }), - &proto.ApprovedJobRequest{ - Uuid: jp.RemoteUUID.String(), - Version: int64(spec.Version), - }, - ).Return(&proto.ApprovedJobResponse{}, nil) - svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) - svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) - svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) }, - id: spec.ID, - force: true, + id: rejectedSpec.ID, + force: false, + wantErr: "cannot approve a rejected spec", }, { name: "spec does not exist", @@ -4206,7 +4092,7 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { JobProposalID: jp.ID, Status: feeds.SpecStatusRejected, } - svc.orm.On("GetSpec", mock.Anything, rspec.ID, mock.Anything).Return(rspec, nil) + svc.orm.On("GetSpec", mock.Anything, rspec.ID).Return(rspec, nil) svc.orm.On("GetJobProposal", mock.Anything, jp.ID).Return(jp, nil) }, id: spec.ID, @@ -4255,7 +4141,6 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) - svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows) svc.spawner. On("CreateJob", @@ -4283,7 +4168,6 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) - svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows) svc.spawner. On("CreateJob", @@ -4317,7 +4201,6 @@ func Test_Service_ApproveSpec_Stream(t *testing.T) { svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) svc.jobORM.On("FindJobByExternalJobID", mock.Anything, externalJobID).Return(job.Job{}, sql.ErrNoRows) - svc.jobORM.On("FindJobIDByStreamID", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows) svc.spawner. On("CreateJob", @@ -5081,7 +4964,7 @@ func Test_Service_UpdateSpecDefinition(t *testing.T) { name: "success", before: func(svc *TestService) { svc.orm. - On("GetSpec", mock.Anything, specID, mock.Anything). + On("GetSpec", mock.Anything, specID). Return(spec, nil) svc.orm.On("UpdateSpecDefinition", mock.Anything, specID, @@ -5095,7 +4978,7 @@ func Test_Service_UpdateSpecDefinition(t *testing.T) { name: "does not exist", before: func(svc *TestService) { svc.orm. - On("GetSpec", mock.Anything, specID, mock.Anything). + On("GetSpec", mock.Anything, specID). Return(nil, sql.ErrNoRows) }, specID: specID, @@ -5105,7 +4988,7 @@ func Test_Service_UpdateSpecDefinition(t *testing.T) { name: "other get errors", before: func(svc *TestService) { svc.orm. - On("GetSpec", mock.Anything, specID, mock.Anything). + On("GetSpec", mock.Anything, specID). Return(nil, errors.New("other db error")) }, specID: specID, @@ -5120,7 +5003,7 @@ func Test_Service_UpdateSpecDefinition(t *testing.T) { } svc.orm. - On("GetSpec", mock.Anything, specID, mock.Anything). + On("GetSpec", mock.Anything, specID). Return(spec, nil) }, specID: specID, diff --git a/core/services/job/mocks/orm.go b/core/services/job/mocks/orm.go index d677c053914..3a246fb223c 100644 --- a/core/services/job/mocks/orm.go +++ b/core/services/job/mocks/orm.go @@ -651,63 +651,6 @@ func (_c *ORM_FindJobIDByCapabilityNameAndVersion_Call) RunAndReturn(run func(co return _c } -// FindJobIDByStreamID provides a mock function with given fields: ctx, streamID -func (_m *ORM) FindJobIDByStreamID(ctx context.Context, streamID uint32) (int32, error) { - ret := _m.Called(ctx, streamID) - - if len(ret) == 0 { - panic("no return value specified for FindJobIDByStreamID") - } - - var r0 int32 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, uint32) (int32, error)); ok { - return rf(ctx, streamID) - } - if rf, ok := ret.Get(0).(func(context.Context, uint32) int32); ok { - r0 = rf(ctx, streamID) - } else { - r0 = ret.Get(0).(int32) - } - - if rf, ok := ret.Get(1).(func(context.Context, uint32) error); ok { - r1 = rf(ctx, streamID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_FindJobIDByStreamID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FindJobIDByStreamID' -type ORM_FindJobIDByStreamID_Call struct { - *mock.Call -} - -// FindJobIDByStreamID is a helper method to define mock.On call -// - ctx context.Context -// - streamID uint32 -func (_e *ORM_Expecter) FindJobIDByStreamID(ctx interface{}, streamID interface{}) *ORM_FindJobIDByStreamID_Call { - return &ORM_FindJobIDByStreamID_Call{Call: _e.mock.On("FindJobIDByStreamID", ctx, streamID)} -} - -func (_c *ORM_FindJobIDByStreamID_Call) Run(run func(ctx context.Context, streamID uint32)) *ORM_FindJobIDByStreamID_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint32)) - }) - return _c -} - -func (_c *ORM_FindJobIDByStreamID_Call) Return(_a0 int32, _a1 error) *ORM_FindJobIDByStreamID_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_FindJobIDByStreamID_Call) RunAndReturn(run func(context.Context, uint32) (int32, error)) *ORM_FindJobIDByStreamID_Call { - _c.Call.Return(run) - return _c -} - // FindJobIDByWorkflow provides a mock function with given fields: ctx, spec func (_m *ORM) FindJobIDByWorkflow(ctx context.Context, spec job.WorkflowSpec) (int32, error) { ret := _m.Called(ctx, spec) diff --git a/core/services/job/orm.go b/core/services/job/orm.go index 6f11f3f90c6..b0b91f2f1a8 100644 --- a/core/services/job/orm.go +++ b/core/services/job/orm.go @@ -80,8 +80,6 @@ type ORM interface { FindJobIDByCapabilityNameAndVersion(ctx context.Context, spec CCIPSpec) (int32, error) FindStandardCapabilityJobID(ctx context.Context, spec StandardCapabilitiesSpec) (int32, error) FindGatewayJobID(ctx context.Context, spec GatewaySpec) (int32, error) - - FindJobIDByStreamID(ctx context.Context, streamID uint32) (int32, error) } type ORMConfig interface { @@ -1089,15 +1087,15 @@ func (o *orm) FindOCR2JobIDByAddress(ctx context.Context, relay string, chainID stmt := ` SELECT jobs.id FROM jobs -LEFT JOIN ocr2_oracle_specs ocr2spec on - ocr2spec.contract_id = $1 AND - ocr2spec.feed_id IS NOT DISTINCT FROM $2 AND +LEFT JOIN ocr2_oracle_specs ocr2spec on + ocr2spec.contract_id = $1 AND + ocr2spec.feed_id IS NOT DISTINCT FROM $2 AND ocr2spec.id = jobs.ocr2_oracle_spec_id AND ocr2spec.relay = $3 AND ocr2spec.relay_config->'chainID' = $4 -LEFT JOIN bootstrap_specs bs on - bs.contract_id = $1 AND - bs.feed_id IS NOT DISTINCT FROM $2 AND +LEFT JOIN bootstrap_specs bs on + bs.contract_id = $1 AND + bs.feed_id IS NOT DISTINCT FROM $2 AND bs.id = jobs.bootstrap_spec_id AND bs.relay = $3 AND bs.relay_config->'chainID' = $4 @@ -1436,20 +1434,6 @@ func (o *orm) FindJobsByPipelineSpecIDs(ctx context.Context, ids []int32) ([]Job return jbs, errors.Wrap(err, "FindJobsByPipelineSpecIDs failed") } -func (o *orm) FindJobIDByStreamID(ctx context.Context, streamID uint32) (jobID int32, err error) { - stmt := `SELECT id FROM jobs WHERE type = 'stream' AND stream_id = $1` - err = o.ds.GetContext(ctx, &jobID, stmt, streamID) - if err != nil { - if !errors.Is(err, sql.ErrNoRows) { - err = errors.Wrap(err, "error searching for job by stream id") - } - err = errors.Wrap(err, "FindJobIDByStreamID failed") - return - } - - return -} - // PipelineRuns returns pipeline runs for a job, with spec and taskruns loaded, latest first // If jobID is nil, returns all pipeline runs func (o *orm) PipelineRuns(ctx context.Context, jobID *int32, offset, size int) (runs []pipeline.Run, count int, err error) {