Skip to content

Commit 0b66636

Browse files
authored
fix(embedding): not-found metric classification + backfill pagination (#91)
* fix: record embedding worker not-found as skipped, not failed_final When GetFeedbackRecord returns not-found at the start of Work — a benign race where the record was deleted or its tenant purged between enqueue and processing — the worker recorded terminal-failure metrics (a get_record_failed worker error plus a failed_final outcome) before the not-found short-circuit. That inflates the failure metrics and can trip false alerts, and it was inconsistent with the same worker's not-found-on-write path, which already records skipped. Check not-found first and record it as skipped (no worker error); record the failure metrics only for genuine errors. * perf: stream the embedding backfill in keyset pages ListFeedbackRecordIDsForBackfill and BackfillEmbeddings materialized every eligible record id in one slice; on a large deployment that is a memory spike when the CLI runs. They now keyset-paginate (id > cursor LIMIT n), enqueuing page by page so the full set is never held in memory at once.
1 parent 4141d58 commit 0b66636

5 files changed

Lines changed: 273 additions & 24 deletions

File tree

internal/repository/embeddings_repository.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,17 +95,24 @@ func (r *EmbeddingsRepository) DeleteByFeedbackRecordAndModel(
9595
})
9696
}
9797

98-
// ListFeedbackRecordIDsForBackfill returns IDs of feedback records that have non-empty value_text
99-
// and no row in embeddings for the given model (so they need an embedding for that model).
100-
func (r *EmbeddingsRepository) ListFeedbackRecordIDsForBackfill(ctx context.Context, model string) ([]uuid.UUID, error) {
98+
// ListFeedbackRecordIDsForBackfill returns one keyset page (fr.id > afterID, ordered by id,
99+
// at most limit rows) of feedback-record IDs that have non-empty value_text and no row in
100+
// embeddings for the given model (so they need an embedding for that model). Pass uuid.Nil
101+
// as afterID for the first page.
102+
func (r *EmbeddingsRepository) ListFeedbackRecordIDsForBackfill(
103+
ctx context.Context, model string, afterID uuid.UUID, limit int,
104+
) ([]uuid.UUID, error) {
101105
rows, err := r.db.Query(ctx, `
102106
SELECT fr.id FROM feedback_records fr
103107
WHERE fr.value_text IS NOT NULL AND trim(fr.value_text) != ''
108+
AND fr.id > $2
104109
AND NOT EXISTS (
105110
SELECT 1 FROM embeddings e
106111
WHERE e.feedback_record_id = fr.id AND e.model = $1
107112
)
108-
`, model)
113+
ORDER BY fr.id
114+
LIMIT $3
115+
`, model, afterID, limit)
109116
if err != nil {
110117
return nil, fmt.Errorf("list feedback record ids for backfill: %w", err)
111118
}

internal/service/feedback_records_service.go

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ type FeedbackRecordsRepository interface {
4444
type EmbeddingsRepository interface {
4545
Upsert(ctx context.Context, feedbackRecordID uuid.UUID, model string, embedding []float32) error
4646
DeleteByFeedbackRecordAndModel(ctx context.Context, feedbackRecordID uuid.UUID, model string) error
47-
ListFeedbackRecordIDsForBackfill(ctx context.Context, model string) ([]uuid.UUID, error)
47+
ListFeedbackRecordIDsForBackfill(
48+
ctx context.Context, model string, afterID uuid.UUID, limit int,
49+
) ([]uuid.UUID, error)
4850
}
4951

5052
// FeedbackRecordsService handles business logic for feedback records.
@@ -293,38 +295,58 @@ func (s *FeedbackRecordsService) SetEmbedding(
293295
return nil
294296
}
295297

298+
// embeddingBackfillPageSize bounds how many record ids the embedding backfill lists and
299+
// enqueues per keyset page, so a large deployment is never fully materialized in memory.
300+
const embeddingBackfillPageSize = 500
301+
296302
// BackfillEmbeddings enqueues embedding jobs for the given model for all feedback records that have
297303
// non-empty value_text and no embedding row for that model (existing rows are replaced by upsert when the job runs).
298-
// Returns the number of jobs enqueued. Requires embeddingInserter and embeddingQueueName to be set.
304+
// It streams the records in keyset pages. Returns the number of jobs enqueued. Requires embeddingInserter
305+
// and embeddingQueueName to be set.
299306
func (s *FeedbackRecordsService) BackfillEmbeddings(ctx context.Context, model string) (int, error) {
300307
if s.embeddingInserter == nil || s.embeddingQueueName == "" {
301308
return 0, ErrEmbeddingBackfillNotConfigured
302309
}
303310

304-
ids, err := s.embeddingsRepo.ListFeedbackRecordIDsForBackfill(ctx, model)
305-
if err != nil {
306-
return 0, fmt.Errorf("list ids for embedding backfill: %w", err)
307-
}
308-
309311
opts := &river.InsertOpts{
310312
Queue: s.embeddingQueueName,
311313
MaxAttempts: s.embeddingMaxAttempts,
312314
UniqueOpts: river.UniqueOpts{ByArgs: true, ByPeriod: uniqueByPeriodEmbedding},
313315
}
314316

315317
enqueued := 0
318+
afterID := uuid.Nil
316319

317-
for _, id := range ids {
318-
_, err := s.embeddingInserter.Insert(ctx, FeedbackEmbeddingArgs{
319-
FeedbackRecordID: id,
320-
Model: model,
321-
ValueTextHash: "backfill",
322-
}, opts)
320+
for {
321+
ids, err := s.embeddingsRepo.ListFeedbackRecordIDsForBackfill(ctx, model, afterID, embeddingBackfillPageSize)
323322
if err != nil {
324-
return enqueued, fmt.Errorf("enqueue embedding job for %s: %w", id, err)
323+
return enqueued, fmt.Errorf("list ids for embedding backfill: %w", err)
324+
}
325+
326+
if len(ids) == 0 {
327+
break
328+
}
329+
330+
for _, id := range ids {
331+
_, err := s.embeddingInserter.Insert(ctx, FeedbackEmbeddingArgs{
332+
FeedbackRecordID: id,
333+
Model: model,
334+
ValueTextHash: "backfill",
335+
}, opts)
336+
if err != nil {
337+
return enqueued, fmt.Errorf("enqueue embedding job for %s: %w", id, err)
338+
}
339+
340+
enqueued++
325341
}
326342

327-
enqueued++
343+
// Advance the keyset cursor past the last id seen; the query excludes
344+
// already-embedded records, so the cursor always moves forward.
345+
afterID = ids[len(ids)-1]
346+
347+
if len(ids) < embeddingBackfillPageSize {
348+
break
349+
}
328350
}
329351

330352
return enqueued, nil

internal/workers/feedback_embedding.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,22 @@ func (w *FeedbackEmbeddingWorker) Work(ctx context.Context, job *river.Job[servi
6464

6565
record, err := w.embeddingService.GetFeedbackRecord(ctx, args.FeedbackRecordID)
6666
if err != nil {
67+
// Not-found means the record was deleted or its tenant purged between enqueue and
68+
// now: a benign race, not a terminal failure. Record it as skipped (consistent with
69+
// the not-found-on-write path) so it does not trip failure alerts.
70+
if errors.Is(err, huberrors.ErrNotFound) {
71+
if w.metrics != nil {
72+
w.metrics.RecordEmbeddingOutcome(ctx, "skipped")
73+
w.metrics.RecordEmbeddingDuration(ctx, time.Since(start), "skipped")
74+
}
75+
76+
slog.Info("embedding: record gone before embed, skipping",
77+
"feedback_record_id", args.FeedbackRecordID,
78+
)
79+
80+
return nil
81+
}
82+
6783
if w.metrics != nil {
6884
w.metrics.RecordWorkerError(ctx, "get_record_failed")
6985
w.metrics.RecordEmbeddingOutcome(ctx, "failed_final")
@@ -75,11 +91,6 @@ func (w *FeedbackEmbeddingWorker) Work(ctx context.Context, job *river.Job[servi
7591
"error", err,
7692
)
7793

78-
// Only suppress retries for not-found; transient DB/network errors should retry.
79-
if errors.Is(err, huberrors.ErrNotFound) {
80-
return nil
81-
}
82-
8394
return fmt.Errorf("get feedback record: %w", err)
8495
}
8596

internal/workers/feedback_embedding_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,43 @@ import (
44
"context"
55
"errors"
66
"testing"
7+
"time"
78

89
"github.com/google/uuid"
910
"github.com/riverqueue/river"
1011
"github.com/riverqueue/river/rivertype"
1112

1213
"github.com/formbricks/hub/internal/huberrors"
1314
"github.com/formbricks/hub/internal/models"
15+
"github.com/formbricks/hub/internal/observability"
1416
"github.com/formbricks/hub/internal/service"
1517
)
1618

19+
// countingEmbeddingMetrics records outcome/worker-error counts for assertions.
20+
type countingEmbeddingMetrics struct {
21+
outcomes map[string]int
22+
workerErr map[string]int
23+
}
24+
25+
func newCountingEmbeddingMetrics() *countingEmbeddingMetrics {
26+
return &countingEmbeddingMetrics{outcomes: map[string]int{}, workerErr: map[string]int{}}
27+
}
28+
29+
func (m *countingEmbeddingMetrics) RecordJobsEnqueued(context.Context, int64) {}
30+
func (m *countingEmbeddingMetrics) RecordProviderError(context.Context, string) {}
31+
32+
func (m *countingEmbeddingMetrics) RecordEmbeddingOutcome(_ context.Context, status string) {
33+
m.outcomes[status]++
34+
}
35+
36+
func (m *countingEmbeddingMetrics) RecordWorkerError(_ context.Context, reason string) {
37+
m.workerErr[reason]++
38+
}
39+
40+
func (m *countingEmbeddingMetrics) RecordEmbeddingDuration(context.Context, time.Duration, string) {}
41+
42+
var _ observability.EmbeddingMetrics = (*countingEmbeddingMetrics)(nil)
43+
1744
type mockEmbeddingService struct {
1845
record *models.FeedbackRecord
1946
getErr error
@@ -69,6 +96,26 @@ func textRecord(valueText string) *models.FeedbackRecord {
6996
return record
7097
}
7198

99+
func TestFeedbackEmbeddingWorker_GetNotFoundRecordsSkipped(t *testing.T) {
100+
metrics := newCountingEmbeddingMetrics()
101+
svc := &mockEmbeddingService{getErr: huberrors.NewNotFoundError("feedback record", "gone")}
102+
worker := NewFeedbackEmbeddingWorker(svc, &mockEmbeddingClient{}, "", metrics)
103+
104+
if err := worker.Work(context.Background(), embeddingJob()); err != nil {
105+
t.Fatalf("Work() error = %v, want nil (not-found completes)", err)
106+
}
107+
108+
// A not-found GET is a benign delete/purge race: record it as skipped, never
109+
// failed_final (which would trip failure alerts) and not as a worker error.
110+
if metrics.outcomes["skipped"] != 1 || metrics.outcomes["failed_final"] != 0 {
111+
t.Fatalf("skipped=%d failed_final=%d, want 1/0", metrics.outcomes["skipped"], metrics.outcomes["failed_final"])
112+
}
113+
114+
if metrics.workerErr["get_record_failed"] != 0 {
115+
t.Fatalf("get_record_failed=%d, want 0 (not-found is not a worker error)", metrics.workerErr["get_record_failed"])
116+
}
117+
}
118+
72119
func TestFeedbackEmbeddingWorker_Work_SetEmbeddingConflict(t *testing.T) {
73120
ctx := context.Background()
74121

tests/embedding_backfill_test.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package tests
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"testing"
7+
8+
"github.com/google/uuid"
9+
"github.com/riverqueue/river"
10+
"github.com/riverqueue/river/rivertype"
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
14+
"github.com/formbricks/hub/internal/config"
15+
"github.com/formbricks/hub/internal/models"
16+
"github.com/formbricks/hub/internal/repository"
17+
"github.com/formbricks/hub/internal/service"
18+
"github.com/formbricks/hub/pkg/database"
19+
)
20+
21+
// countingEmbeddingInserter records the FeedbackEmbeddingArgs jobs enqueued.
22+
type countingEmbeddingInserter struct {
23+
ids []uuid.UUID
24+
}
25+
26+
func (c *countingEmbeddingInserter) Insert(
27+
_ context.Context, args river.JobArgs, _ *river.InsertOpts,
28+
) (*rivertype.JobInsertResult, error) {
29+
if a, ok := args.(service.FeedbackEmbeddingArgs); ok {
30+
c.ids = append(c.ids, a.FeedbackRecordID)
31+
}
32+
33+
return &rivertype.JobInsertResult{}, nil
34+
}
35+
36+
func embeddingBackfillRepos(t *testing.T) (*repository.FeedbackRecordsRepository, *repository.EmbeddingsRepository) {
37+
t.Helper()
38+
39+
cfg, err := config.Load()
40+
require.NoError(t, err)
41+
42+
db, err := database.NewPostgresPool(
43+
context.Background(), cfg.Database.URL, database.WithPoolConfig(cfg.Database.PoolConfig()))
44+
require.NoError(t, err)
45+
46+
t.Cleanup(db.Close)
47+
48+
return repository.NewFeedbackRecordsRepository(db), repository.NewEmbeddingsRepository(db)
49+
}
50+
51+
// TestListFeedbackRecordIDsForBackfill_KeysetPagination locks the new keyset behavior of the
52+
// embedding backfill query: each page is bounded by the limit, ids are returned strictly
53+
// ascending with no id on two pages, and paging from uuid.Nil eventually returns every record
54+
// that needs an embedding. A fresh model is used so the created records are all eligible.
55+
func TestListFeedbackRecordIDsForBackfill_KeysetPagination(t *testing.T) {
56+
ctx := context.Background()
57+
feedbackRepo, embeddingsRepo := embeddingBackfillRepos(t)
58+
59+
model := "backfill-keyset-" + uuid.NewString()
60+
tenant := uuid.NewString()
61+
62+
makeText := func(value string) uuid.UUID {
63+
rec, err := feedbackRepo.Create(ctx, &models.CreateFeedbackRecordRequest{
64+
SourceType: "formbricks",
65+
SubmissionID: uuid.NewString(),
66+
TenantID: tenant,
67+
FieldID: "q1",
68+
FieldType: models.FieldTypeText,
69+
ValueText: &value,
70+
})
71+
require.NoError(t, err)
72+
73+
return rec.ID
74+
}
75+
76+
mine := map[uuid.UUID]bool{}
77+
for _, value := range []string{"one", "two", "three", "four", "five"} {
78+
mine[makeText(value)] = true
79+
}
80+
81+
seen := map[uuid.UUID]bool{}
82+
afterID := uuid.Nil
83+
84+
var prev uuid.UUID
85+
86+
hasPrev := false
87+
88+
for {
89+
page, err := embeddingsRepo.ListFeedbackRecordIDsForBackfill(ctx, model, afterID, 2)
90+
require.NoError(t, err)
91+
require.LessOrEqual(t, len(page), 2, "LIMIT bounds the page size")
92+
93+
if len(page) == 0 {
94+
break
95+
}
96+
97+
for _, id := range page {
98+
require.False(t, seen[id], "an id must not appear on two pages")
99+
seen[id] = true
100+
101+
if hasPrev {
102+
require.Negativef(t, bytes.Compare(prev[:], id[:]), "ids are returned strictly ascending")
103+
}
104+
105+
prev = id
106+
hasPrev = true
107+
}
108+
109+
afterID = page[len(page)-1]
110+
111+
if len(page) < 2 {
112+
break
113+
}
114+
}
115+
116+
for id := range mine {
117+
assert.Truef(t, seen[id], "record %s needing an embedding must be returned across pages", id)
118+
}
119+
}
120+
121+
// TestBackfillEmbeddings_StreamsAllEligible drives the service backfill against the real DB
122+
// with a recording inserter and asserts it enqueues an embedding job for every record that
123+
// needs one (a fresh model makes the created records eligible).
124+
func TestBackfillEmbeddings_StreamsAllEligible(t *testing.T) {
125+
ctx := context.Background()
126+
feedbackRepo, embeddingsRepo := embeddingBackfillRepos(t)
127+
128+
model := "backfill-stream-" + uuid.NewString()
129+
tenant := uuid.NewString()
130+
131+
makeText := func(value string) uuid.UUID {
132+
rec, err := feedbackRepo.Create(ctx, &models.CreateFeedbackRecordRequest{
133+
SourceType: "formbricks",
134+
SubmissionID: uuid.NewString(),
135+
TenantID: tenant,
136+
FieldID: "q1",
137+
FieldType: models.FieldTypeText,
138+
ValueText: &value,
139+
})
140+
require.NoError(t, err)
141+
142+
return rec.ID
143+
}
144+
145+
mine := []uuid.UUID{makeText("one"), makeText("two"), makeText("three")}
146+
147+
inserter := &countingEmbeddingInserter{}
148+
svc := service.NewFeedbackRecordsService(feedbackRepo, embeddingsRepo, model, nil, inserter, "embeddings", 3)
149+
150+
enqueued, err := svc.BackfillEmbeddings(ctx, model)
151+
require.NoError(t, err)
152+
require.GreaterOrEqual(t, enqueued, len(mine))
153+
154+
got := map[uuid.UUID]bool{}
155+
for _, id := range inserter.ids {
156+
got[id] = true
157+
}
158+
159+
for _, id := range mine {
160+
assert.Truef(t, got[id], "record %s needing an embedding must be enqueued", id)
161+
}
162+
}

0 commit comments

Comments
 (0)