diff --git a/internal/api/handlers/feedback_records_handler.go b/internal/api/handlers/feedback_records_handler.go index a592295..6b6ca83 100644 --- a/internal/api/handlers/feedback_records_handler.go +++ b/internal/api/handlers/feedback_records_handler.go @@ -14,6 +14,7 @@ import ( "github.com/formbricks/hub/internal/api/validation" "github.com/formbricks/hub/internal/huberrors" "github.com/formbricks/hub/internal/models" + "github.com/formbricks/hub/pkg/cursor" ) // FeedbackRecordsService defines the interface for feedback records business logic. @@ -123,6 +124,12 @@ func (h *FeedbackRecordsHandler) List(w http.ResponseWriter, r *http.Request) { result, err := h.service.ListFeedbackRecords(r.Context(), filters) if err != nil { + if errors.Is(err, cursor.ErrInvalidCursor) { + response.RespondBadRequest(w, "Invalid cursor: omit for first page, or use the exact next_cursor value from the previous response") + + return + } + response.RespondInternalServerError(w, "An unexpected error occurred") return diff --git a/internal/api/handlers/search_handler.go b/internal/api/handlers/search_handler.go index 4b7ba66..7597d70 100644 --- a/internal/api/handlers/search_handler.go +++ b/internal/api/handlers/search_handler.go @@ -18,9 +18,9 @@ import ( // SearchService defines the interface for semantic search and similar feedback. type SearchService interface { - SemanticSearch(ctx context.Context, query, tenantID string, limit, offset int, minScore float64, cursor string) ( + SemanticSearch(ctx context.Context, query, tenantID string, limit int, minScore float64, cursor string) ( service.SearchResult, error) - SimilarFeedback(ctx context.Context, feedbackRecordID uuid.UUID, tenantID string, limit, offset int, + SimilarFeedback(ctx context.Context, feedbackRecordID uuid.UUID, tenantID string, limit int, minScore float64, cursor string) (service.SearchResult, error) } @@ -55,14 +55,7 @@ type SemanticSearchResultItem struct { ValueText string `json:"value_text"` // value_text of the feedback record (the text that was embedded) } -// maxSearchOffset caps how far paging can go. With OFFSET-based paging the database -// still computes and discards all rows before the offset, so large offsets (e.g. 5000) -// make queries slow. Clamping keeps latency predictable and discourages deep paging. -// To support deeper paging without this limit, switch to cursor-based (keyset) pagination: -// return a cursor (e.g. last score + last feedback_record_id), and query WHERE (score, id) after cursor -// instead of OFFSET. const ( - maxSearchOffset = 1000 defaultSearchLimit = 10 maxSearchLimit = 100 ) @@ -99,17 +92,10 @@ func (h *SearchHandler) SemanticSearch(w http.ResponseWriter, r *http.Request) { } limit := parseLimit(r.URL.Query().Get("limit"), defaultSearchLimit, maxSearchLimit) - cursor := strings.TrimSpace(r.URL.Query().Get("cursor")) - - offset := 0 - if cursor == "" { - offset = min(parseOffset(r.URL.Query().Get("offset")), maxSearchOffset) - } - minScore := parseMinScore(r.URL.Query().Get("min_score")) - res, err := h.service.SemanticSearch(r.Context(), req.Query, req.TenantID, limit, offset, minScore, cursor) + res, err := h.service.SemanticSearch(r.Context(), req.Query, req.TenantID, limit, minScore, cursor) if err != nil { if errors.Is(err, service.ErrMissingTenantID) { response.RespondBadRequest(w, "tenant_id is required") @@ -177,17 +163,10 @@ func (h *SearchHandler) SimilarFeedback(w http.ResponseWriter, r *http.Request) } limit := parseLimit(r.URL.Query().Get("limit"), defaultSearchLimit, maxSearchLimit) - cursor := strings.TrimSpace(r.URL.Query().Get("cursor")) - - offset := 0 - if cursor == "" { - offset = min(parseOffset(r.URL.Query().Get("offset")), maxSearchOffset) - } - minScore := parseMinScore(r.URL.Query().Get("min_score")) - res, err := h.service.SimilarFeedback(r.Context(), id, tenantID, limit, offset, minScore, cursor) + res, err := h.service.SimilarFeedback(r.Context(), id, tenantID, limit, minScore, cursor) if err != nil { if errors.Is(err, service.ErrEmbeddingNotFound) { response.RespondNotFound(w, "Feedback record has no embedding for the current model") @@ -233,20 +212,6 @@ func parseLimit(s string, def, upperBound int) int { return min(n, upperBound) } -// parseOffset returns the query param "offset" as a non-negative int; default 0. -func parseOffset(s string) int { - if s == "" { - return 0 - } - - n, err := strconv.Atoi(s) - if err != nil || n < 0 { - return 0 - } - - return n -} - // defaultMinScore is the default minimum similarity score when the query param is omitted (reduces noise). const defaultMinScore = 0.7 diff --git a/internal/api/handlers/search_handler_test.go b/internal/api/handlers/search_handler_test.go index efd54d9..9f228e9 100644 --- a/internal/api/handlers/search_handler_test.go +++ b/internal/api/handlers/search_handler_test.go @@ -17,27 +17,27 @@ import ( ) type mockSearchService struct { - semanticFunc func(ctx context.Context, query, tenantID string, limit, offset int, minScore float64, + semanticFunc func(ctx context.Context, query, tenantID string, limit int, minScore float64, cursor string) (service.SearchResult, error) - similarFunc func(ctx context.Context, feedbackRecordID uuid.UUID, tenantID string, limit, offset int, + similarFunc func(ctx context.Context, feedbackRecordID uuid.UUID, tenantID string, limit int, minScore float64, cursor string) (service.SearchResult, error) } func (m *mockSearchService) SemanticSearch( - ctx context.Context, query, tenantID string, limit, offset int, minScore float64, cursor string, + ctx context.Context, query, tenantID string, limit int, minScore float64, cursor string, ) (service.SearchResult, error) { if m.semanticFunc != nil { - return m.semanticFunc(ctx, query, tenantID, limit, offset, minScore, cursor) + return m.semanticFunc(ctx, query, tenantID, limit, minScore, cursor) } return service.SearchResult{}, nil } func (m *mockSearchService) SimilarFeedback( - ctx context.Context, feedbackRecordID uuid.UUID, tenantID string, limit, offset int, minScore float64, cursor string, + ctx context.Context, feedbackRecordID uuid.UUID, tenantID string, limit int, minScore float64, cursor string, ) (service.SearchResult, error) { if m.similarFunc != nil { - return m.similarFunc(ctx, feedbackRecordID, tenantID, limit, offset, minScore, cursor) + return m.similarFunc(ctx, feedbackRecordID, tenantID, limit, minScore, cursor) } return service.SearchResult{}, nil @@ -60,7 +60,7 @@ func TestSearchHandler_SemanticSearch(t *testing.T) { t.Run("empty query returns 400", func(t *testing.T) { called := false mock := &mockSearchService{ - semanticFunc: func(_ context.Context, _, _ string, _, _ int, _ float64, _ string) (service.SearchResult, error) { + semanticFunc: func(_ context.Context, _, _ string, _ int, _ float64, _ string) (service.SearchResult, error) { called = true return service.SearchResult{}, service.ErrEmptyQuery @@ -85,13 +85,12 @@ func TestSearchHandler_SemanticSearch(t *testing.T) { val1 := "Login is very slow." val2 := "Dashboard loads fast." mock := &mockSearchService{ - semanticFunc: func(_ context.Context, query, tenantID string, limit, offset int, minScore float64, + semanticFunc: func(_ context.Context, query, tenantID string, limit int, minScore float64, cursor string, ) (service.SearchResult, error) { assert.Equal(t, "login is slow", query) assert.Equal(t, "env-1", tenantID) assert.Equal(t, 10, limit) - assert.Equal(t, 0, offset) assert.InDelta(t, 0.7, minScore, 1e-9) assert.Empty(t, cursor) @@ -132,7 +131,7 @@ func TestSearchHandler_SemanticSearch(t *testing.T) { t.Run("invalid cursor returns 400", func(t *testing.T) { mock := &mockSearchService{ - semanticFunc: func(_ context.Context, _, _ string, _, _ int, _ float64, cursor string) (service.SearchResult, error) { + semanticFunc: func(_ context.Context, _, _ string, _ int, _ float64, cursor string) (service.SearchResult, error) { if cursor != "" { return service.SearchResult{}, service.ErrInvalidCursor } @@ -171,7 +170,7 @@ func TestSearchHandler_SimilarFeedback(t *testing.T) { t.Run("embedding not found returns 404", func(t *testing.T) { mock := &mockSearchService{ - similarFunc: func(_ context.Context, _ uuid.UUID, _ string, _, _ int, _ float64, _ string) (service.SearchResult, error) { + similarFunc: func(_ context.Context, _ uuid.UUID, _ string, _ int, _ float64, _ string) (service.SearchResult, error) { return service.SearchResult{}, service.ErrEmbeddingNotFound }, } @@ -191,13 +190,12 @@ func TestSearchHandler_SimilarFeedback(t *testing.T) { similarID := uuid.MustParse("018e1234-5678-9abc-def0-aaaaaaaaaaaa") similarVal := "Similar feedback text." mock := &mockSearchService{ - similarFunc: func(_ context.Context, fid uuid.UUID, tenantID string, limit, offset int, minScore float64, + similarFunc: func(_ context.Context, fid uuid.UUID, tenantID string, limit int, minScore float64, cursor string, ) (service.SearchResult, error) { assert.Equal(t, id, fid) assert.Equal(t, "env-1", tenantID) assert.Equal(t, 10, limit) - assert.Equal(t, 0, offset) assert.InDelta(t, 0.7, minScore, 1e-9) assert.Empty(t, cursor) diff --git a/internal/api/handlers/webhooks_handler.go b/internal/api/handlers/webhooks_handler.go index 4d98a3c..8f6ea86 100644 --- a/internal/api/handlers/webhooks_handler.go +++ b/internal/api/handlers/webhooks_handler.go @@ -13,6 +13,7 @@ import ( "github.com/formbricks/hub/internal/api/validation" "github.com/formbricks/hub/internal/huberrors" "github.com/formbricks/hub/internal/models" + "github.com/formbricks/hub/pkg/cursor" ) // WebhooksService defines the interface for webhooks business logic. @@ -122,6 +123,12 @@ func (h *WebhooksHandler) List(w http.ResponseWriter, r *http.Request) { result, err := h.service.ListWebhooks(r.Context(), filters) if err != nil { + if errors.Is(err, cursor.ErrInvalidCursor) { + response.RespondBadRequest(w, "Invalid cursor: omit for first page, or use the exact next_cursor value from the previous response") + + return + } + slog.Error("Failed to list webhooks", "method", r.Method, "path", r.URL.Path, "error", err) response.RespondInternalServerError(w, "An unexpected error occurred") diff --git a/internal/models/feedback_records.go b/internal/models/feedback_records.go index 113fcb8..f3c7fbb 100644 --- a/internal/models/feedback_records.go +++ b/internal/models/feedback_records.go @@ -186,15 +186,14 @@ type ListFeedbackRecordsFilters struct { Since *time.Time `form:"since" validate:"omitempty"` Until *time.Time `form:"until" validate:"omitempty"` Limit int `form:"limit" validate:"omitempty,min=1,max=1000"` - Offset int `form:"offset" validate:"omitempty,min=0"` + Cursor string `form:"cursor" validate:"omitempty"` // keyset; omit for first page, use next_cursor for next } // ListFeedbackRecordsResponse represents the response for listing feedback records. type ListFeedbackRecordsResponse struct { - Data []FeedbackRecord `json:"data"` - Total int64 `json:"total"` - Limit int `json:"limit"` - Offset int `json:"offset"` + Data []FeedbackRecord `json:"data"` + Limit int `json:"limit"` + NextCursor string `json:"next_cursor,omitempty"` // present when there may be more results } // BulkDeleteFilters represents query parameters for bulk delete operation. diff --git a/internal/models/webhooks.go b/internal/models/webhooks.go index 0fa97a5..06594a2 100644 --- a/internal/models/webhooks.go +++ b/internal/models/webhooks.go @@ -198,13 +198,12 @@ type ListWebhooksFilters struct { Enabled *bool `form:"enabled"` TenantID *string `form:"tenant_id" validate:"omitempty,no_null_bytes"` Limit int `form:"limit" validate:"omitempty,min=1,max=1000"` - Offset int `form:"offset" validate:"omitempty,min=0"` + Cursor string `form:"cursor" validate:"omitempty"` // keyset cursor; omit for first page, use next_cursor for subsequent pages } // ListWebhooksResponse represents the response for listing webhooks. type ListWebhooksResponse struct { - Data []Webhook `json:"data"` - Total int64 `json:"total"` - Limit int `json:"limit"` - Offset int `json:"offset"` + Data []Webhook `json:"data"` + Limit int `json:"limit"` + NextCursor string `json:"next_cursor,omitempty"` // present when there may be more results } diff --git a/internal/repository/embeddings_repository.go b/internal/repository/embeddings_repository.go index e9e8561..006c252 100644 --- a/internal/repository/embeddings_repository.go +++ b/internal/repository/embeddings_repository.go @@ -169,22 +169,18 @@ func (r *EmbeddingsRepository) GetEmbeddingByFeedbackRecordAndModelAndTenant( // filtered in application code (not in WHERE) so pgvector's iterative index scan can run. Uses // full-precision query vector (no quantization); sets hnsw.ef_search for better recall. Over-fetches // then trims to limit to account for tenant/minScore filtering. excludeID optionally excludes one -// feedback record (e.g. for "similar" endpoint). offset is the number of rows to skip (for paging). +// feedback record (e.g. for "similar" endpoint). First page only; use NearestFeedbackRecordsByEmbeddingAfterCursor for next pages. func (r *EmbeddingsRepository) NearestFeedbackRecordsByEmbedding( - ctx context.Context, model string, queryEmbedding []float32, tenantID string, limit, offset int, excludeID *uuid.UUID, minScore float64, + ctx context.Context, model string, queryEmbedding []float32, tenantID string, limit int, excludeID *uuid.UUID, minScore float64, ) ([]models.FeedbackRecordWithScore, bool, error) { if len(queryEmbedding) != models.EmbeddingVectorDimensions { return nil, false, fmt.Errorf("%w: got %d, want %d", ErrEmbeddingDimensionMismatch, len(queryEmbedding), models.EmbeddingVectorDimensions) } - if offset < 0 { - offset = 0 - } - // Full-precision query vector (ephemeral); pgvector compares vector vs halfvec natively. queryVec := pgvector.NewVector(queryEmbedding) - fetchLimit := min(limit*nearestOverFetchFactor+offset, maxNearestFetchLimit) + fetchLimit := min(limit*nearestOverFetchFactor, maxNearestFetchLimit) dbTx, err := r.db.BeginTx(ctx, pgx.TxOptions{}) if err != nil { @@ -210,7 +206,7 @@ func (r *EmbeddingsRepository) NearestFeedbackRecordsByEmbedding( INNER JOIN feedback_records fr ON fr.id = e.feedback_record_id WHERE e.model = $2 AND fr.tenant_id = $3 ORDER BY (e.embedding <=> $1), e.feedback_record_id - LIMIT $4 OFFSET $5`, queryVec, model, tenantID, fetchLimit, offset) + LIMIT $4`, queryVec, model, tenantID, fetchLimit) } else { rows, err = dbTx.Query(ctx, ` SELECT e.feedback_record_id, (1 - (e.embedding <=> $1)) AS score, COALESCE(fr.field_label, ''), fr.value_text @@ -218,7 +214,7 @@ func (r *EmbeddingsRepository) NearestFeedbackRecordsByEmbedding( INNER JOIN feedback_records fr ON fr.id = e.feedback_record_id WHERE e.model = $2 AND fr.tenant_id = $3 AND e.feedback_record_id != $4 ORDER BY (e.embedding <=> $1), e.feedback_record_id - LIMIT $5 OFFSET $6`, queryVec, model, tenantID, *excludeID, fetchLimit, offset) + LIMIT $5`, queryVec, model, tenantID, *excludeID, fetchLimit) } if err != nil { diff --git a/internal/repository/feedback_records_repository.go b/internal/repository/feedback_records_repository.go index ad461f1..1ba6d0a 100644 --- a/internal/repository/feedback_records_repository.go +++ b/internal/repository/feedback_records_repository.go @@ -180,9 +180,7 @@ func buildFilterConditions(filters *models.ListFeedbackRecordsFilters) (whereCla return whereClause, args } -// List retrieves feedback records with optional filters. Embedding is not selected (API reads stay lean). -func (r *FeedbackRecordsRepository) List(ctx context.Context, filters *models.ListFeedbackRecordsFilters) ([]models.FeedbackRecord, error) { - query := ` +const feedbackRecordsListSelect = ` SELECT id, collected_at, created_at, updated_at, source_type, source_id, source_name, field_id, field_label, field_type, field_group_id, field_group_label, @@ -191,72 +189,88 @@ func (r *FeedbackRecordsRepository) List(ctx context.Context, filters *models.Li FROM feedback_records ` +// List retrieves feedback records with optional filters. Embedding is not selected (API reads stay lean). +// Fetches limit+1 as sentinel to determine hasMore; returns trimmed slice and hasMore. +func (r *FeedbackRecordsRepository) List( + ctx context.Context, filters *models.ListFeedbackRecordsFilters, +) ([]models.FeedbackRecord, bool, error) { + query := feedbackRecordsListSelect + whereClause, args := buildFilterConditions(filters) query += whereClause argCount := len(args) + 1 - query += " ORDER BY collected_at DESC" - - if filters.Limit > 0 { - query += fmt.Sprintf(" LIMIT $%d", argCount) + query += " ORDER BY collected_at DESC, id ASC" - args = append(args, filters.Limit) - argCount++ + limit := filters.Limit + if limit <= 0 { + limit = 100 } - if filters.Offset > 0 { - query += fmt.Sprintf(" OFFSET $%d", argCount) + query += fmt.Sprintf(" LIMIT $%d", argCount) - args = append(args, filters.Offset) - } + args = append(args, limit+1) - rows, err := r.db.Query(ctx, query, args...) + records, err := r.fetchFeedbackRecords(ctx, query, args...) if err != nil { - return nil, fmt.Errorf("failed to list feedback records: %w", err) + return nil, false, err } - defer rows.Close() - records := []models.FeedbackRecord{} // Initialize as empty slice, not nil + hasMore := len(records) > limit + if hasMore { + records = records[:limit] + } - for rows.Next() { - var record models.FeedbackRecord + return records, hasMore, nil +} - err := rows.Scan( - &record.ID, &record.CollectedAt, &record.CreatedAt, &record.UpdatedAt, - &record.SourceType, &record.SourceID, &record.SourceName, - &record.FieldID, &record.FieldLabel, &record.FieldType, &record.FieldGroupID, &record.FieldGroupLabel, - &record.ValueText, &record.ValueNumber, &record.ValueBoolean, &record.ValueDate, - &record.Metadata, &record.Language, &record.UserIdentifier, &record.TenantID, &record.SubmissionID, - ) - if err != nil { - return nil, fmt.Errorf("failed to scan feedback record: %w", err) - } +// ListAfterCursor retrieves feedback records after the given keyset cursor (collected_at, id). +// Order is collected_at DESC, id ASC. The cursor represents the last row of the previous page. +// Fetches limit+1 as sentinel to determine hasMore; returns trimmed slice and hasMore. +func (r *FeedbackRecordsRepository) ListAfterCursor( + ctx context.Context, filters *models.ListFeedbackRecordsFilters, cursorCollectedAt time.Time, cursorID uuid.UUID, +) ([]models.FeedbackRecord, bool, error) { + query := feedbackRecordsListSelect - records = append(records, record) - } + whereClause, args := buildFilterConditions(filters) + query += whereClause - if err := rows.Err(); err != nil { - return nil, fmt.Errorf("error iterating feedback records: %w", err) + // Keyset condition: next page = (collected_at < cursor) OR (collected_at = cursor AND id > cursorID) + // For ORDER BY collected_at DESC, id ASC (two cursor params: collected_at, id) + argTime := len(args) + 1 + + argID := len(args) + 2 //nolint:mnd // second keyset param + if whereClause != "" { + query += fmt.Sprintf(" AND (collected_at < $%d OR (collected_at = $%d AND id > $%d))", argTime, argTime, argID) + } else { + query += fmt.Sprintf(" WHERE (collected_at < $%d OR (collected_at = $%d AND id > $%d))", argTime, argTime, argID) } - return records, nil -} + args = append(args, cursorCollectedAt, cursorID) + argCount := len(args) + 1 -// Count returns the total count of feedback records matching the filters. -func (r *FeedbackRecordsRepository) Count(ctx context.Context, filters *models.ListFeedbackRecordsFilters) (int64, error) { - query := `SELECT COUNT(*) FROM feedback_records` + query += " ORDER BY collected_at DESC, id ASC" - whereClause, args := buildFilterConditions(filters) - query += whereClause + limit := filters.Limit + if limit <= 0 { + limit = 100 + } + + query += fmt.Sprintf(" LIMIT $%d", argCount) - var count int64 + args = append(args, limit+1) - err := r.db.QueryRow(ctx, query, args...).Scan(&count) + records, err := r.fetchFeedbackRecords(ctx, query, args...) if err != nil { - return 0, fmt.Errorf("failed to count feedback records: %w", err) + return nil, false, err } - return count, nil + hasMore := len(records) > limit + if hasMore { + records = records[:limit] + } + + return records, hasMore, nil } // buildUpdateQuery builds an UPDATE query with SET clause and arguments. @@ -420,3 +434,40 @@ func (r *FeedbackRecordsRepository) BulkDelete(ctx context.Context, userIdentifi return ids, nil } + +// fetchFeedbackRecords executes the given query and scans rows into FeedbackRecord slices. +// Used by List and ListAfterCursor to avoid duplicating SELECT/scan logic. +func (r *FeedbackRecordsRepository) fetchFeedbackRecords( + ctx context.Context, query string, args ...any, +) ([]models.FeedbackRecord, error) { + rows, err := r.db.Query(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("failed to list feedback records: %w", err) + } + defer rows.Close() + + records := []models.FeedbackRecord{} + + for rows.Next() { + var record models.FeedbackRecord + + err := rows.Scan( + &record.ID, &record.CollectedAt, &record.CreatedAt, &record.UpdatedAt, + &record.SourceType, &record.SourceID, &record.SourceName, + &record.FieldID, &record.FieldLabel, &record.FieldType, &record.FieldGroupID, &record.FieldGroupLabel, + &record.ValueText, &record.ValueNumber, &record.ValueBoolean, &record.ValueDate, + &record.Metadata, &record.Language, &record.UserIdentifier, &record.TenantID, &record.SubmissionID, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan feedback record: %w", err) + } + + records = append(records, record) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating feedback records: %w", err) + } + + return records, nil +} diff --git a/internal/repository/webhooks_repository.go b/internal/repository/webhooks_repository.go index 0288fec..8509153 100644 --- a/internal/repository/webhooks_repository.go +++ b/internal/repository/webhooks_repository.go @@ -127,68 +127,90 @@ func buildWebhookFilterConditions(filters *models.ListWebhooksFilters) (whereCla return whereClause, args } -// List retrieves webhooks with optional filters. -func (r *WebhooksRepository) List(ctx context.Context, filters *models.ListWebhooksFilters) ([]models.Webhook, error) { - query := ` +const webhooksListSelect = ` SELECT id, url, signing_key, enabled, tenant_id, created_at, updated_at, event_types, disabled_reason, disabled_at FROM webhooks ` +// List retrieves webhooks with optional filters. +// Fetches limit+1 as sentinel to determine hasMore; returns trimmed slice and hasMore. +func (r *WebhooksRepository) List(ctx context.Context, filters *models.ListWebhooksFilters) ([]models.Webhook, bool, error) { + query := webhooksListSelect + whereClause, args := buildWebhookFilterConditions(filters) query += whereClause argCount := len(args) + 1 - query += " ORDER BY created_at DESC" - - if filters.Limit > 0 { - query += fmt.Sprintf(" LIMIT $%d", argCount) + query += " ORDER BY created_at DESC, id ASC" - args = append(args, filters.Limit) - argCount++ + limit := filters.Limit + if limit <= 0 { + limit = 100 } - if filters.Offset > 0 { - query += fmt.Sprintf(" OFFSET $%d", argCount) + query += fmt.Sprintf(" LIMIT $%d", argCount) - args = append(args, filters.Offset) - } + args = append(args, limit+1) - rows, err := r.db.Query(ctx, query, args...) + webhooks, err := r.fetchWebhooks(ctx, query, args...) if err != nil { - return nil, fmt.Errorf("failed to list webhooks: %w", err) + return nil, false, err } - defer rows.Close() - webhooks := []models.Webhook{} + hasMore := len(webhooks) > limit + if hasMore { + webhooks = webhooks[:limit] + } - for rows.Next() { - var ( - webhook models.Webhook - dbEventTypes []string - ) + return webhooks, hasMore, nil +} - err := rows.Scan( - &webhook.ID, &webhook.URL, &webhook.SigningKey, &webhook.Enabled, - &webhook.TenantID, &webhook.CreatedAt, &webhook.UpdatedAt, &dbEventTypes, - &webhook.DisabledReason, &webhook.DisabledAt, - ) - if err != nil { - return nil, fmt.Errorf("failed to scan webhook: %w", err) - } +// ListAfterCursor retrieves webhooks after the given keyset cursor (created_at, id). +// Order is created_at DESC, id ASC. The cursor represents the last row of the previous page. +// Fetches limit+1 as sentinel to determine hasMore; returns trimmed slice and hasMore. +func (r *WebhooksRepository) ListAfterCursor( + ctx context.Context, filters *models.ListWebhooksFilters, cursorCreatedAt time.Time, cursorID uuid.UUID, +) ([]models.Webhook, bool, error) { + query := webhooksListSelect - webhook.EventTypes, err = parseDBEventTypes(dbEventTypes) - if err != nil { - return nil, err - } + whereClause, args := buildWebhookFilterConditions(filters) + query += whereClause - webhooks = append(webhooks, webhook) + // Keyset condition: next page = (created_at < cursor) OR (created_at = cursor AND id > cursorID) + argTime := len(args) + 1 + + argID := len(args) + 2 //nolint:mnd // second keyset param + if whereClause != "" { + query += fmt.Sprintf(" AND (created_at < $%d OR (created_at = $%d AND id > $%d))", argTime, argTime, argID) + } else { + query += fmt.Sprintf(" WHERE (created_at < $%d OR (created_at = $%d AND id > $%d))", argTime, argTime, argID) } - if err := rows.Err(); err != nil { - return nil, fmt.Errorf("error iterating webhooks: %w", err) + args = append(args, cursorCreatedAt, cursorID) + argCount := len(args) + 1 + + query += " ORDER BY created_at DESC, id ASC" + + limit := filters.Limit + if limit <= 0 { + limit = 100 } - return webhooks, nil + query += fmt.Sprintf(" LIMIT $%d", argCount) + + args = append(args, limit+1) + + webhooks, err := r.fetchWebhooks(ctx, query, args...) + if err != nil { + return nil, false, err + } + + hasMore := len(webhooks) > limit + if hasMore { + webhooks = webhooks[:limit] + } + + return webhooks, hasMore, nil } // Count returns the total count of webhooks matching the filters. @@ -354,17 +376,16 @@ func parseDBEventTypes(ss []string) ([]datatypes.EventType, error) { return out, nil } -// ListEnabled retrieves all enabled webhooks. +// ListEnabled retrieves all enabled webhooks (unbounded; used for delivery fan-out). func (r *WebhooksRepository) ListEnabled(ctx context.Context) ([]models.Webhook, error) { - filters := &models.ListWebhooksFilters{ - Enabled: func() *bool { - b := true + query := webhooksListSelect + ` WHERE enabled = true ORDER BY created_at DESC, id ASC` - return &b - }(), + webhooks, err := r.fetchWebhooks(ctx, query) + if err != nil { + return nil, fmt.Errorf("list enabled webhooks: %w", err) } - return r.List(ctx, filters) + return webhooks, nil } // ListEnabledForEventType retrieves all enabled webhooks that should receive a specific event type. @@ -375,6 +396,7 @@ func (r *WebhooksRepository) ListEnabledForEventType(ctx context.Context, eventT FROM webhooks WHERE enabled = true AND (event_types IS NULL OR event_types = '{}' OR event_types @> ARRAY[$1]::VARCHAR(64)[]) + ORDER BY id ` rows, err := r.db.Query(ctx, query, eventType) @@ -414,3 +436,43 @@ func (r *WebhooksRepository) ListEnabledForEventType(ctx context.Context, eventT return webhooks, nil } + +// fetchWebhooks executes the given query and scans rows into Webhook slices. +func (r *WebhooksRepository) fetchWebhooks(ctx context.Context, query string, args ...any) ([]models.Webhook, error) { + rows, err := r.db.Query(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("failed to list webhooks: %w", err) + } + defer rows.Close() + + webhooks := []models.Webhook{} + + for rows.Next() { + var ( + webhook models.Webhook + dbEventTypes []string + ) + + err := rows.Scan( + &webhook.ID, &webhook.URL, &webhook.SigningKey, &webhook.Enabled, + &webhook.TenantID, &webhook.CreatedAt, &webhook.UpdatedAt, &dbEventTypes, + &webhook.DisabledReason, &webhook.DisabledAt, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan webhook: %w", err) + } + + webhook.EventTypes, err = parseDBEventTypes(dbEventTypes) + if err != nil { + return nil, err + } + + webhooks = append(webhooks, webhook) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating webhooks: %w", err) + } + + return webhooks, nil +} diff --git a/internal/service/feedback_records_service.go b/internal/service/feedback_records_service.go index a2784a3..9cfc6af 100644 --- a/internal/service/feedback_records_service.go +++ b/internal/service/feedback_records_service.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "strings" "time" "github.com/google/uuid" @@ -12,6 +13,7 @@ import ( "github.com/formbricks/hub/internal/datatypes" "github.com/formbricks/hub/internal/models" + "github.com/formbricks/hub/pkg/cursor" ) // ErrUserIdentifierRequired is returned when bulk delete is called without user_identifier (err113). @@ -26,8 +28,11 @@ const uniqueByPeriodEmbedding = 24 * time.Hour type FeedbackRecordsRepository interface { Create(ctx context.Context, req *models.CreateFeedbackRecordRequest) (*models.FeedbackRecord, error) GetByID(ctx context.Context, id uuid.UUID) (*models.FeedbackRecord, error) - List(ctx context.Context, filters *models.ListFeedbackRecordsFilters) ([]models.FeedbackRecord, error) - Count(ctx context.Context, filters *models.ListFeedbackRecordsFilters) (int64, error) + List(ctx context.Context, filters *models.ListFeedbackRecordsFilters) ([]models.FeedbackRecord, bool, error) + ListAfterCursor( + ctx context.Context, filters *models.ListFeedbackRecordsFilters, + cursorCollectedAt time.Time, cursorID uuid.UUID, + ) ([]models.FeedbackRecord, bool, error) Update(ctx context.Context, id uuid.UUID, req *models.UpdateFeedbackRecordRequest) (*models.FeedbackRecord, error) Delete(ctx context.Context, id uuid.UUID) error BulkDelete(ctx context.Context, userIdentifier string, tenantID *string) ([]uuid.UUID, error) @@ -109,29 +114,54 @@ func (s *FeedbackRecordsService) GetFeedbackRecord(ctx context.Context, id uuid. } // ListFeedbackRecords retrieves a list of feedback records with optional filters. +// Uses cursor-based pagination: omit cursor for first page, use next_cursor for subsequent pages. func (s *FeedbackRecordsService) ListFeedbackRecords( ctx context.Context, filters *models.ListFeedbackRecordsFilters, ) (*models.ListFeedbackRecordsResponse, error) { - // Set default limit if not provided (validation ensures it's within bounds if provided) + if filters == nil { + filters = &models.ListFeedbackRecordsFilters{} + } + if filters.Limit <= 0 { - filters.Limit = 100 // Default limit + filters.Limit = 100 + } + + cursorStr := strings.TrimSpace(filters.Cursor) + + var ( + records []models.FeedbackRecord + hasMore bool + err error + ) + + if cursorStr != "" { + collectedAt, id, decErr := cursor.Decode(cursorStr) + if decErr != nil { + return nil, fmt.Errorf("decode cursor: %w", decErr) + } + + records, hasMore, err = s.repo.ListAfterCursor(ctx, filters, collectedAt, id) + } else { + records, hasMore, err = s.repo.List(ctx, filters) } - records, err := s.repo.List(ctx, filters) if err != nil { return nil, fmt.Errorf("list feedback records: %w", err) } - total, err := s.repo.Count(ctx, filters) + meta, err := BuildListPaginationMeta(filters.Limit, hasMore, func() (string, error) { + last := records[len(records)-1] + + return cursor.Encode(last.CollectedAt, last.ID) + }) if err != nil { - return nil, fmt.Errorf("count feedback records: %w", err) + return nil, fmt.Errorf("encode next cursor: %w", err) } return &models.ListFeedbackRecordsResponse{ - Data: records, - Total: total, - Limit: filters.Limit, - Offset: filters.Offset, + Data: records, + Limit: meta.Limit, + NextCursor: meta.NextCursor, }, nil } diff --git a/internal/service/pagination.go b/internal/service/pagination.go new file mode 100644 index 0000000..8a300d9 --- /dev/null +++ b/internal/service/pagination.go @@ -0,0 +1,27 @@ +package service + +// ListPaginationMeta holds pagination metadata for list endpoints (feedback records, webhooks). +type ListPaginationMeta struct { + Limit int + NextCursor string +} + +// BuildListPaginationMeta builds pagination metadata for cursor-based list responses. +// hasMore indicates a sentinel row was fetched (limit+1 returned, trimmed to limit). +// encodeLast is called only when hasMore is true to produce next_cursor. +func BuildListPaginationMeta( + limit int, hasMore bool, encodeLast func() (string, error), +) (ListPaginationMeta, error) { + meta := ListPaginationMeta{Limit: limit} + + if hasMore && encodeLast != nil { + next, err := encodeLast() + if err != nil { + return meta, err + } + + meta.NextCursor = next + } + + return meta, nil +} diff --git a/internal/service/search_service.go b/internal/service/search_service.go index 6139f21..056c878 100644 --- a/internal/service/search_service.go +++ b/internal/service/search_service.go @@ -32,7 +32,7 @@ type EmbeddingsRepositoryForSearch interface { ctx context.Context, feedbackRecordID uuid.UUID, model, tenantID string, ) ([]float32, error) NearestFeedbackRecordsByEmbedding( - ctx context.Context, model string, queryEmbedding []float32, tenantID string, limit, offset int, excludeID *uuid.UUID, minScore float64, + ctx context.Context, model string, queryEmbedding []float32, tenantID string, limit int, excludeID *uuid.UUID, minScore float64, ) ([]models.FeedbackRecordWithScore, bool, error) NearestFeedbackRecordsByEmbeddingAfterCursor( ctx context.Context, model string, queryEmbedding []float32, tenantID string, limit int, @@ -79,11 +79,10 @@ func NewSearchService(p SearchServiceParams) *SearchService { } // SemanticSearch returns feedback record IDs and similarity scores for the given query, scoped to tenantID. -// Requires non-empty tenantID and non-empty (after trim) query. If cursor is non-empty it is used for -// keyset paging (offset is ignored); otherwise offset is used. minScore is the minimum similarity score (0..1). -// NextCursor is set when there may be a next page (full page returned). +// Requires non-empty tenantID and non-empty (after trim) query. Uses cursor-based pagination. +// minScore is the minimum similarity score (0..1). NextCursor is set when there may be a next page. func (s *SearchService) SemanticSearch( - ctx context.Context, query, tenantID string, limit, offset int, minScore float64, cursor string, + ctx context.Context, query, tenantID string, limit int, minScore float64, cursor string, ) (SearchResult, error) { out := SearchResult{} if tenantID == "" { @@ -126,7 +125,7 @@ func (s *SearchService) SemanticSearch( ctx, s.model, embedding, tenantID, limit, lastDistance, lastID, nil, minScore) } else { results, hasMore, err = s.embeddingsRepo.NearestFeedbackRecordsByEmbedding( - ctx, s.model, embedding, tenantID, limit, offset, nil, minScore) + ctx, s.model, embedding, tenantID, limit, nil, minScore) } if err != nil { @@ -152,9 +151,9 @@ func (s *SearchService) SemanticSearch( // SimilarFeedback returns feedback record IDs and similarity scores for records similar to the given one, scoped to tenantID. // Requires non-empty tenantID. Returns ErrEmbeddingNotFound when the record has no embedding for the current model. -// If cursor is non-empty it is used for keyset paging (offset is ignored); otherwise offset is used. +// Uses cursor-based pagination. func (s *SearchService) SimilarFeedback( - ctx context.Context, feedbackRecordID uuid.UUID, tenantID string, limit, offset int, minScore float64, cursor string, + ctx context.Context, feedbackRecordID uuid.UUID, tenantID string, limit int, minScore float64, cursor string, ) (SearchResult, error) { out := SearchResult{} if tenantID == "" { @@ -191,7 +190,7 @@ func (s *SearchService) SimilarFeedback( ctx, s.model, embedding, tenantID, limit, lastDistance, lastID, &feedbackRecordID, minScore) } else { results, hasMore, err = s.embeddingsRepo.NearestFeedbackRecordsByEmbedding( - ctx, s.model, embedding, tenantID, limit, offset, &feedbackRecordID, minScore) + ctx, s.model, embedding, tenantID, limit, &feedbackRecordID, minScore) } if err != nil { diff --git a/internal/service/search_service_test.go b/internal/service/search_service_test.go index f0a5163..f270be7 100644 --- a/internal/service/search_service_test.go +++ b/internal/service/search_service_test.go @@ -38,7 +38,7 @@ type mockEmbeddingsRepoForSearch struct { getEmbeddingByTenantFunc func(ctx context.Context, feedbackRecordID uuid.UUID, model, tenantID string) ([]float32, error) nearestFunc func( ctx context.Context, model string, queryEmbedding []float32, - tenantID string, limit, offset int, excludeID *uuid.UUID, minScore float64, + tenantID string, limit int, excludeID *uuid.UUID, minScore float64, ) ([]models.FeedbackRecordWithScore, bool, error) nearestAfterFunc func( ctx context.Context, model string, queryEmbedding []float32, @@ -57,10 +57,10 @@ func (m *mockEmbeddingsRepoForSearch) GetEmbeddingByFeedbackRecordAndModelAndTen } func (m *mockEmbeddingsRepoForSearch) NearestFeedbackRecordsByEmbedding( - ctx context.Context, model string, queryEmbedding []float32, tenantID string, limit, offset int, excludeID *uuid.UUID, minScore float64, + ctx context.Context, model string, queryEmbedding []float32, tenantID string, limit int, excludeID *uuid.UUID, minScore float64, ) ([]models.FeedbackRecordWithScore, bool, error) { if m.nearestFunc != nil { - return m.nearestFunc(ctx, model, queryEmbedding, tenantID, limit, offset, excludeID, minScore) + return m.nearestFunc(ctx, model, queryEmbedding, tenantID, limit, excludeID, minScore) } return nil, false, nil @@ -84,7 +84,7 @@ func TestSearchService_SemanticSearch(t *testing.T) { EmbeddingsRepo: &mockEmbeddingsRepoForSearch{}, Model: "test-model", }) - res, err := svc.SemanticSearch(context.Background(), "query", "", 10, 0, 0, "") + res, err := svc.SemanticSearch(context.Background(), "query", "", 10, 0, "") assert.Empty(t, res.Results) assert.ErrorIs(t, err, ErrMissingTenantID) }) @@ -95,7 +95,7 @@ func TestSearchService_SemanticSearch(t *testing.T) { EmbeddingsRepo: &mockEmbeddingsRepoForSearch{}, Model: "test-model", }) - res, err := svc.SemanticSearch(context.Background(), " ", "tenant-1", 10, 0, 0, "") + res, err := svc.SemanticSearch(context.Background(), " ", "tenant-1", 10, 0, "") assert.Empty(t, res.Results) assert.ErrorIs(t, err, ErrEmptyQuery) }) @@ -122,7 +122,7 @@ func TestSearchService_SemanticSearch(t *testing.T) { EmbeddingsRepo: &mockEmbeddingsRepoForSearch{ nearestFunc: func( _ context.Context, model string, queryEmbedding []float32, - tenantID string, limit, offset int, excludeID *uuid.UUID, minScore float64, + tenantID string, limit int, excludeID *uuid.UUID, minScore float64, ) ([]models.FeedbackRecordWithScore, bool, error) { nearestCalled = true @@ -130,7 +130,6 @@ func TestSearchService_SemanticSearch(t *testing.T) { assert.Equal(t, []float32{0.1, 0.2}, queryEmbedding) assert.Equal(t, "env-1", tenantID) assert.Equal(t, 10, limit) - assert.Equal(t, 2, offset) assert.Nil(t, excludeID) assert.InDelta(t, 0.5, minScore, 1e-9) @@ -141,7 +140,7 @@ func TestSearchService_SemanticSearch(t *testing.T) { }, Model: "test-model", }) - res, err := svc.SemanticSearch(context.Background(), "login slow", "env-1", 10, 2, 0.5, "") + res, err := svc.SemanticSearch(context.Background(), "login slow", "env-1", 10, 0.5, "") require.NoError(t, err) require.True(t, queryClientCalled) require.True(t, nearestCalled) @@ -158,7 +157,7 @@ func TestSearchService_SimilarFeedback(t *testing.T) { EmbeddingsRepo: &mockEmbeddingsRepoForSearch{}, Model: "test-model", }) - res, err := svc.SimilarFeedback(context.Background(), uuid.MustParse("018e1234-5678-9abc-def0-123456789abc"), "", 10, 0, 0, "") + res, err := svc.SimilarFeedback(context.Background(), uuid.MustParse("018e1234-5678-9abc-def0-123456789abc"), "", 10, 0, "") assert.Empty(t, res.Results) assert.ErrorIs(t, err, ErrMissingTenantID) }) @@ -177,7 +176,7 @@ func TestSearchService_SimilarFeedback(t *testing.T) { }, Model: "test-model", }) - res, err := svc.SimilarFeedback(context.Background(), rid, "env-1", 10, 0, 0, "") + res, err := svc.SimilarFeedback(context.Background(), rid, "env-1", 10, 0, "") assert.Empty(t, res.Results) assert.ErrorIs(t, err, repository.ErrEmbeddingNotFound) }) @@ -197,12 +196,11 @@ func TestSearchService_SimilarFeedback(t *testing.T) { }, nearestFunc: func( _ context.Context, model string, _ []float32, - tenantID string, limit, offset int, excludeID *uuid.UUID, minScore float64, + tenantID string, limit int, excludeID *uuid.UUID, minScore float64, ) ([]models.FeedbackRecordWithScore, bool, error) { assert.Equal(t, "test-model", model) assert.Equal(t, "env-1", tenantID) assert.Equal(t, 10, limit) - assert.Equal(t, 0, offset) require.NotNil(t, excludeID) assert.Equal(t, sourceID, *excludeID) assert.InDelta(t, 0.5, minScore, 1e-9) @@ -214,7 +212,7 @@ func TestSearchService_SimilarFeedback(t *testing.T) { }, Model: "test-model", }) - res, err := svc.SimilarFeedback(context.Background(), sourceID, "env-1", 10, 0, 0.5, "") + res, err := svc.SimilarFeedback(context.Background(), sourceID, "env-1", 10, 0.5, "") require.NoError(t, err) require.Len(t, res.Results, 1) assert.Equal(t, similarID, res.Results[0].FeedbackRecordID) @@ -238,7 +236,7 @@ func TestSearchService_SemanticSearch_EmbeddingError(t *testing.T) { EmbeddingsRepo: &mockEmbeddingsRepoForSearch{}, Model: "test-model", }) - res, err := svc.SemanticSearch(context.Background(), "query", "env-1", 10, 0, 0, "") + res, err := svc.SemanticSearch(context.Background(), "query", "env-1", 10, 0, "") assert.Empty(t, res.Results) assert.ErrorIs(t, err, embeddingErr) } diff --git a/internal/service/webhook_provider_test.go b/internal/service/webhook_provider_test.go index b3b330d..f3029f1 100644 --- a/internal/service/webhook_provider_test.go +++ b/internal/service/webhook_provider_test.go @@ -60,8 +60,14 @@ func (m *mockProviderRepo) GetByID(_ context.Context, _ uuid.UUID) (*models.Webh return nil, errors.New("not implemented") } -func (m *mockProviderRepo) List(_ context.Context, _ *models.ListWebhooksFilters) ([]models.Webhook, error) { - return nil, errors.New("not implemented") +func (m *mockProviderRepo) List(_ context.Context, _ *models.ListWebhooksFilters) ([]models.Webhook, bool, error) { + return nil, false, errors.New("not implemented") +} + +func (m *mockProviderRepo) ListAfterCursor( + _ context.Context, _ *models.ListWebhooksFilters, _ time.Time, _ uuid.UUID, +) ([]models.Webhook, bool, error) { + return nil, false, errors.New("not implemented") } func (m *mockProviderRepo) Count(_ context.Context, _ *models.ListWebhooksFilters) (int64, error) { diff --git a/internal/service/webhook_sender_test.go b/internal/service/webhook_sender_test.go index e6d8aef..09789bd 100644 --- a/internal/service/webhook_sender_test.go +++ b/internal/service/webhook_sender_test.go @@ -32,8 +32,14 @@ func (m *mockSenderRepo) GetByID(_ context.Context, _ uuid.UUID) (*models.Webhoo return nil, nil } -func (m *mockSenderRepo) List(_ context.Context, _ *models.ListWebhooksFilters) ([]models.Webhook, error) { - return nil, nil +func (m *mockSenderRepo) List(_ context.Context, _ *models.ListWebhooksFilters) ([]models.Webhook, bool, error) { + return nil, false, nil +} + +func (m *mockSenderRepo) ListAfterCursor( + _ context.Context, _ *models.ListWebhooksFilters, _ time.Time, _ uuid.UUID, +) ([]models.Webhook, bool, error) { + return nil, false, nil } func (m *mockSenderRepo) Count(_ context.Context, _ *models.ListWebhooksFilters) (int64, error) { diff --git a/internal/service/webhooks_service.go b/internal/service/webhooks_service.go index 5a58d83..33f4a34 100644 --- a/internal/service/webhooks_service.go +++ b/internal/service/webhooks_service.go @@ -5,6 +5,8 @@ import ( "crypto/rand" "encoding/base64" "fmt" + "strings" + "time" "github.com/google/uuid" standardwebhooks "github.com/standard-webhooks/standard-webhooks/libraries/go" @@ -12,13 +14,18 @@ import ( "github.com/formbricks/hub/internal/datatypes" "github.com/formbricks/hub/internal/huberrors" "github.com/formbricks/hub/internal/models" + "github.com/formbricks/hub/pkg/cursor" ) // WebhooksRepository defines the interface for webhooks data access. type WebhooksRepository interface { Create(ctx context.Context, req *models.CreateWebhookRequest) (*models.Webhook, error) GetByID(ctx context.Context, id uuid.UUID) (*models.Webhook, error) - List(ctx context.Context, filters *models.ListWebhooksFilters) ([]models.Webhook, error) + List(ctx context.Context, filters *models.ListWebhooksFilters) ([]models.Webhook, bool, error) + ListAfterCursor( + ctx context.Context, filters *models.ListWebhooksFilters, + cursorCreatedAt time.Time, cursorID uuid.UUID, + ) ([]models.Webhook, bool, error) Count(ctx context.Context, filters *models.ListWebhooksFilters) (int64, error) Update(ctx context.Context, id uuid.UUID, req *models.UpdateWebhookRequest) (*models.Webhook, error) Delete(ctx context.Context, id uuid.UUID) error @@ -114,26 +121,52 @@ func (s *WebhooksService) GetWebhook(ctx context.Context, id uuid.UUID) (*models } // ListWebhooks retrieves a list of webhooks with optional filters. +// Uses cursor-based pagination: omit cursor for first page, use next_cursor for subsequent pages. func (s *WebhooksService) ListWebhooks(ctx context.Context, filters *models.ListWebhooksFilters) (*models.ListWebhooksResponse, error) { + if filters == nil { + filters = &models.ListWebhooksFilters{} + } + if filters.Limit <= 0 { filters.Limit = 100 } - webhooks, err := s.repo.List(ctx, filters) + cursorStr := strings.TrimSpace(filters.Cursor) + + var ( + webhooks []models.Webhook + hasMore bool + err error + ) + + if cursorStr != "" { + createdAt, id, decErr := cursor.Decode(cursorStr) + if decErr != nil { + return nil, fmt.Errorf("decode cursor: %w", decErr) + } + + webhooks, hasMore, err = s.repo.ListAfterCursor(ctx, filters, createdAt, id) + } else { + webhooks, hasMore, err = s.repo.List(ctx, filters) + } + if err != nil { return nil, fmt.Errorf("list webhooks: %w", err) } - total, err := s.repo.Count(ctx, filters) + meta, err := BuildListPaginationMeta(filters.Limit, hasMore, func() (string, error) { + last := webhooks[len(webhooks)-1] + + return cursor.Encode(last.CreatedAt, last.ID) + }) if err != nil { - return nil, fmt.Errorf("count webhooks: %w", err) + return nil, fmt.Errorf("encode next cursor: %w", err) } return &models.ListWebhooksResponse{ - Data: webhooks, - Total: total, - Limit: filters.Limit, - Offset: filters.Offset, + Data: webhooks, + Limit: meta.Limit, + NextCursor: meta.NextCursor, }, nil } diff --git a/internal/service/webhooks_service_test.go b/internal/service/webhooks_service_test.go index 1bb126d..adcc598 100644 --- a/internal/service/webhooks_service_test.go +++ b/internal/service/webhooks_service_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "testing" + "time" "github.com/google/uuid" @@ -24,8 +25,14 @@ func (m *mockWebhooksRepo) GetByID(_ context.Context, _ uuid.UUID) (*models.Webh return nil, nil } -func (m *mockWebhooksRepo) List(_ context.Context, _ *models.ListWebhooksFilters) ([]models.Webhook, error) { - return nil, nil +func (m *mockWebhooksRepo) List(_ context.Context, _ *models.ListWebhooksFilters) ([]models.Webhook, bool, error) { + return nil, false, nil +} + +func (m *mockWebhooksRepo) ListAfterCursor( + _ context.Context, _ *models.ListWebhooksFilters, _ time.Time, _ uuid.UUID, +) ([]models.Webhook, bool, error) { + return nil, false, nil } func (m *mockWebhooksRepo) Count(_ context.Context, _ *models.ListWebhooksFilters) (int64, error) { diff --git a/migrations/006_list_keyset_indexes.sql b/migrations/006_list_keyset_indexes.sql new file mode 100644 index 0000000..7efe2e4 --- /dev/null +++ b/migrations/006_list_keyset_indexes.sql @@ -0,0 +1,21 @@ +-- +goose NO TRANSACTION +-- +goose Up +-- Indexes for optimal keyset (cursor) pagination on list endpoints. +-- CONCURRENTLY avoids blocking writes on large tables (requires NO TRANSACTION). +-- +-- feedback_records: list/ListAfterCursor ORDER BY collected_at DESC, id ASC with tenant_id filter. +-- Replaces idx_feedback_records_tenant_collected_at with id for tie-break; drop the older one. +DROP INDEX CONCURRENTLY IF EXISTS idx_feedback_records_tenant_collected_at; +CREATE INDEX CONCURRENTLY idx_feedback_records_tenant_collected_at_id + ON feedback_records (tenant_id, collected_at DESC, id); + +-- webhooks: list/ListAfterCursor ORDER BY created_at DESC, id ASC with tenant_id filter. +CREATE INDEX CONCURRENTLY idx_webhooks_tenant_created_at_id + ON webhooks (tenant_id, created_at DESC, id); + +-- +goose Down +DROP INDEX CONCURRENTLY IF EXISTS idx_webhooks_tenant_created_at_id; + +DROP INDEX CONCURRENTLY IF EXISTS idx_feedback_records_tenant_collected_at_id; +CREATE INDEX CONCURRENTLY idx_feedback_records_tenant_collected_at + ON feedback_records (tenant_id, collected_at); diff --git a/openapi.yaml b/openapi.yaml index bf9d2cd..243d8cd 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -145,16 +145,14 @@ paths: default: 100 minimum: 1 maximum: 1000 - - name: offset + - name: cursor in: query - description: Number of results to skip + description: | + Omit for the first page. For the next page, use the exact value from the previous response's next_cursor. + Opaque (base64-encoded); keyset pagination. schema: - type: integer - description: Number of results to skip - format: int64 - default: 0 - minimum: 0 - maximum: 2147483647 + type: string + example: "eyJ0IjoiMjAyNC0wMS0xNVQxMDozMDowMFoiLCJpIjoiMDE4ZTEyMzQtNTY3OC05YWJjLWRlZjAtMTIzNDU2Nzg5YWJjIn0=" responses: "200": description: OK @@ -180,9 +178,13 @@ paths: source_name: "Q1 NPS Survey" user_identifier: "user-abc-123" tenant_id: "org-123" - total: 1 limit: 100 - offset: 0 + "400": + description: Bad Request (e.g. invalid cursor) + content: + application/problem+json: + schema: + $ref: '#/components/schemas/ErrorModel' default: description: Error content: @@ -584,19 +586,11 @@ paths: default: 10 minimum: 1 maximum: 100 - - name: offset - in: query - description: Number of results to skip (OFFSET-based paging). Ignored if cursor is set. Capped at 1000 for performance. - schema: - type: integer - minimum: 0 - maximum: 1000 - default: 0 - name: cursor in: query description: | Omit for the first page. For the next page, use the exact value from the previous response's next_cursor. - Opaque (base64-encoded); when set, offset is ignored and the next page after this cursor is returned. + Opaque (base64-encoded); keyset pagination. schema: type: string example: "eyJkIjowLjEsImkiOiIwMThlMTIzNC01Njc4LTlhYmMtZGVmMC0xMTExMTExMTExMTEifQ==" @@ -679,19 +673,11 @@ paths: default: 10 minimum: 1 maximum: 100 - - name: offset - in: query - description: Number of results to skip (OFFSET-based paging). Ignored if cursor is set. Capped at 1000 for performance. - schema: - type: integer - minimum: 0 - maximum: 1000 - default: 0 - name: cursor in: query description: | Omit for the first page. For the next page, use the exact value from the previous response's next_cursor. - Opaque (base64-encoded); when set, offset is ignored and the next page after this cursor is returned. + Opaque (base64-encoded); keyset pagination. schema: type: string example: "eyJkIjowLjEsImkiOiIwMThlMTIzNC01Njc4LTlhYmMtZGVmMC0xMTExMTExMTExMTEifQ==" @@ -766,15 +752,13 @@ paths: default: 100 minimum: 1 maximum: 1000 - - name: offset + - name: cursor in: query - description: Number of results to skip + description: | + Omit for the first page. For the next page, use the exact value from the previous response's next_cursor. + Opaque (base64-encoded); keyset pagination. schema: - type: integer - format: int64 - default: 0 - minimum: 0 - maximum: 2147483647 + type: string responses: "200": description: OK @@ -782,6 +766,12 @@ paths: application/json: schema: $ref: '#/components/schemas/ListWebhooksOutputBody' + "400": + description: Bad Request (e.g. invalid cursor) + content: + application/problem+json: + schema: + $ref: '#/components/schemas/ErrorModel' default: description: Error content: @@ -1312,19 +1302,12 @@ components: type: integer description: Limit used in query format: int64 - offset: - type: integer - description: Offset used in query - format: int64 - total: - type: integer - description: Total count of feedback records matching filters - format: int64 + next_cursor: + type: string + description: Opaque cursor for the next page (keyset paging). Present only when there may be more results. Use as the cursor query param for the next page. required: - data - - total - limit - - offset CreateWebhookInputBody: type: object additionalProperties: false @@ -1370,23 +1353,16 @@ components: description: List of webhooks items: $ref: '#/components/schemas/WebhookData' - total: - type: integer - description: Total count matching filters - format: int64 limit: type: integer description: Limit used in query format: int64 - offset: - type: integer - description: Offset used in query - format: int64 + next_cursor: + type: string + description: Opaque cursor for the next page (keyset paging). Present only when there may be more results. Use as the cursor query param for the next page. required: - data - - total - limit - - offset UpdateWebhookInputBody: type: object additionalProperties: false diff --git a/pkg/cursor/cursor.go b/pkg/cursor/cursor.go new file mode 100644 index 0000000..f9a53b6 --- /dev/null +++ b/pkg/cursor/cursor.go @@ -0,0 +1,63 @@ +// Package cursor provides encode/decode for keyset pagination cursors used by list endpoints. +// List endpoints (feedback records, webhooks) use (time.Time, uuid.UUID) as the keyset; +// search endpoints use a different format (see internal/service/search_cursor.go). +package cursor + +import ( + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/google/uuid" +) + +// ErrInvalidCursor is returned when the cursor parameter is malformed or invalid. +var ErrInvalidCursor = errors.New("invalid cursor") + +type listCursorPayload struct { + T string `json:"t"` // RFC3339 timestamp (collected_at or created_at) + I string `json:"i"` // entity ID (UUID string) +} + +// Encode encodes a list cursor from the last row's timestamp and ID. +// Used for keyset pagination on ORDER BY timestamp DESC, id ASC. +func Encode(ts time.Time, id uuid.UUID) (string, error) { + b, err := json.Marshal(listCursorPayload{T: ts.UTC().Format(time.RFC3339Nano), I: id.String()}) + if err != nil { + return "", fmt.Errorf("encode list cursor: %w", err) + } + + return base64.URLEncoding.EncodeToString(b), nil +} + +// Decode parses a list cursor and returns (timestamp, id). +// Returns ErrInvalidCursor if the cursor is malformed. +func Decode(cursor string) (time.Time, uuid.UUID, error) { + if cursor == "" { + return time.Time{}, uuid.Nil, ErrInvalidCursor + } + + raw, err := base64.URLEncoding.DecodeString(cursor) + if err != nil { + return time.Time{}, uuid.Nil, ErrInvalidCursor + } + + var p listCursorPayload + if err := json.Unmarshal(raw, &p); err != nil { + return time.Time{}, uuid.Nil, ErrInvalidCursor + } + + timestamp, err := time.Parse(time.RFC3339Nano, p.T) + if err != nil { + return time.Time{}, uuid.Nil, ErrInvalidCursor + } + + id, err := uuid.Parse(p.I) + if err != nil { + return time.Time{}, uuid.Nil, ErrInvalidCursor + } + + return timestamp, id, nil +} diff --git a/tests/integration_test.go b/tests/integration_test.go index d77c10a..c181099 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "net/http/httptest" + "net/url" "os" "testing" @@ -397,6 +398,75 @@ func TestListFeedbackRecords(t *testing.T) { assert.Equal(t, "formbricks", exp.SourceType) } }) + + // Test cursor pagination + t.Run("Cursor pagination", func(t *testing.T) { + tenantID := "tenant-cursor-test" + // Create 3 records for pagination + for i := range 3 { + body, _ := json.Marshal(map[string]any{ + "source_type": "formbricks", + "submission_id": uuid.New().String(), + "tenant_id": tenantID, + "field_id": "q1", + "field_type": "text", + "value_text": fmt.Sprintf("record %d", i), + }) + req, _ := http.NewRequestWithContext(context.Background(), http.MethodPost, server.URL+"/v1/feedback-records", bytes.NewBuffer(body)) + req.Header.Set("Authorization", "Bearer "+testAPIKey) + req.Header.Set("Content-Type", "application/json") + resp, err := client.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusCreated, resp.StatusCode) + require.NoError(t, resp.Body.Close()) + } + + // First page (limit=2) + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, + server.URL+"/v1/feedback-records?tenant_id="+tenantID+"&limit=2", http.NoBody) + require.NoError(t, err) + req.Header.Set("Authorization", "Bearer "+testAPIKey) + resp, err := client.Do(req) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + + var page1 models.ListFeedbackRecordsResponse + + err = decodeData(resp, &page1) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + + assert.Len(t, page1.Data, 2) + assert.NotEmpty(t, page1.NextCursor) + + // Second page using cursor (URL-encode cursor since it may contain = padding) + listURL := fmt.Sprintf("%s/v1/feedback-records?tenant_id=%s&limit=2&cursor=%s", + server.URL, url.QueryEscape(tenantID), url.QueryEscape(page1.NextCursor)) + req2, err := http.NewRequestWithContext(context.Background(), http.MethodGet, listURL, http.NoBody) + require.NoError(t, err) + req2.Header.Set("Authorization", "Bearer "+testAPIKey) + resp2, err := client.Do(req2) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp2.StatusCode) + + var page2 models.ListFeedbackRecordsResponse + + err = decodeData(resp2, &page2) + require.NoError(t, err) + require.NoError(t, resp2.Body.Close()) + + assert.GreaterOrEqual(t, len(page2.Data), 1) + + // Invalid cursor returns 400 + req3, err := http.NewRequestWithContext(context.Background(), http.MethodGet, + server.URL+"/v1/feedback-records?tenant_id="+tenantID+"&cursor=invalid", http.NoBody) + require.NoError(t, err) + req3.Header.Set("Authorization", "Bearer "+testAPIKey) + resp3, err := client.Do(req3) + require.NoError(t, err) + assert.Equal(t, http.StatusBadRequest, resp3.StatusCode) + require.NoError(t, resp3.Body.Close()) + }) } func TestFeedbackRecordsSubmissionID(t *testing.T) { @@ -1027,9 +1097,20 @@ func TestWebhooksCRUD(t *testing.T) { err = decodeData(listResp, &listResult) require.NoError(t, err) require.NoError(t, listResp.Body.Close()) - assert.GreaterOrEqual(t, listResult.Total, int64(1)) assert.GreaterOrEqual(t, len(listResult.Data), 1) + // Test invalid cursor returns 400 + invalidCursorReq, err := http.NewRequestWithContext( + context.Background(), http.MethodGet, + server.URL+"/v1/webhooks?cursor=invalid", http.NoBody, + ) + require.NoError(t, err) + invalidCursorReq.Header.Set("Authorization", "Bearer "+testAPIKey) + invalidCursorResp, err := client.Do(invalidCursorReq) + require.NoError(t, err) + assert.Equal(t, http.StatusBadRequest, invalidCursorResp.StatusCode) + require.NoError(t, invalidCursorResp.Body.Close()) + // Update webhook (including tenant_id) updateBody := map[string]any{ "url": "https://example.com/webhook-v2",