From 840dfd7c5ad4f1c531a6db0ef80595da4bbe3605 Mon Sep 17 00:00:00 2001 From: Tiago Farto Date: Wed, 4 Mar 2026 13:48:19 +0200 Subject: [PATCH 1/6] chore: cursor pagination on feedback_records and webhooks --- .../api/handlers/feedback_records_handler.go | 7 ++ internal/api/handlers/webhooks_handler.go | 7 ++ internal/models/feedback_records.go | 10 ++- internal/models/webhooks.go | 10 ++- .../repository/feedback_records_repository.go | 73 ++++++++++++++- internal/repository/webhooks_repository.go | 74 +++++++++++++++- internal/service/feedback_records_service.go | 61 ++++++++++++- internal/service/webhook_provider_test.go | 6 ++ internal/service/webhook_sender_test.go | 6 ++ internal/service/webhooks_service.go | 62 ++++++++++++- internal/service/webhooks_service_test.go | 7 ++ migrations/006_list_keyset_indexes.sql | 21 +++++ openapi.yaml | 37 +++++--- pkg/cursor/cursor.go | 63 +++++++++++++ tests/integration_test.go | 88 ++++++++++++++++++- 15 files changed, 503 insertions(+), 29 deletions(-) create mode 100644 migrations/006_list_keyset_indexes.sql create mode 100644 pkg/cursor/cursor.go diff --git a/internal/api/handlers/feedback_records_handler.go b/internal/api/handlers/feedback_records_handler.go index a592295..6b6ca83 100644 --- a/internal/api/handlers/feedback_records_handler.go +++ b/internal/api/handlers/feedback_records_handler.go @@ -14,6 +14,7 @@ import ( "github.com/formbricks/hub/internal/api/validation" "github.com/formbricks/hub/internal/huberrors" "github.com/formbricks/hub/internal/models" + "github.com/formbricks/hub/pkg/cursor" ) // FeedbackRecordsService defines the interface for feedback records business logic. @@ -123,6 +124,12 @@ func (h *FeedbackRecordsHandler) List(w http.ResponseWriter, r *http.Request) { result, err := h.service.ListFeedbackRecords(r.Context(), filters) if err != nil { + if errors.Is(err, cursor.ErrInvalidCursor) { + response.RespondBadRequest(w, "Invalid cursor: omit for first page, or use the exact next_cursor value from the previous response") + + return + } + response.RespondInternalServerError(w, "An unexpected error occurred") return diff --git a/internal/api/handlers/webhooks_handler.go b/internal/api/handlers/webhooks_handler.go index 4d98a3c..8f6ea86 100644 --- a/internal/api/handlers/webhooks_handler.go +++ b/internal/api/handlers/webhooks_handler.go @@ -13,6 +13,7 @@ import ( "github.com/formbricks/hub/internal/api/validation" "github.com/formbricks/hub/internal/huberrors" "github.com/formbricks/hub/internal/models" + "github.com/formbricks/hub/pkg/cursor" ) // WebhooksService defines the interface for webhooks business logic. @@ -122,6 +123,12 @@ func (h *WebhooksHandler) List(w http.ResponseWriter, r *http.Request) { result, err := h.service.ListWebhooks(r.Context(), filters) if err != nil { + if errors.Is(err, cursor.ErrInvalidCursor) { + response.RespondBadRequest(w, "Invalid cursor: omit for first page, or use the exact next_cursor value from the previous response") + + return + } + slog.Error("Failed to list webhooks", "method", r.Method, "path", r.URL.Path, "error", err) response.RespondInternalServerError(w, "An unexpected error occurred") diff --git a/internal/models/feedback_records.go b/internal/models/feedback_records.go index 113fcb8..41b5de9 100644 --- a/internal/models/feedback_records.go +++ b/internal/models/feedback_records.go @@ -187,14 +187,16 @@ type ListFeedbackRecordsFilters struct { Until *time.Time `form:"until" validate:"omitempty"` Limit int `form:"limit" validate:"omitempty,min=1,max=1000"` Offset int `form:"offset" validate:"omitempty,min=0"` + Cursor string `form:"cursor" validate:"omitempty"` // keyset cursor; when set, offset is ignored } // ListFeedbackRecordsResponse represents the response for listing feedback records. type ListFeedbackRecordsResponse struct { - Data []FeedbackRecord `json:"data"` - Total int64 `json:"total"` - Limit int `json:"limit"` - Offset int `json:"offset"` + Data []FeedbackRecord `json:"data"` + Total *int64 `json:"total,omitempty"` // set when offset-based; omitted when using cursor (avoids extra COUNT) + Limit int `json:"limit"` + Offset *int `json:"offset,omitempty"` // set when offset-based; omitted when using cursor + NextCursor string `json:"next_cursor,omitempty"` // present when there may be more results } // BulkDeleteFilters represents query parameters for bulk delete operation. diff --git a/internal/models/webhooks.go b/internal/models/webhooks.go index 0fa97a5..fc99e23 100644 --- a/internal/models/webhooks.go +++ b/internal/models/webhooks.go @@ -199,12 +199,14 @@ type ListWebhooksFilters struct { TenantID *string `form:"tenant_id" validate:"omitempty,no_null_bytes"` Limit int `form:"limit" validate:"omitempty,min=1,max=1000"` Offset int `form:"offset" validate:"omitempty,min=0"` + Cursor string `form:"cursor" validate:"omitempty"` // keyset cursor; when set, offset is ignored } // ListWebhooksResponse represents the response for listing webhooks. type ListWebhooksResponse struct { - Data []Webhook `json:"data"` - Total int64 `json:"total"` - Limit int `json:"limit"` - Offset int `json:"offset"` + Data []Webhook `json:"data"` + Total *int64 `json:"total,omitempty"` // set when offset-based; omitted when using cursor (avoids extra COUNT) + Limit int `json:"limit"` + Offset *int `json:"offset,omitempty"` // set when offset-based; omitted when using cursor + NextCursor string `json:"next_cursor,omitempty"` // present when there may be more results } diff --git a/internal/repository/feedback_records_repository.go b/internal/repository/feedback_records_repository.go index ad461f1..7fad65a 100644 --- a/internal/repository/feedback_records_repository.go +++ b/internal/repository/feedback_records_repository.go @@ -195,7 +195,7 @@ func (r *FeedbackRecordsRepository) List(ctx context.Context, filters *models.Li query += whereClause argCount := len(args) + 1 - query += " ORDER BY collected_at DESC" + query += " ORDER BY collected_at DESC, id ASC" if filters.Limit > 0 { query += fmt.Sprintf(" LIMIT $%d", argCount) @@ -242,6 +242,77 @@ func (r *FeedbackRecordsRepository) List(ctx context.Context, filters *models.Li return records, nil } +// ListAfterCursor retrieves feedback records after the given keyset cursor (collected_at, id). +// Order is collected_at DESC, id ASC. The cursor represents the last row of the previous page. +func (r *FeedbackRecordsRepository) ListAfterCursor( + ctx context.Context, filters *models.ListFeedbackRecordsFilters, cursorCollectedAt time.Time, cursorID uuid.UUID, +) ([]models.FeedbackRecord, error) { + query := ` + SELECT id, collected_at, created_at, updated_at, + source_type, source_id, source_name, + field_id, field_label, field_type, field_group_id, field_group_label, + value_text, value_number, value_boolean, value_date, + metadata, language, user_identifier, tenant_id, submission_id + FROM feedback_records + ` + + whereClause, args := buildFilterConditions(filters) + query += whereClause + + // Keyset condition: next page = (collected_at < cursor) OR (collected_at = cursor AND id > cursorID) + // For ORDER BY collected_at DESC, id ASC (two cursor params: collected_at, id) + argTime := len(args) + 1 + + argID := len(args) + 2 //nolint:mnd // second keyset param + if whereClause != "" { + query += fmt.Sprintf(" AND (collected_at < $%d OR (collected_at = $%d AND id > $%d))", argTime, argTime, argID) + } else { + query += fmt.Sprintf(" WHERE (collected_at < $%d OR (collected_at = $%d AND id > $%d))", argTime, argTime, argID) + } + + args = append(args, cursorCollectedAt, cursorID) + argCount := len(args) + 1 + + query += " ORDER BY collected_at DESC, id ASC" + + if filters.Limit > 0 { + query += fmt.Sprintf(" LIMIT $%d", argCount) + + args = append(args, filters.Limit) + } + + rows, err := r.db.Query(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("failed to list feedback records after cursor: %w", err) + } + defer rows.Close() + + records := []models.FeedbackRecord{} + + for rows.Next() { + var record models.FeedbackRecord + + err := rows.Scan( + &record.ID, &record.CollectedAt, &record.CreatedAt, &record.UpdatedAt, + &record.SourceType, &record.SourceID, &record.SourceName, + &record.FieldID, &record.FieldLabel, &record.FieldType, &record.FieldGroupID, &record.FieldGroupLabel, + &record.ValueText, &record.ValueNumber, &record.ValueBoolean, &record.ValueDate, + &record.Metadata, &record.Language, &record.UserIdentifier, &record.TenantID, &record.SubmissionID, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan feedback record: %w", err) + } + + records = append(records, record) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating feedback records: %w", err) + } + + return records, nil +} + // Count returns the total count of feedback records matching the filters. func (r *FeedbackRecordsRepository) Count(ctx context.Context, filters *models.ListFeedbackRecordsFilters) (int64, error) { query := `SELECT COUNT(*) FROM feedback_records` diff --git a/internal/repository/webhooks_repository.go b/internal/repository/webhooks_repository.go index 0288fec..d296d13 100644 --- a/internal/repository/webhooks_repository.go +++ b/internal/repository/webhooks_repository.go @@ -138,7 +138,7 @@ func (r *WebhooksRepository) List(ctx context.Context, filters *models.ListWebho query += whereClause argCount := len(args) + 1 - query += " ORDER BY created_at DESC" + query += " ORDER BY created_at DESC, id ASC" if filters.Limit > 0 { query += fmt.Sprintf(" LIMIT $%d", argCount) @@ -191,6 +191,78 @@ func (r *WebhooksRepository) List(ctx context.Context, filters *models.ListWebho return webhooks, nil } +// ListAfterCursor retrieves webhooks after the given keyset cursor (created_at, id). +// Order is created_at DESC, id ASC. The cursor represents the last row of the previous page. +func (r *WebhooksRepository) ListAfterCursor( + ctx context.Context, filters *models.ListWebhooksFilters, cursorCreatedAt time.Time, cursorID uuid.UUID, +) ([]models.Webhook, error) { + query := ` + SELECT id, url, signing_key, enabled, tenant_id, created_at, updated_at, event_types, disabled_reason, disabled_at + FROM webhooks + ` + + whereClause, args := buildWebhookFilterConditions(filters) + query += whereClause + + // Keyset condition: next page = (created_at < cursor) OR (created_at = cursor AND id > cursorID) + argTime := len(args) + 1 + + argID := len(args) + 2 //nolint:mnd // second keyset param + if whereClause != "" { + query += fmt.Sprintf(" AND (created_at < $%d OR (created_at = $%d AND id > $%d))", argTime, argTime, argID) + } else { + query += fmt.Sprintf(" WHERE (created_at < $%d OR (created_at = $%d AND id > $%d))", argTime, argTime, argID) + } + + args = append(args, cursorCreatedAt, cursorID) + argCount := len(args) + 1 + + query += " ORDER BY created_at DESC, id ASC" + + if filters.Limit > 0 { + query += fmt.Sprintf(" LIMIT $%d", argCount) + + args = append(args, filters.Limit) + } + + rows, err := r.db.Query(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("failed to list webhooks after cursor: %w", err) + } + defer rows.Close() + + webhooks := []models.Webhook{} + + for rows.Next() { + var ( + webhook models.Webhook + dbEventTypes []string + ) + + err := rows.Scan( + &webhook.ID, &webhook.URL, &webhook.SigningKey, &webhook.Enabled, + &webhook.TenantID, &webhook.CreatedAt, &webhook.UpdatedAt, &dbEventTypes, + &webhook.DisabledReason, &webhook.DisabledAt, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan webhook: %w", err) + } + + webhook.EventTypes, err = parseDBEventTypes(dbEventTypes) + if err != nil { + return nil, err + } + + webhooks = append(webhooks, webhook) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating webhooks: %w", err) + } + + return webhooks, nil +} + // Count returns the total count of webhooks matching the filters. func (r *WebhooksRepository) Count(ctx context.Context, filters *models.ListWebhooksFilters) (int64, error) { query := `SELECT COUNT(*) FROM webhooks` diff --git a/internal/service/feedback_records_service.go b/internal/service/feedback_records_service.go index a2784a3..2f62f40 100644 --- a/internal/service/feedback_records_service.go +++ b/internal/service/feedback_records_service.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "strings" "time" "github.com/google/uuid" @@ -12,6 +13,7 @@ import ( "github.com/formbricks/hub/internal/datatypes" "github.com/formbricks/hub/internal/models" + "github.com/formbricks/hub/pkg/cursor" ) // ErrUserIdentifierRequired is returned when bulk delete is called without user_identifier (err113). @@ -27,6 +29,10 @@ type FeedbackRecordsRepository interface { Create(ctx context.Context, req *models.CreateFeedbackRecordRequest) (*models.FeedbackRecord, error) GetByID(ctx context.Context, id uuid.UUID) (*models.FeedbackRecord, error) List(ctx context.Context, filters *models.ListFeedbackRecordsFilters) ([]models.FeedbackRecord, error) + ListAfterCursor( + ctx context.Context, filters *models.ListFeedbackRecordsFilters, + cursorCollectedAt time.Time, cursorID uuid.UUID, + ) ([]models.FeedbackRecord, error) Count(ctx context.Context, filters *models.ListFeedbackRecordsFilters) (int64, error) Update(ctx context.Context, id uuid.UUID, req *models.UpdateFeedbackRecordRequest) (*models.FeedbackRecord, error) Delete(ctx context.Context, id uuid.UUID) error @@ -109,6 +115,7 @@ func (s *FeedbackRecordsService) GetFeedbackRecord(ctx context.Context, id uuid. } // ListFeedbackRecords retrieves a list of feedback records with optional filters. +// When filters.Cursor is set, uses keyset pagination (no Count query); otherwise uses offset. func (s *FeedbackRecordsService) ListFeedbackRecords( ctx context.Context, filters *models.ListFeedbackRecordsFilters, ) (*models.ListFeedbackRecordsResponse, error) { @@ -117,6 +124,37 @@ func (s *FeedbackRecordsService) ListFeedbackRecords( filters.Limit = 100 // Default limit } + cursorStr := strings.TrimSpace(filters.Cursor) + + if cursorStr != "" { + collectedAt, id, err := cursor.Decode(cursorStr) + if err != nil { + return nil, fmt.Errorf("decode cursor: %w", err) + } + + records, err := s.repo.ListAfterCursor(ctx, filters, collectedAt, id) + if err != nil { + return nil, fmt.Errorf("list feedback records after cursor: %w", err) + } + + var nextCursor string + + if len(records) == filters.Limit && len(records) > 0 { + last := records[len(records)-1] + + nextCursor, err = cursor.Encode(last.CollectedAt, last.ID) + if err != nil { + return nil, fmt.Errorf("encode next cursor: %w", err) + } + } + + return &models.ListFeedbackRecordsResponse{ + Data: records, + Limit: filters.Limit, + NextCursor: nextCursor, + }, nil + } + records, err := s.repo.List(ctx, filters) if err != nil { return nil, fmt.Errorf("list feedback records: %w", err) @@ -127,11 +165,26 @@ func (s *FeedbackRecordsService) ListFeedbackRecords( return nil, fmt.Errorf("count feedback records: %w", err) } + totalPtr := total + offsetVal := filters.Offset + + var nextCursor string + + if len(records) == filters.Limit && len(records) > 0 { + last := records[len(records)-1] + + nextCursor, err = cursor.Encode(last.CollectedAt, last.ID) + if err != nil { + return nil, fmt.Errorf("encode next cursor: %w", err) + } + } + return &models.ListFeedbackRecordsResponse{ - Data: records, - Total: total, - Limit: filters.Limit, - Offset: filters.Offset, + Data: records, + Total: &totalPtr, + Limit: filters.Limit, + Offset: &offsetVal, + NextCursor: nextCursor, }, nil } diff --git a/internal/service/webhook_provider_test.go b/internal/service/webhook_provider_test.go index b3b330d..d14775a 100644 --- a/internal/service/webhook_provider_test.go +++ b/internal/service/webhook_provider_test.go @@ -64,6 +64,12 @@ func (m *mockProviderRepo) List(_ context.Context, _ *models.ListWebhooksFilters return nil, errors.New("not implemented") } +func (m *mockProviderRepo) ListAfterCursor( + _ context.Context, _ *models.ListWebhooksFilters, _ time.Time, _ uuid.UUID, +) ([]models.Webhook, error) { + return nil, errors.New("not implemented") +} + func (m *mockProviderRepo) Count(_ context.Context, _ *models.ListWebhooksFilters) (int64, error) { return 0, errors.New("not implemented") } diff --git a/internal/service/webhook_sender_test.go b/internal/service/webhook_sender_test.go index e6d8aef..7de0f9b 100644 --- a/internal/service/webhook_sender_test.go +++ b/internal/service/webhook_sender_test.go @@ -36,6 +36,12 @@ func (m *mockSenderRepo) List(_ context.Context, _ *models.ListWebhooksFilters) return nil, nil } +func (m *mockSenderRepo) ListAfterCursor( + _ context.Context, _ *models.ListWebhooksFilters, _ time.Time, _ uuid.UUID, +) ([]models.Webhook, error) { + return nil, nil +} + func (m *mockSenderRepo) Count(_ context.Context, _ *models.ListWebhooksFilters) (int64, error) { return 0, nil } diff --git a/internal/service/webhooks_service.go b/internal/service/webhooks_service.go index 5a58d83..86488c2 100644 --- a/internal/service/webhooks_service.go +++ b/internal/service/webhooks_service.go @@ -5,6 +5,8 @@ import ( "crypto/rand" "encoding/base64" "fmt" + "strings" + "time" "github.com/google/uuid" standardwebhooks "github.com/standard-webhooks/standard-webhooks/libraries/go" @@ -12,6 +14,7 @@ import ( "github.com/formbricks/hub/internal/datatypes" "github.com/formbricks/hub/internal/huberrors" "github.com/formbricks/hub/internal/models" + "github.com/formbricks/hub/pkg/cursor" ) // WebhooksRepository defines the interface for webhooks data access. @@ -19,6 +22,10 @@ type WebhooksRepository interface { Create(ctx context.Context, req *models.CreateWebhookRequest) (*models.Webhook, error) GetByID(ctx context.Context, id uuid.UUID) (*models.Webhook, error) List(ctx context.Context, filters *models.ListWebhooksFilters) ([]models.Webhook, error) + ListAfterCursor( + ctx context.Context, filters *models.ListWebhooksFilters, + cursorCreatedAt time.Time, cursorID uuid.UUID, + ) ([]models.Webhook, error) Count(ctx context.Context, filters *models.ListWebhooksFilters) (int64, error) Update(ctx context.Context, id uuid.UUID, req *models.UpdateWebhookRequest) (*models.Webhook, error) Delete(ctx context.Context, id uuid.UUID) error @@ -114,11 +121,43 @@ func (s *WebhooksService) GetWebhook(ctx context.Context, id uuid.UUID) (*models } // ListWebhooks retrieves a list of webhooks with optional filters. +// When filters.Cursor is set, uses keyset pagination (no Count query); otherwise uses offset. func (s *WebhooksService) ListWebhooks(ctx context.Context, filters *models.ListWebhooksFilters) (*models.ListWebhooksResponse, error) { if filters.Limit <= 0 { filters.Limit = 100 } + cursorStr := strings.TrimSpace(filters.Cursor) + + if cursorStr != "" { + createdAt, id, err := cursor.Decode(cursorStr) + if err != nil { + return nil, fmt.Errorf("decode cursor: %w", err) + } + + webhooks, err := s.repo.ListAfterCursor(ctx, filters, createdAt, id) + if err != nil { + return nil, fmt.Errorf("list webhooks after cursor: %w", err) + } + + var nextCursor string + + if len(webhooks) == filters.Limit && len(webhooks) > 0 { + last := webhooks[len(webhooks)-1] + + nextCursor, err = cursor.Encode(last.CreatedAt, last.ID) + if err != nil { + return nil, fmt.Errorf("encode next cursor: %w", err) + } + } + + return &models.ListWebhooksResponse{ + Data: webhooks, + Limit: filters.Limit, + NextCursor: nextCursor, + }, nil + } + webhooks, err := s.repo.List(ctx, filters) if err != nil { return nil, fmt.Errorf("list webhooks: %w", err) @@ -129,11 +168,26 @@ func (s *WebhooksService) ListWebhooks(ctx context.Context, filters *models.List return nil, fmt.Errorf("count webhooks: %w", err) } + totalPtr := total + offsetVal := filters.Offset + + var nextCursor string + + if len(webhooks) == filters.Limit && len(webhooks) > 0 { + last := webhooks[len(webhooks)-1] + + nextCursor, err = cursor.Encode(last.CreatedAt, last.ID) + if err != nil { + return nil, fmt.Errorf("encode next cursor: %w", err) + } + } + return &models.ListWebhooksResponse{ - Data: webhooks, - Total: total, - Limit: filters.Limit, - Offset: filters.Offset, + Data: webhooks, + Total: &totalPtr, + Limit: filters.Limit, + Offset: &offsetVal, + NextCursor: nextCursor, }, nil } diff --git a/internal/service/webhooks_service_test.go b/internal/service/webhooks_service_test.go index 1bb126d..3c4fdcf 100644 --- a/internal/service/webhooks_service_test.go +++ b/internal/service/webhooks_service_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "testing" + "time" "github.com/google/uuid" @@ -28,6 +29,12 @@ func (m *mockWebhooksRepo) List(_ context.Context, _ *models.ListWebhooksFilters return nil, nil } +func (m *mockWebhooksRepo) ListAfterCursor( + _ context.Context, _ *models.ListWebhooksFilters, _ time.Time, _ uuid.UUID, +) ([]models.Webhook, error) { + return nil, nil +} + func (m *mockWebhooksRepo) Count(_ context.Context, _ *models.ListWebhooksFilters) (int64, error) { return m.count, nil } diff --git a/migrations/006_list_keyset_indexes.sql b/migrations/006_list_keyset_indexes.sql new file mode 100644 index 0000000..7efe2e4 --- /dev/null +++ b/migrations/006_list_keyset_indexes.sql @@ -0,0 +1,21 @@ +-- +goose NO TRANSACTION +-- +goose Up +-- Indexes for optimal keyset (cursor) pagination on list endpoints. +-- CONCURRENTLY avoids blocking writes on large tables (requires NO TRANSACTION). +-- +-- feedback_records: list/ListAfterCursor ORDER BY collected_at DESC, id ASC with tenant_id filter. +-- Replaces idx_feedback_records_tenant_collected_at with id for tie-break; drop the older one. +DROP INDEX CONCURRENTLY IF EXISTS idx_feedback_records_tenant_collected_at; +CREATE INDEX CONCURRENTLY idx_feedback_records_tenant_collected_at_id + ON feedback_records (tenant_id, collected_at DESC, id); + +-- webhooks: list/ListAfterCursor ORDER BY created_at DESC, id ASC with tenant_id filter. +CREATE INDEX CONCURRENTLY idx_webhooks_tenant_created_at_id + ON webhooks (tenant_id, created_at DESC, id); + +-- +goose Down +DROP INDEX CONCURRENTLY IF EXISTS idx_webhooks_tenant_created_at_id; + +DROP INDEX CONCURRENTLY IF EXISTS idx_feedback_records_tenant_collected_at_id; +CREATE INDEX CONCURRENTLY idx_feedback_records_tenant_collected_at + ON feedback_records (tenant_id, collected_at); diff --git a/openapi.yaml b/openapi.yaml index bf9d2cd..11a3528 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -147,7 +147,7 @@ paths: maximum: 1000 - name: offset in: query - description: Number of results to skip + description: Number of results to skip (offset-based paging). Ignored when cursor is set. schema: type: integer description: Number of results to skip @@ -155,6 +155,14 @@ paths: default: 0 minimum: 0 maximum: 2147483647 + - name: cursor + in: query + description: | + Omit for the first page. For the next page, use the exact value from the previous response's next_cursor. + Opaque (base64-encoded); when set, offset is ignored and keyset pagination is used. + schema: + type: string + example: "eyJ0IjoiMjAyNC0wMS0xNVQxMDozMDowMFoiLCJpIjoiMDE4ZTEyMzQtNTY3OC05YWJjLWRlZjAtMTIzNDU2Nzg5YWJjIn0=" responses: "200": description: OK @@ -768,13 +776,20 @@ paths: maximum: 1000 - name: offset in: query - description: Number of results to skip + description: Number of results to skip (offset-based paging). Ignored when cursor is set. schema: type: integer format: int64 default: 0 minimum: 0 maximum: 2147483647 + - name: cursor + in: query + description: | + Omit for the first page. For the next page, use the exact value from the previous response's next_cursor. + Opaque (base64-encoded); when set, offset is ignored and keyset pagination is used. + schema: + type: string responses: "200": description: OK @@ -1314,17 +1329,18 @@ components: format: int64 offset: type: integer - description: Offset used in query + description: Offset used in query (present when offset-based paging; omitted when using cursor) format: int64 total: type: integer - description: Total count of feedback records matching filters + description: Total count of feedback records matching filters (present when offset-based; omitted when using cursor to avoid extra COUNT query) format: int64 + next_cursor: + type: string + description: Opaque cursor for the next page (keyset paging). Present only when there may be more results. Use as the cursor query param for the next page. required: - data - - total - limit - - offset CreateWebhookInputBody: type: object additionalProperties: false @@ -1372,7 +1388,7 @@ components: $ref: '#/components/schemas/WebhookData' total: type: integer - description: Total count matching filters + description: Total count matching filters (present when offset-based; omitted when using cursor) format: int64 limit: type: integer @@ -1380,13 +1396,14 @@ components: format: int64 offset: type: integer - description: Offset used in query + description: Offset used in query (present when offset-based; omitted when using cursor) format: int64 + next_cursor: + type: string + description: Opaque cursor for the next page (keyset paging). Present only when there may be more results. Use as the cursor query param for the next page. required: - data - - total - limit - - offset UpdateWebhookInputBody: type: object additionalProperties: false diff --git a/pkg/cursor/cursor.go b/pkg/cursor/cursor.go new file mode 100644 index 0000000..f9a53b6 --- /dev/null +++ b/pkg/cursor/cursor.go @@ -0,0 +1,63 @@ +// Package cursor provides encode/decode for keyset pagination cursors used by list endpoints. +// List endpoints (feedback records, webhooks) use (time.Time, uuid.UUID) as the keyset; +// search endpoints use a different format (see internal/service/search_cursor.go). +package cursor + +import ( + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/google/uuid" +) + +// ErrInvalidCursor is returned when the cursor parameter is malformed or invalid. +var ErrInvalidCursor = errors.New("invalid cursor") + +type listCursorPayload struct { + T string `json:"t"` // RFC3339 timestamp (collected_at or created_at) + I string `json:"i"` // entity ID (UUID string) +} + +// Encode encodes a list cursor from the last row's timestamp and ID. +// Used for keyset pagination on ORDER BY timestamp DESC, id ASC. +func Encode(ts time.Time, id uuid.UUID) (string, error) { + b, err := json.Marshal(listCursorPayload{T: ts.UTC().Format(time.RFC3339Nano), I: id.String()}) + if err != nil { + return "", fmt.Errorf("encode list cursor: %w", err) + } + + return base64.URLEncoding.EncodeToString(b), nil +} + +// Decode parses a list cursor and returns (timestamp, id). +// Returns ErrInvalidCursor if the cursor is malformed. +func Decode(cursor string) (time.Time, uuid.UUID, error) { + if cursor == "" { + return time.Time{}, uuid.Nil, ErrInvalidCursor + } + + raw, err := base64.URLEncoding.DecodeString(cursor) + if err != nil { + return time.Time{}, uuid.Nil, ErrInvalidCursor + } + + var p listCursorPayload + if err := json.Unmarshal(raw, &p); err != nil { + return time.Time{}, uuid.Nil, ErrInvalidCursor + } + + timestamp, err := time.Parse(time.RFC3339Nano, p.T) + if err != nil { + return time.Time{}, uuid.Nil, ErrInvalidCursor + } + + id, err := uuid.Parse(p.I) + if err != nil { + return time.Time{}, uuid.Nil, ErrInvalidCursor + } + + return timestamp, id, nil +} diff --git a/tests/integration_test.go b/tests/integration_test.go index d77c10a..425a628 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "net/http/httptest" + "net/url" "os" "testing" @@ -397,6 +398,78 @@ func TestListFeedbackRecords(t *testing.T) { assert.Equal(t, "formbricks", exp.SourceType) } }) + + // Test cursor pagination + t.Run("Cursor pagination", func(t *testing.T) { + tenantID := "tenant-cursor-test" + // Create 3 records for pagination + for i := range 3 { + body, _ := json.Marshal(map[string]any{ + "source_type": "formbricks", + "submission_id": uuid.New().String(), + "tenant_id": tenantID, + "field_id": "q1", + "field_type": "text", + "value_text": fmt.Sprintf("record %d", i), + }) + req, _ := http.NewRequestWithContext(context.Background(), http.MethodPost, server.URL+"/v1/feedback-records", bytes.NewBuffer(body)) + req.Header.Set("Authorization", "Bearer "+testAPIKey) + req.Header.Set("Content-Type", "application/json") + resp, err := client.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusCreated, resp.StatusCode) + require.NoError(t, resp.Body.Close()) + } + + // First page (limit=2) + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, + server.URL+"/v1/feedback-records?tenant_id="+tenantID+"&limit=2", http.NoBody) + require.NoError(t, err) + req.Header.Set("Authorization", "Bearer "+testAPIKey) + resp, err := client.Do(req) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + + var page1 models.ListFeedbackRecordsResponse + + err = decodeData(resp, &page1) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + + assert.Len(t, page1.Data, 2) + assert.NotEmpty(t, page1.NextCursor) + + // Second page using cursor (URL-encode cursor since it may contain = padding) + listURL := fmt.Sprintf("%s/v1/feedback-records?tenant_id=%s&limit=2&cursor=%s", + server.URL, url.QueryEscape(tenantID), url.QueryEscape(page1.NextCursor)) + req2, err := http.NewRequestWithContext(context.Background(), http.MethodGet, listURL, http.NoBody) + require.NoError(t, err) + req2.Header.Set("Authorization", "Bearer "+testAPIKey) + resp2, err := client.Do(req2) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp2.StatusCode) + + var page2 models.ListFeedbackRecordsResponse + + err = decodeData(resp2, &page2) + require.NoError(t, err) + require.NoError(t, resp2.Body.Close()) + + assert.GreaterOrEqual(t, len(page2.Data), 1) + // Cursor path omits total and offset + assert.Nil(t, page2.Total) + assert.Nil(t, page2.Offset) + + // Invalid cursor returns 400 + req3, err := http.NewRequestWithContext(context.Background(), http.MethodGet, + server.URL+"/v1/feedback-records?tenant_id="+tenantID+"&cursor=invalid", http.NoBody) + require.NoError(t, err) + req3.Header.Set("Authorization", "Bearer "+testAPIKey) + resp3, err := client.Do(req3) + require.NoError(t, err) + assert.Equal(t, http.StatusBadRequest, resp3.StatusCode) + require.NoError(t, resp3.Body.Close()) + }) } func TestFeedbackRecordsSubmissionID(t *testing.T) { @@ -1027,9 +1100,22 @@ func TestWebhooksCRUD(t *testing.T) { err = decodeData(listResp, &listResult) require.NoError(t, err) require.NoError(t, listResp.Body.Close()) - assert.GreaterOrEqual(t, listResult.Total, int64(1)) + require.NotNil(t, listResult.Total) + assert.GreaterOrEqual(t, *listResult.Total, int64(1)) assert.GreaterOrEqual(t, len(listResult.Data), 1) + // Test invalid cursor returns 400 + invalidCursorReq, err := http.NewRequestWithContext( + context.Background(), http.MethodGet, + server.URL+"/v1/webhooks?cursor=invalid", http.NoBody, + ) + require.NoError(t, err) + invalidCursorReq.Header.Set("Authorization", "Bearer "+testAPIKey) + invalidCursorResp, err := client.Do(invalidCursorReq) + require.NoError(t, err) + assert.Equal(t, http.StatusBadRequest, invalidCursorResp.StatusCode) + require.NoError(t, invalidCursorResp.Body.Close()) + // Update webhook (including tenant_id) updateBody := map[string]any{ "url": "https://example.com/webhook-v2", From 431ee00568ed1ab8b57ba07dfa2d74777856226d Mon Sep 17 00:00:00 2001 From: Tiago Farto Date: Wed, 4 Mar 2026 15:11:31 +0200 Subject: [PATCH 2/6] chore: refactoring --- .../repository/feedback_records_repository.go | 116 +++++++----------- internal/service/feedback_records_service.go | 41 +++---- internal/service/pagination.go | 36 ++++++ internal/service/webhooks_service.go | 41 +++---- openapi.yaml | 12 ++ 5 files changed, 129 insertions(+), 117 deletions(-) create mode 100644 internal/service/pagination.go diff --git a/internal/repository/feedback_records_repository.go b/internal/repository/feedback_records_repository.go index 7fad65a..19f087b 100644 --- a/internal/repository/feedback_records_repository.go +++ b/internal/repository/feedback_records_repository.go @@ -180,9 +180,7 @@ func buildFilterConditions(filters *models.ListFeedbackRecordsFilters) (whereCla return whereClause, args } -// List retrieves feedback records with optional filters. Embedding is not selected (API reads stay lean). -func (r *FeedbackRecordsRepository) List(ctx context.Context, filters *models.ListFeedbackRecordsFilters) ([]models.FeedbackRecord, error) { - query := ` +const feedbackRecordsListSelect = ` SELECT id, collected_at, created_at, updated_at, source_type, source_id, source_name, field_id, field_label, field_type, field_group_id, field_group_label, @@ -191,6 +189,10 @@ func (r *FeedbackRecordsRepository) List(ctx context.Context, filters *models.Li FROM feedback_records ` +// List retrieves feedback records with optional filters. Embedding is not selected (API reads stay lean). +func (r *FeedbackRecordsRepository) List(ctx context.Context, filters *models.ListFeedbackRecordsFilters) ([]models.FeedbackRecord, error) { + query := feedbackRecordsListSelect + whereClause, args := buildFilterConditions(filters) query += whereClause argCount := len(args) + 1 @@ -210,36 +212,7 @@ func (r *FeedbackRecordsRepository) List(ctx context.Context, filters *models.Li args = append(args, filters.Offset) } - rows, err := r.db.Query(ctx, query, args...) - if err != nil { - return nil, fmt.Errorf("failed to list feedback records: %w", err) - } - defer rows.Close() - - records := []models.FeedbackRecord{} // Initialize as empty slice, not nil - - for rows.Next() { - var record models.FeedbackRecord - - err := rows.Scan( - &record.ID, &record.CollectedAt, &record.CreatedAt, &record.UpdatedAt, - &record.SourceType, &record.SourceID, &record.SourceName, - &record.FieldID, &record.FieldLabel, &record.FieldType, &record.FieldGroupID, &record.FieldGroupLabel, - &record.ValueText, &record.ValueNumber, &record.ValueBoolean, &record.ValueDate, - &record.Metadata, &record.Language, &record.UserIdentifier, &record.TenantID, &record.SubmissionID, - ) - if err != nil { - return nil, fmt.Errorf("failed to scan feedback record: %w", err) - } - - records = append(records, record) - } - - if err := rows.Err(); err != nil { - return nil, fmt.Errorf("error iterating feedback records: %w", err) - } - - return records, nil + return r.fetchFeedbackRecords(ctx, query, args...) } // ListAfterCursor retrieves feedback records after the given keyset cursor (collected_at, id). @@ -247,14 +220,7 @@ func (r *FeedbackRecordsRepository) List(ctx context.Context, filters *models.Li func (r *FeedbackRecordsRepository) ListAfterCursor( ctx context.Context, filters *models.ListFeedbackRecordsFilters, cursorCollectedAt time.Time, cursorID uuid.UUID, ) ([]models.FeedbackRecord, error) { - query := ` - SELECT id, collected_at, created_at, updated_at, - source_type, source_id, source_name, - field_id, field_label, field_type, field_group_id, field_group_label, - value_text, value_number, value_boolean, value_date, - metadata, language, user_identifier, tenant_id, submission_id - FROM feedback_records - ` + query := feedbackRecordsListSelect whereClause, args := buildFilterConditions(filters) query += whereClause @@ -281,36 +247,7 @@ func (r *FeedbackRecordsRepository) ListAfterCursor( args = append(args, filters.Limit) } - rows, err := r.db.Query(ctx, query, args...) - if err != nil { - return nil, fmt.Errorf("failed to list feedback records after cursor: %w", err) - } - defer rows.Close() - - records := []models.FeedbackRecord{} - - for rows.Next() { - var record models.FeedbackRecord - - err := rows.Scan( - &record.ID, &record.CollectedAt, &record.CreatedAt, &record.UpdatedAt, - &record.SourceType, &record.SourceID, &record.SourceName, - &record.FieldID, &record.FieldLabel, &record.FieldType, &record.FieldGroupID, &record.FieldGroupLabel, - &record.ValueText, &record.ValueNumber, &record.ValueBoolean, &record.ValueDate, - &record.Metadata, &record.Language, &record.UserIdentifier, &record.TenantID, &record.SubmissionID, - ) - if err != nil { - return nil, fmt.Errorf("failed to scan feedback record: %w", err) - } - - records = append(records, record) - } - - if err := rows.Err(); err != nil { - return nil, fmt.Errorf("error iterating feedback records: %w", err) - } - - return records, nil + return r.fetchFeedbackRecords(ctx, query, args...) } // Count returns the total count of feedback records matching the filters. @@ -491,3 +428,40 @@ func (r *FeedbackRecordsRepository) BulkDelete(ctx context.Context, userIdentifi return ids, nil } + +// fetchFeedbackRecords executes the given query and scans rows into FeedbackRecord slices. +// Used by List and ListAfterCursor to avoid duplicating SELECT/scan logic. +func (r *FeedbackRecordsRepository) fetchFeedbackRecords( + ctx context.Context, query string, args ...any, +) ([]models.FeedbackRecord, error) { + rows, err := r.db.Query(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("failed to list feedback records: %w", err) + } + defer rows.Close() + + records := []models.FeedbackRecord{} + + for rows.Next() { + var record models.FeedbackRecord + + err := rows.Scan( + &record.ID, &record.CollectedAt, &record.CreatedAt, &record.UpdatedAt, + &record.SourceType, &record.SourceID, &record.SourceName, + &record.FieldID, &record.FieldLabel, &record.FieldType, &record.FieldGroupID, &record.FieldGroupLabel, + &record.ValueText, &record.ValueNumber, &record.ValueBoolean, &record.ValueDate, + &record.Metadata, &record.Language, &record.UserIdentifier, &record.TenantID, &record.SubmissionID, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan feedback record: %w", err) + } + + records = append(records, record) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating feedback records: %w", err) + } + + return records, nil +} diff --git a/internal/service/feedback_records_service.go b/internal/service/feedback_records_service.go index 2f62f40..523c910 100644 --- a/internal/service/feedback_records_service.go +++ b/internal/service/feedback_records_service.go @@ -137,21 +137,21 @@ func (s *FeedbackRecordsService) ListFeedbackRecords( return nil, fmt.Errorf("list feedback records after cursor: %w", err) } - var nextCursor string - - if len(records) == filters.Limit && len(records) > 0 { + meta, err := BuildListPaginationMeta(nil, 0, filters.Limit, len(records), func() (string, error) { last := records[len(records)-1] - nextCursor, err = cursor.Encode(last.CollectedAt, last.ID) - if err != nil { - return nil, fmt.Errorf("encode next cursor: %w", err) - } + return cursor.Encode(last.CollectedAt, last.ID) + }) + if err != nil { + return nil, fmt.Errorf("encode next cursor: %w", err) } return &models.ListFeedbackRecordsResponse{ Data: records, - Limit: filters.Limit, - NextCursor: nextCursor, + Total: meta.Total, + Limit: meta.Limit, + Offset: meta.Offset, + NextCursor: meta.NextCursor, }, nil } @@ -165,26 +165,21 @@ func (s *FeedbackRecordsService) ListFeedbackRecords( return nil, fmt.Errorf("count feedback records: %w", err) } - totalPtr := total - offsetVal := filters.Offset - - var nextCursor string - - if len(records) == filters.Limit && len(records) > 0 { + meta, err := BuildListPaginationMeta(&total, filters.Offset, filters.Limit, len(records), func() (string, error) { last := records[len(records)-1] - nextCursor, err = cursor.Encode(last.CollectedAt, last.ID) - if err != nil { - return nil, fmt.Errorf("encode next cursor: %w", err) - } + return cursor.Encode(last.CollectedAt, last.ID) + }) + if err != nil { + return nil, fmt.Errorf("encode next cursor: %w", err) } return &models.ListFeedbackRecordsResponse{ Data: records, - Total: &totalPtr, - Limit: filters.Limit, - Offset: &offsetVal, - NextCursor: nextCursor, + Total: meta.Total, + Limit: meta.Limit, + Offset: meta.Offset, + NextCursor: meta.NextCursor, }, nil } diff --git a/internal/service/pagination.go b/internal/service/pagination.go new file mode 100644 index 0000000..fb877c7 --- /dev/null +++ b/internal/service/pagination.go @@ -0,0 +1,36 @@ +package service + +// ListPaginationMeta holds pagination metadata for list endpoints (feedback records, webhooks). +type ListPaginationMeta struct { + Total *int64 + Offset *int + Limit int + NextCursor string +} + +// BuildListPaginationMeta builds pagination metadata for list responses. +// When total is nil (cursor-based path), Total and Offset are nil. +// encodeLast is called only when there may be more results (recordCount == limit && recordCount > 0). +func BuildListPaginationMeta( + total *int64, offset, limit, recordCount int, encodeLast func() (string, error), +) (ListPaginationMeta, error) { + meta := ListPaginationMeta{Limit: limit} + + if total != nil { + t := *total + meta.Total = &t + o := offset + meta.Offset = &o + } + + if recordCount == limit && recordCount > 0 && encodeLast != nil { + next, err := encodeLast() + if err != nil { + return meta, err + } + + meta.NextCursor = next + } + + return meta, nil +} diff --git a/internal/service/webhooks_service.go b/internal/service/webhooks_service.go index 86488c2..22ca441 100644 --- a/internal/service/webhooks_service.go +++ b/internal/service/webhooks_service.go @@ -140,21 +140,21 @@ func (s *WebhooksService) ListWebhooks(ctx context.Context, filters *models.List return nil, fmt.Errorf("list webhooks after cursor: %w", err) } - var nextCursor string - - if len(webhooks) == filters.Limit && len(webhooks) > 0 { + meta, err := BuildListPaginationMeta(nil, 0, filters.Limit, len(webhooks), func() (string, error) { last := webhooks[len(webhooks)-1] - nextCursor, err = cursor.Encode(last.CreatedAt, last.ID) - if err != nil { - return nil, fmt.Errorf("encode next cursor: %w", err) - } + return cursor.Encode(last.CreatedAt, last.ID) + }) + if err != nil { + return nil, fmt.Errorf("encode next cursor: %w", err) } return &models.ListWebhooksResponse{ Data: webhooks, - Limit: filters.Limit, - NextCursor: nextCursor, + Total: meta.Total, + Limit: meta.Limit, + Offset: meta.Offset, + NextCursor: meta.NextCursor, }, nil } @@ -168,26 +168,21 @@ func (s *WebhooksService) ListWebhooks(ctx context.Context, filters *models.List return nil, fmt.Errorf("count webhooks: %w", err) } - totalPtr := total - offsetVal := filters.Offset - - var nextCursor string - - if len(webhooks) == filters.Limit && len(webhooks) > 0 { + meta, err := BuildListPaginationMeta(&total, filters.Offset, filters.Limit, len(webhooks), func() (string, error) { last := webhooks[len(webhooks)-1] - nextCursor, err = cursor.Encode(last.CreatedAt, last.ID) - if err != nil { - return nil, fmt.Errorf("encode next cursor: %w", err) - } + return cursor.Encode(last.CreatedAt, last.ID) + }) + if err != nil { + return nil, fmt.Errorf("encode next cursor: %w", err) } return &models.ListWebhooksResponse{ Data: webhooks, - Total: &totalPtr, - Limit: filters.Limit, - Offset: &offsetVal, - NextCursor: nextCursor, + Total: meta.Total, + Limit: meta.Limit, + Offset: meta.Offset, + NextCursor: meta.NextCursor, }, nil } diff --git a/openapi.yaml b/openapi.yaml index 11a3528..3d0a71a 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -191,6 +191,12 @@ paths: total: 1 limit: 100 offset: 0 + "400": + description: Bad Request (e.g. invalid cursor) + content: + application/problem+json: + schema: + $ref: '#/components/schemas/ErrorModel' default: description: Error content: @@ -797,6 +803,12 @@ paths: application/json: schema: $ref: '#/components/schemas/ListWebhooksOutputBody' + "400": + description: Bad Request (e.g. invalid cursor) + content: + application/problem+json: + schema: + $ref: '#/components/schemas/ErrorModel' default: description: Error content: From dfa49843f9dbf3fddc24b5628f7cb92cab16578c Mon Sep 17 00:00:00 2001 From: Tiago Farto Date: Wed, 4 Mar 2026 15:58:14 +0200 Subject: [PATCH 3/6] chore: removed offset style pagination --- internal/api/handlers/search_handler.go | 43 ++----------- internal/api/handlers/search_handler_test.go | 24 ++++---- internal/models/feedback_records.go | 5 +- internal/models/webhooks.go | 5 +- internal/repository/embeddings_repository.go | 14 ++--- .../repository/feedback_records_repository.go | 24 -------- internal/repository/webhooks_repository.go | 7 --- internal/service/feedback_records_service.go | 51 +++++----------- internal/service/pagination.go | 14 +---- internal/service/search_service.go | 17 +++--- internal/service/search_service_test.go | 26 ++++---- internal/service/webhooks_service.go | 47 ++++---------- openapi.yaml | 61 ++----------------- tests/integration_test.go | 5 -- 14 files changed, 75 insertions(+), 268 deletions(-) diff --git a/internal/api/handlers/search_handler.go b/internal/api/handlers/search_handler.go index 4b7ba66..7597d70 100644 --- a/internal/api/handlers/search_handler.go +++ b/internal/api/handlers/search_handler.go @@ -18,9 +18,9 @@ import ( // SearchService defines the interface for semantic search and similar feedback. type SearchService interface { - SemanticSearch(ctx context.Context, query, tenantID string, limit, offset int, minScore float64, cursor string) ( + SemanticSearch(ctx context.Context, query, tenantID string, limit int, minScore float64, cursor string) ( service.SearchResult, error) - SimilarFeedback(ctx context.Context, feedbackRecordID uuid.UUID, tenantID string, limit, offset int, + SimilarFeedback(ctx context.Context, feedbackRecordID uuid.UUID, tenantID string, limit int, minScore float64, cursor string) (service.SearchResult, error) } @@ -55,14 +55,7 @@ type SemanticSearchResultItem struct { ValueText string `json:"value_text"` // value_text of the feedback record (the text that was embedded) } -// maxSearchOffset caps how far paging can go. With OFFSET-based paging the database -// still computes and discards all rows before the offset, so large offsets (e.g. 5000) -// make queries slow. Clamping keeps latency predictable and discourages deep paging. -// To support deeper paging without this limit, switch to cursor-based (keyset) pagination: -// return a cursor (e.g. last score + last feedback_record_id), and query WHERE (score, id) after cursor -// instead of OFFSET. const ( - maxSearchOffset = 1000 defaultSearchLimit = 10 maxSearchLimit = 100 ) @@ -99,17 +92,10 @@ func (h *SearchHandler) SemanticSearch(w http.ResponseWriter, r *http.Request) { } limit := parseLimit(r.URL.Query().Get("limit"), defaultSearchLimit, maxSearchLimit) - cursor := strings.TrimSpace(r.URL.Query().Get("cursor")) - - offset := 0 - if cursor == "" { - offset = min(parseOffset(r.URL.Query().Get("offset")), maxSearchOffset) - } - minScore := parseMinScore(r.URL.Query().Get("min_score")) - res, err := h.service.SemanticSearch(r.Context(), req.Query, req.TenantID, limit, offset, minScore, cursor) + res, err := h.service.SemanticSearch(r.Context(), req.Query, req.TenantID, limit, minScore, cursor) if err != nil { if errors.Is(err, service.ErrMissingTenantID) { response.RespondBadRequest(w, "tenant_id is required") @@ -177,17 +163,10 @@ func (h *SearchHandler) SimilarFeedback(w http.ResponseWriter, r *http.Request) } limit := parseLimit(r.URL.Query().Get("limit"), defaultSearchLimit, maxSearchLimit) - cursor := strings.TrimSpace(r.URL.Query().Get("cursor")) - - offset := 0 - if cursor == "" { - offset = min(parseOffset(r.URL.Query().Get("offset")), maxSearchOffset) - } - minScore := parseMinScore(r.URL.Query().Get("min_score")) - res, err := h.service.SimilarFeedback(r.Context(), id, tenantID, limit, offset, minScore, cursor) + res, err := h.service.SimilarFeedback(r.Context(), id, tenantID, limit, minScore, cursor) if err != nil { if errors.Is(err, service.ErrEmbeddingNotFound) { response.RespondNotFound(w, "Feedback record has no embedding for the current model") @@ -233,20 +212,6 @@ func parseLimit(s string, def, upperBound int) int { return min(n, upperBound) } -// parseOffset returns the query param "offset" as a non-negative int; default 0. -func parseOffset(s string) int { - if s == "" { - return 0 - } - - n, err := strconv.Atoi(s) - if err != nil || n < 0 { - return 0 - } - - return n -} - // defaultMinScore is the default minimum similarity score when the query param is omitted (reduces noise). const defaultMinScore = 0.7 diff --git a/internal/api/handlers/search_handler_test.go b/internal/api/handlers/search_handler_test.go index efd54d9..9f228e9 100644 --- a/internal/api/handlers/search_handler_test.go +++ b/internal/api/handlers/search_handler_test.go @@ -17,27 +17,27 @@ import ( ) type mockSearchService struct { - semanticFunc func(ctx context.Context, query, tenantID string, limit, offset int, minScore float64, + semanticFunc func(ctx context.Context, query, tenantID string, limit int, minScore float64, cursor string) (service.SearchResult, error) - similarFunc func(ctx context.Context, feedbackRecordID uuid.UUID, tenantID string, limit, offset int, + similarFunc func(ctx context.Context, feedbackRecordID uuid.UUID, tenantID string, limit int, minScore float64, cursor string) (service.SearchResult, error) } func (m *mockSearchService) SemanticSearch( - ctx context.Context, query, tenantID string, limit, offset int, minScore float64, cursor string, + ctx context.Context, query, tenantID string, limit int, minScore float64, cursor string, ) (service.SearchResult, error) { if m.semanticFunc != nil { - return m.semanticFunc(ctx, query, tenantID, limit, offset, minScore, cursor) + return m.semanticFunc(ctx, query, tenantID, limit, minScore, cursor) } return service.SearchResult{}, nil } func (m *mockSearchService) SimilarFeedback( - ctx context.Context, feedbackRecordID uuid.UUID, tenantID string, limit, offset int, minScore float64, cursor string, + ctx context.Context, feedbackRecordID uuid.UUID, tenantID string, limit int, minScore float64, cursor string, ) (service.SearchResult, error) { if m.similarFunc != nil { - return m.similarFunc(ctx, feedbackRecordID, tenantID, limit, offset, minScore, cursor) + return m.similarFunc(ctx, feedbackRecordID, tenantID, limit, minScore, cursor) } return service.SearchResult{}, nil @@ -60,7 +60,7 @@ func TestSearchHandler_SemanticSearch(t *testing.T) { t.Run("empty query returns 400", func(t *testing.T) { called := false mock := &mockSearchService{ - semanticFunc: func(_ context.Context, _, _ string, _, _ int, _ float64, _ string) (service.SearchResult, error) { + semanticFunc: func(_ context.Context, _, _ string, _ int, _ float64, _ string) (service.SearchResult, error) { called = true return service.SearchResult{}, service.ErrEmptyQuery @@ -85,13 +85,12 @@ func TestSearchHandler_SemanticSearch(t *testing.T) { val1 := "Login is very slow." val2 := "Dashboard loads fast." mock := &mockSearchService{ - semanticFunc: func(_ context.Context, query, tenantID string, limit, offset int, minScore float64, + semanticFunc: func(_ context.Context, query, tenantID string, limit int, minScore float64, cursor string, ) (service.SearchResult, error) { assert.Equal(t, "login is slow", query) assert.Equal(t, "env-1", tenantID) assert.Equal(t, 10, limit) - assert.Equal(t, 0, offset) assert.InDelta(t, 0.7, minScore, 1e-9) assert.Empty(t, cursor) @@ -132,7 +131,7 @@ func TestSearchHandler_SemanticSearch(t *testing.T) { t.Run("invalid cursor returns 400", func(t *testing.T) { mock := &mockSearchService{ - semanticFunc: func(_ context.Context, _, _ string, _, _ int, _ float64, cursor string) (service.SearchResult, error) { + semanticFunc: func(_ context.Context, _, _ string, _ int, _ float64, cursor string) (service.SearchResult, error) { if cursor != "" { return service.SearchResult{}, service.ErrInvalidCursor } @@ -171,7 +170,7 @@ func TestSearchHandler_SimilarFeedback(t *testing.T) { t.Run("embedding not found returns 404", func(t *testing.T) { mock := &mockSearchService{ - similarFunc: func(_ context.Context, _ uuid.UUID, _ string, _, _ int, _ float64, _ string) (service.SearchResult, error) { + similarFunc: func(_ context.Context, _ uuid.UUID, _ string, _ int, _ float64, _ string) (service.SearchResult, error) { return service.SearchResult{}, service.ErrEmbeddingNotFound }, } @@ -191,13 +190,12 @@ func TestSearchHandler_SimilarFeedback(t *testing.T) { similarID := uuid.MustParse("018e1234-5678-9abc-def0-aaaaaaaaaaaa") similarVal := "Similar feedback text." mock := &mockSearchService{ - similarFunc: func(_ context.Context, fid uuid.UUID, tenantID string, limit, offset int, minScore float64, + similarFunc: func(_ context.Context, fid uuid.UUID, tenantID string, limit int, minScore float64, cursor string, ) (service.SearchResult, error) { assert.Equal(t, id, fid) assert.Equal(t, "env-1", tenantID) assert.Equal(t, 10, limit) - assert.Equal(t, 0, offset) assert.InDelta(t, 0.7, minScore, 1e-9) assert.Empty(t, cursor) diff --git a/internal/models/feedback_records.go b/internal/models/feedback_records.go index 41b5de9..f3c7fbb 100644 --- a/internal/models/feedback_records.go +++ b/internal/models/feedback_records.go @@ -186,16 +186,13 @@ type ListFeedbackRecordsFilters struct { Since *time.Time `form:"since" validate:"omitempty"` Until *time.Time `form:"until" validate:"omitempty"` Limit int `form:"limit" validate:"omitempty,min=1,max=1000"` - Offset int `form:"offset" validate:"omitempty,min=0"` - Cursor string `form:"cursor" validate:"omitempty"` // keyset cursor; when set, offset is ignored + Cursor string `form:"cursor" validate:"omitempty"` // keyset; omit for first page, use next_cursor for next } // ListFeedbackRecordsResponse represents the response for listing feedback records. type ListFeedbackRecordsResponse struct { Data []FeedbackRecord `json:"data"` - Total *int64 `json:"total,omitempty"` // set when offset-based; omitted when using cursor (avoids extra COUNT) Limit int `json:"limit"` - Offset *int `json:"offset,omitempty"` // set when offset-based; omitted when using cursor NextCursor string `json:"next_cursor,omitempty"` // present when there may be more results } diff --git a/internal/models/webhooks.go b/internal/models/webhooks.go index fc99e23..06594a2 100644 --- a/internal/models/webhooks.go +++ b/internal/models/webhooks.go @@ -198,15 +198,12 @@ type ListWebhooksFilters struct { Enabled *bool `form:"enabled"` TenantID *string `form:"tenant_id" validate:"omitempty,no_null_bytes"` Limit int `form:"limit" validate:"omitempty,min=1,max=1000"` - Offset int `form:"offset" validate:"omitempty,min=0"` - Cursor string `form:"cursor" validate:"omitempty"` // keyset cursor; when set, offset is ignored + Cursor string `form:"cursor" validate:"omitempty"` // keyset cursor; omit for first page, use next_cursor for subsequent pages } // ListWebhooksResponse represents the response for listing webhooks. type ListWebhooksResponse struct { Data []Webhook `json:"data"` - Total *int64 `json:"total,omitempty"` // set when offset-based; omitted when using cursor (avoids extra COUNT) Limit int `json:"limit"` - Offset *int `json:"offset,omitempty"` // set when offset-based; omitted when using cursor NextCursor string `json:"next_cursor,omitempty"` // present when there may be more results } diff --git a/internal/repository/embeddings_repository.go b/internal/repository/embeddings_repository.go index e9e8561..006c252 100644 --- a/internal/repository/embeddings_repository.go +++ b/internal/repository/embeddings_repository.go @@ -169,22 +169,18 @@ func (r *EmbeddingsRepository) GetEmbeddingByFeedbackRecordAndModelAndTenant( // filtered in application code (not in WHERE) so pgvector's iterative index scan can run. Uses // full-precision query vector (no quantization); sets hnsw.ef_search for better recall. Over-fetches // then trims to limit to account for tenant/minScore filtering. excludeID optionally excludes one -// feedback record (e.g. for "similar" endpoint). offset is the number of rows to skip (for paging). +// feedback record (e.g. for "similar" endpoint). First page only; use NearestFeedbackRecordsByEmbeddingAfterCursor for next pages. func (r *EmbeddingsRepository) NearestFeedbackRecordsByEmbedding( - ctx context.Context, model string, queryEmbedding []float32, tenantID string, limit, offset int, excludeID *uuid.UUID, minScore float64, + ctx context.Context, model string, queryEmbedding []float32, tenantID string, limit int, excludeID *uuid.UUID, minScore float64, ) ([]models.FeedbackRecordWithScore, bool, error) { if len(queryEmbedding) != models.EmbeddingVectorDimensions { return nil, false, fmt.Errorf("%w: got %d, want %d", ErrEmbeddingDimensionMismatch, len(queryEmbedding), models.EmbeddingVectorDimensions) } - if offset < 0 { - offset = 0 - } - // Full-precision query vector (ephemeral); pgvector compares vector vs halfvec natively. queryVec := pgvector.NewVector(queryEmbedding) - fetchLimit := min(limit*nearestOverFetchFactor+offset, maxNearestFetchLimit) + fetchLimit := min(limit*nearestOverFetchFactor, maxNearestFetchLimit) dbTx, err := r.db.BeginTx(ctx, pgx.TxOptions{}) if err != nil { @@ -210,7 +206,7 @@ func (r *EmbeddingsRepository) NearestFeedbackRecordsByEmbedding( INNER JOIN feedback_records fr ON fr.id = e.feedback_record_id WHERE e.model = $2 AND fr.tenant_id = $3 ORDER BY (e.embedding <=> $1), e.feedback_record_id - LIMIT $4 OFFSET $5`, queryVec, model, tenantID, fetchLimit, offset) + LIMIT $4`, queryVec, model, tenantID, fetchLimit) } else { rows, err = dbTx.Query(ctx, ` SELECT e.feedback_record_id, (1 - (e.embedding <=> $1)) AS score, COALESCE(fr.field_label, ''), fr.value_text @@ -218,7 +214,7 @@ func (r *EmbeddingsRepository) NearestFeedbackRecordsByEmbedding( INNER JOIN feedback_records fr ON fr.id = e.feedback_record_id WHERE e.model = $2 AND fr.tenant_id = $3 AND e.feedback_record_id != $4 ORDER BY (e.embedding <=> $1), e.feedback_record_id - LIMIT $5 OFFSET $6`, queryVec, model, tenantID, *excludeID, fetchLimit, offset) + LIMIT $5`, queryVec, model, tenantID, *excludeID, fetchLimit) } if err != nil { diff --git a/internal/repository/feedback_records_repository.go b/internal/repository/feedback_records_repository.go index 19f087b..d16f79d 100644 --- a/internal/repository/feedback_records_repository.go +++ b/internal/repository/feedback_records_repository.go @@ -203,13 +203,6 @@ func (r *FeedbackRecordsRepository) List(ctx context.Context, filters *models.Li query += fmt.Sprintf(" LIMIT $%d", argCount) args = append(args, filters.Limit) - argCount++ - } - - if filters.Offset > 0 { - query += fmt.Sprintf(" OFFSET $%d", argCount) - - args = append(args, filters.Offset) } return r.fetchFeedbackRecords(ctx, query, args...) @@ -250,23 +243,6 @@ func (r *FeedbackRecordsRepository) ListAfterCursor( return r.fetchFeedbackRecords(ctx, query, args...) } -// Count returns the total count of feedback records matching the filters. -func (r *FeedbackRecordsRepository) Count(ctx context.Context, filters *models.ListFeedbackRecordsFilters) (int64, error) { - query := `SELECT COUNT(*) FROM feedback_records` - - whereClause, args := buildFilterConditions(filters) - query += whereClause - - var count int64 - - err := r.db.QueryRow(ctx, query, args...).Scan(&count) - if err != nil { - return 0, fmt.Errorf("failed to count feedback records: %w", err) - } - - return count, nil -} - // buildUpdateQuery builds an UPDATE query with SET clause and arguments. // Returns the query string, arguments, and a boolean indicating if any updates were provided. func buildUpdateQuery( diff --git a/internal/repository/webhooks_repository.go b/internal/repository/webhooks_repository.go index d296d13..98a2235 100644 --- a/internal/repository/webhooks_repository.go +++ b/internal/repository/webhooks_repository.go @@ -144,13 +144,6 @@ func (r *WebhooksRepository) List(ctx context.Context, filters *models.ListWebho query += fmt.Sprintf(" LIMIT $%d", argCount) args = append(args, filters.Limit) - argCount++ - } - - if filters.Offset > 0 { - query += fmt.Sprintf(" OFFSET $%d", argCount) - - args = append(args, filters.Offset) } rows, err := r.db.Query(ctx, query, args...) diff --git a/internal/service/feedback_records_service.go b/internal/service/feedback_records_service.go index 523c910..bc74f4d 100644 --- a/internal/service/feedback_records_service.go +++ b/internal/service/feedback_records_service.go @@ -33,7 +33,6 @@ type FeedbackRecordsRepository interface { ctx context.Context, filters *models.ListFeedbackRecordsFilters, cursorCollectedAt time.Time, cursorID uuid.UUID, ) ([]models.FeedbackRecord, error) - Count(ctx context.Context, filters *models.ListFeedbackRecordsFilters) (int64, error) Update(ctx context.Context, id uuid.UUID, req *models.UpdateFeedbackRecordRequest) (*models.FeedbackRecord, error) Delete(ctx context.Context, id uuid.UUID) error BulkDelete(ctx context.Context, userIdentifier string, tenantID *string) ([]uuid.UUID, error) @@ -115,57 +114,37 @@ func (s *FeedbackRecordsService) GetFeedbackRecord(ctx context.Context, id uuid. } // ListFeedbackRecords retrieves a list of feedback records with optional filters. -// When filters.Cursor is set, uses keyset pagination (no Count query); otherwise uses offset. +// Uses cursor-based pagination: omit cursor for first page, use next_cursor for subsequent pages. func (s *FeedbackRecordsService) ListFeedbackRecords( ctx context.Context, filters *models.ListFeedbackRecordsFilters, ) (*models.ListFeedbackRecordsResponse, error) { - // Set default limit if not provided (validation ensures it's within bounds if provided) if filters.Limit <= 0 { - filters.Limit = 100 // Default limit + filters.Limit = 100 } cursorStr := strings.TrimSpace(filters.Cursor) - if cursorStr != "" { - collectedAt, id, err := cursor.Decode(cursorStr) - if err != nil { - return nil, fmt.Errorf("decode cursor: %w", err) - } - - records, err := s.repo.ListAfterCursor(ctx, filters, collectedAt, id) - if err != nil { - return nil, fmt.Errorf("list feedback records after cursor: %w", err) - } + var ( + records []models.FeedbackRecord + err error + ) - meta, err := BuildListPaginationMeta(nil, 0, filters.Limit, len(records), func() (string, error) { - last := records[len(records)-1] - - return cursor.Encode(last.CollectedAt, last.ID) - }) - if err != nil { - return nil, fmt.Errorf("encode next cursor: %w", err) + if cursorStr != "" { + collectedAt, id, decErr := cursor.Decode(cursorStr) + if decErr != nil { + return nil, fmt.Errorf("decode cursor: %w", decErr) } - return &models.ListFeedbackRecordsResponse{ - Data: records, - Total: meta.Total, - Limit: meta.Limit, - Offset: meta.Offset, - NextCursor: meta.NextCursor, - }, nil + records, err = s.repo.ListAfterCursor(ctx, filters, collectedAt, id) + } else { + records, err = s.repo.List(ctx, filters) } - records, err := s.repo.List(ctx, filters) if err != nil { return nil, fmt.Errorf("list feedback records: %w", err) } - total, err := s.repo.Count(ctx, filters) - if err != nil { - return nil, fmt.Errorf("count feedback records: %w", err) - } - - meta, err := BuildListPaginationMeta(&total, filters.Offset, filters.Limit, len(records), func() (string, error) { + meta, err := BuildListPaginationMeta(filters.Limit, len(records), func() (string, error) { last := records[len(records)-1] return cursor.Encode(last.CollectedAt, last.ID) @@ -176,9 +155,7 @@ func (s *FeedbackRecordsService) ListFeedbackRecords( return &models.ListFeedbackRecordsResponse{ Data: records, - Total: meta.Total, Limit: meta.Limit, - Offset: meta.Offset, NextCursor: meta.NextCursor, }, nil } diff --git a/internal/service/pagination.go b/internal/service/pagination.go index fb877c7..61df04a 100644 --- a/internal/service/pagination.go +++ b/internal/service/pagination.go @@ -2,27 +2,17 @@ package service // ListPaginationMeta holds pagination metadata for list endpoints (feedback records, webhooks). type ListPaginationMeta struct { - Total *int64 - Offset *int Limit int NextCursor string } -// BuildListPaginationMeta builds pagination metadata for list responses. -// When total is nil (cursor-based path), Total and Offset are nil. +// BuildListPaginationMeta builds pagination metadata for cursor-based list responses. // encodeLast is called only when there may be more results (recordCount == limit && recordCount > 0). func BuildListPaginationMeta( - total *int64, offset, limit, recordCount int, encodeLast func() (string, error), + limit, recordCount int, encodeLast func() (string, error), ) (ListPaginationMeta, error) { meta := ListPaginationMeta{Limit: limit} - if total != nil { - t := *total - meta.Total = &t - o := offset - meta.Offset = &o - } - if recordCount == limit && recordCount > 0 && encodeLast != nil { next, err := encodeLast() if err != nil { diff --git a/internal/service/search_service.go b/internal/service/search_service.go index 6139f21..056c878 100644 --- a/internal/service/search_service.go +++ b/internal/service/search_service.go @@ -32,7 +32,7 @@ type EmbeddingsRepositoryForSearch interface { ctx context.Context, feedbackRecordID uuid.UUID, model, tenantID string, ) ([]float32, error) NearestFeedbackRecordsByEmbedding( - ctx context.Context, model string, queryEmbedding []float32, tenantID string, limit, offset int, excludeID *uuid.UUID, minScore float64, + ctx context.Context, model string, queryEmbedding []float32, tenantID string, limit int, excludeID *uuid.UUID, minScore float64, ) ([]models.FeedbackRecordWithScore, bool, error) NearestFeedbackRecordsByEmbeddingAfterCursor( ctx context.Context, model string, queryEmbedding []float32, tenantID string, limit int, @@ -79,11 +79,10 @@ func NewSearchService(p SearchServiceParams) *SearchService { } // SemanticSearch returns feedback record IDs and similarity scores for the given query, scoped to tenantID. -// Requires non-empty tenantID and non-empty (after trim) query. If cursor is non-empty it is used for -// keyset paging (offset is ignored); otherwise offset is used. minScore is the minimum similarity score (0..1). -// NextCursor is set when there may be a next page (full page returned). +// Requires non-empty tenantID and non-empty (after trim) query. Uses cursor-based pagination. +// minScore is the minimum similarity score (0..1). NextCursor is set when there may be a next page. func (s *SearchService) SemanticSearch( - ctx context.Context, query, tenantID string, limit, offset int, minScore float64, cursor string, + ctx context.Context, query, tenantID string, limit int, minScore float64, cursor string, ) (SearchResult, error) { out := SearchResult{} if tenantID == "" { @@ -126,7 +125,7 @@ func (s *SearchService) SemanticSearch( ctx, s.model, embedding, tenantID, limit, lastDistance, lastID, nil, minScore) } else { results, hasMore, err = s.embeddingsRepo.NearestFeedbackRecordsByEmbedding( - ctx, s.model, embedding, tenantID, limit, offset, nil, minScore) + ctx, s.model, embedding, tenantID, limit, nil, minScore) } if err != nil { @@ -152,9 +151,9 @@ func (s *SearchService) SemanticSearch( // SimilarFeedback returns feedback record IDs and similarity scores for records similar to the given one, scoped to tenantID. // Requires non-empty tenantID. Returns ErrEmbeddingNotFound when the record has no embedding for the current model. -// If cursor is non-empty it is used for keyset paging (offset is ignored); otherwise offset is used. +// Uses cursor-based pagination. func (s *SearchService) SimilarFeedback( - ctx context.Context, feedbackRecordID uuid.UUID, tenantID string, limit, offset int, minScore float64, cursor string, + ctx context.Context, feedbackRecordID uuid.UUID, tenantID string, limit int, minScore float64, cursor string, ) (SearchResult, error) { out := SearchResult{} if tenantID == "" { @@ -191,7 +190,7 @@ func (s *SearchService) SimilarFeedback( ctx, s.model, embedding, tenantID, limit, lastDistance, lastID, &feedbackRecordID, minScore) } else { results, hasMore, err = s.embeddingsRepo.NearestFeedbackRecordsByEmbedding( - ctx, s.model, embedding, tenantID, limit, offset, &feedbackRecordID, minScore) + ctx, s.model, embedding, tenantID, limit, &feedbackRecordID, minScore) } if err != nil { diff --git a/internal/service/search_service_test.go b/internal/service/search_service_test.go index f0a5163..f270be7 100644 --- a/internal/service/search_service_test.go +++ b/internal/service/search_service_test.go @@ -38,7 +38,7 @@ type mockEmbeddingsRepoForSearch struct { getEmbeddingByTenantFunc func(ctx context.Context, feedbackRecordID uuid.UUID, model, tenantID string) ([]float32, error) nearestFunc func( ctx context.Context, model string, queryEmbedding []float32, - tenantID string, limit, offset int, excludeID *uuid.UUID, minScore float64, + tenantID string, limit int, excludeID *uuid.UUID, minScore float64, ) ([]models.FeedbackRecordWithScore, bool, error) nearestAfterFunc func( ctx context.Context, model string, queryEmbedding []float32, @@ -57,10 +57,10 @@ func (m *mockEmbeddingsRepoForSearch) GetEmbeddingByFeedbackRecordAndModelAndTen } func (m *mockEmbeddingsRepoForSearch) NearestFeedbackRecordsByEmbedding( - ctx context.Context, model string, queryEmbedding []float32, tenantID string, limit, offset int, excludeID *uuid.UUID, minScore float64, + ctx context.Context, model string, queryEmbedding []float32, tenantID string, limit int, excludeID *uuid.UUID, minScore float64, ) ([]models.FeedbackRecordWithScore, bool, error) { if m.nearestFunc != nil { - return m.nearestFunc(ctx, model, queryEmbedding, tenantID, limit, offset, excludeID, minScore) + return m.nearestFunc(ctx, model, queryEmbedding, tenantID, limit, excludeID, minScore) } return nil, false, nil @@ -84,7 +84,7 @@ func TestSearchService_SemanticSearch(t *testing.T) { EmbeddingsRepo: &mockEmbeddingsRepoForSearch{}, Model: "test-model", }) - res, err := svc.SemanticSearch(context.Background(), "query", "", 10, 0, 0, "") + res, err := svc.SemanticSearch(context.Background(), "query", "", 10, 0, "") assert.Empty(t, res.Results) assert.ErrorIs(t, err, ErrMissingTenantID) }) @@ -95,7 +95,7 @@ func TestSearchService_SemanticSearch(t *testing.T) { EmbeddingsRepo: &mockEmbeddingsRepoForSearch{}, Model: "test-model", }) - res, err := svc.SemanticSearch(context.Background(), " ", "tenant-1", 10, 0, 0, "") + res, err := svc.SemanticSearch(context.Background(), " ", "tenant-1", 10, 0, "") assert.Empty(t, res.Results) assert.ErrorIs(t, err, ErrEmptyQuery) }) @@ -122,7 +122,7 @@ func TestSearchService_SemanticSearch(t *testing.T) { EmbeddingsRepo: &mockEmbeddingsRepoForSearch{ nearestFunc: func( _ context.Context, model string, queryEmbedding []float32, - tenantID string, limit, offset int, excludeID *uuid.UUID, minScore float64, + tenantID string, limit int, excludeID *uuid.UUID, minScore float64, ) ([]models.FeedbackRecordWithScore, bool, error) { nearestCalled = true @@ -130,7 +130,6 @@ func TestSearchService_SemanticSearch(t *testing.T) { assert.Equal(t, []float32{0.1, 0.2}, queryEmbedding) assert.Equal(t, "env-1", tenantID) assert.Equal(t, 10, limit) - assert.Equal(t, 2, offset) assert.Nil(t, excludeID) assert.InDelta(t, 0.5, minScore, 1e-9) @@ -141,7 +140,7 @@ func TestSearchService_SemanticSearch(t *testing.T) { }, Model: "test-model", }) - res, err := svc.SemanticSearch(context.Background(), "login slow", "env-1", 10, 2, 0.5, "") + res, err := svc.SemanticSearch(context.Background(), "login slow", "env-1", 10, 0.5, "") require.NoError(t, err) require.True(t, queryClientCalled) require.True(t, nearestCalled) @@ -158,7 +157,7 @@ func TestSearchService_SimilarFeedback(t *testing.T) { EmbeddingsRepo: &mockEmbeddingsRepoForSearch{}, Model: "test-model", }) - res, err := svc.SimilarFeedback(context.Background(), uuid.MustParse("018e1234-5678-9abc-def0-123456789abc"), "", 10, 0, 0, "") + res, err := svc.SimilarFeedback(context.Background(), uuid.MustParse("018e1234-5678-9abc-def0-123456789abc"), "", 10, 0, "") assert.Empty(t, res.Results) assert.ErrorIs(t, err, ErrMissingTenantID) }) @@ -177,7 +176,7 @@ func TestSearchService_SimilarFeedback(t *testing.T) { }, Model: "test-model", }) - res, err := svc.SimilarFeedback(context.Background(), rid, "env-1", 10, 0, 0, "") + res, err := svc.SimilarFeedback(context.Background(), rid, "env-1", 10, 0, "") assert.Empty(t, res.Results) assert.ErrorIs(t, err, repository.ErrEmbeddingNotFound) }) @@ -197,12 +196,11 @@ func TestSearchService_SimilarFeedback(t *testing.T) { }, nearestFunc: func( _ context.Context, model string, _ []float32, - tenantID string, limit, offset int, excludeID *uuid.UUID, minScore float64, + tenantID string, limit int, excludeID *uuid.UUID, minScore float64, ) ([]models.FeedbackRecordWithScore, bool, error) { assert.Equal(t, "test-model", model) assert.Equal(t, "env-1", tenantID) assert.Equal(t, 10, limit) - assert.Equal(t, 0, offset) require.NotNil(t, excludeID) assert.Equal(t, sourceID, *excludeID) assert.InDelta(t, 0.5, minScore, 1e-9) @@ -214,7 +212,7 @@ func TestSearchService_SimilarFeedback(t *testing.T) { }, Model: "test-model", }) - res, err := svc.SimilarFeedback(context.Background(), sourceID, "env-1", 10, 0, 0.5, "") + res, err := svc.SimilarFeedback(context.Background(), sourceID, "env-1", 10, 0.5, "") require.NoError(t, err) require.Len(t, res.Results, 1) assert.Equal(t, similarID, res.Results[0].FeedbackRecordID) @@ -238,7 +236,7 @@ func TestSearchService_SemanticSearch_EmbeddingError(t *testing.T) { EmbeddingsRepo: &mockEmbeddingsRepoForSearch{}, Model: "test-model", }) - res, err := svc.SemanticSearch(context.Background(), "query", "env-1", 10, 0, 0, "") + res, err := svc.SemanticSearch(context.Background(), "query", "env-1", 10, 0, "") assert.Empty(t, res.Results) assert.ErrorIs(t, err, embeddingErr) } diff --git a/internal/service/webhooks_service.go b/internal/service/webhooks_service.go index 22ca441..4069037 100644 --- a/internal/service/webhooks_service.go +++ b/internal/service/webhooks_service.go @@ -121,7 +121,7 @@ func (s *WebhooksService) GetWebhook(ctx context.Context, id uuid.UUID) (*models } // ListWebhooks retrieves a list of webhooks with optional filters. -// When filters.Cursor is set, uses keyset pagination (no Count query); otherwise uses offset. +// Uses cursor-based pagination: omit cursor for first page, use next_cursor for subsequent pages. func (s *WebhooksService) ListWebhooks(ctx context.Context, filters *models.ListWebhooksFilters) (*models.ListWebhooksResponse, error) { if filters.Limit <= 0 { filters.Limit = 100 @@ -129,46 +129,27 @@ func (s *WebhooksService) ListWebhooks(ctx context.Context, filters *models.List cursorStr := strings.TrimSpace(filters.Cursor) - if cursorStr != "" { - createdAt, id, err := cursor.Decode(cursorStr) - if err != nil { - return nil, fmt.Errorf("decode cursor: %w", err) - } - - webhooks, err := s.repo.ListAfterCursor(ctx, filters, createdAt, id) - if err != nil { - return nil, fmt.Errorf("list webhooks after cursor: %w", err) - } - - meta, err := BuildListPaginationMeta(nil, 0, filters.Limit, len(webhooks), func() (string, error) { - last := webhooks[len(webhooks)-1] + var ( + webhooks []models.Webhook + err error + ) - return cursor.Encode(last.CreatedAt, last.ID) - }) - if err != nil { - return nil, fmt.Errorf("encode next cursor: %w", err) + if cursorStr != "" { + createdAt, id, decErr := cursor.Decode(cursorStr) + if decErr != nil { + return nil, fmt.Errorf("decode cursor: %w", decErr) } - return &models.ListWebhooksResponse{ - Data: webhooks, - Total: meta.Total, - Limit: meta.Limit, - Offset: meta.Offset, - NextCursor: meta.NextCursor, - }, nil + webhooks, err = s.repo.ListAfterCursor(ctx, filters, createdAt, id) + } else { + webhooks, err = s.repo.List(ctx, filters) } - webhooks, err := s.repo.List(ctx, filters) if err != nil { return nil, fmt.Errorf("list webhooks: %w", err) } - total, err := s.repo.Count(ctx, filters) - if err != nil { - return nil, fmt.Errorf("count webhooks: %w", err) - } - - meta, err := BuildListPaginationMeta(&total, filters.Offset, filters.Limit, len(webhooks), func() (string, error) { + meta, err := BuildListPaginationMeta(filters.Limit, len(webhooks), func() (string, error) { last := webhooks[len(webhooks)-1] return cursor.Encode(last.CreatedAt, last.ID) @@ -179,9 +160,7 @@ func (s *WebhooksService) ListWebhooks(ctx context.Context, filters *models.List return &models.ListWebhooksResponse{ Data: webhooks, - Total: meta.Total, Limit: meta.Limit, - Offset: meta.Offset, NextCursor: meta.NextCursor, }, nil } diff --git a/openapi.yaml b/openapi.yaml index 3d0a71a..243d8cd 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -145,21 +145,11 @@ paths: default: 100 minimum: 1 maximum: 1000 - - name: offset - in: query - description: Number of results to skip (offset-based paging). Ignored when cursor is set. - schema: - type: integer - description: Number of results to skip - format: int64 - default: 0 - minimum: 0 - maximum: 2147483647 - name: cursor in: query description: | Omit for the first page. For the next page, use the exact value from the previous response's next_cursor. - Opaque (base64-encoded); when set, offset is ignored and keyset pagination is used. + Opaque (base64-encoded); keyset pagination. schema: type: string example: "eyJ0IjoiMjAyNC0wMS0xNVQxMDozMDowMFoiLCJpIjoiMDE4ZTEyMzQtNTY3OC05YWJjLWRlZjAtMTIzNDU2Nzg5YWJjIn0=" @@ -188,9 +178,7 @@ paths: source_name: "Q1 NPS Survey" user_identifier: "user-abc-123" tenant_id: "org-123" - total: 1 limit: 100 - offset: 0 "400": description: Bad Request (e.g. invalid cursor) content: @@ -598,19 +586,11 @@ paths: default: 10 minimum: 1 maximum: 100 - - name: offset - in: query - description: Number of results to skip (OFFSET-based paging). Ignored if cursor is set. Capped at 1000 for performance. - schema: - type: integer - minimum: 0 - maximum: 1000 - default: 0 - name: cursor in: query description: | Omit for the first page. For the next page, use the exact value from the previous response's next_cursor. - Opaque (base64-encoded); when set, offset is ignored and the next page after this cursor is returned. + Opaque (base64-encoded); keyset pagination. schema: type: string example: "eyJkIjowLjEsImkiOiIwMThlMTIzNC01Njc4LTlhYmMtZGVmMC0xMTExMTExMTExMTEifQ==" @@ -693,19 +673,11 @@ paths: default: 10 minimum: 1 maximum: 100 - - name: offset - in: query - description: Number of results to skip (OFFSET-based paging). Ignored if cursor is set. Capped at 1000 for performance. - schema: - type: integer - minimum: 0 - maximum: 1000 - default: 0 - name: cursor in: query description: | Omit for the first page. For the next page, use the exact value from the previous response's next_cursor. - Opaque (base64-encoded); when set, offset is ignored and the next page after this cursor is returned. + Opaque (base64-encoded); keyset pagination. schema: type: string example: "eyJkIjowLjEsImkiOiIwMThlMTIzNC01Njc4LTlhYmMtZGVmMC0xMTExMTExMTExMTEifQ==" @@ -780,20 +752,11 @@ paths: default: 100 minimum: 1 maximum: 1000 - - name: offset - in: query - description: Number of results to skip (offset-based paging). Ignored when cursor is set. - schema: - type: integer - format: int64 - default: 0 - minimum: 0 - maximum: 2147483647 - name: cursor in: query description: | Omit for the first page. For the next page, use the exact value from the previous response's next_cursor. - Opaque (base64-encoded); when set, offset is ignored and keyset pagination is used. + Opaque (base64-encoded); keyset pagination. schema: type: string responses: @@ -1339,14 +1302,6 @@ components: type: integer description: Limit used in query format: int64 - offset: - type: integer - description: Offset used in query (present when offset-based paging; omitted when using cursor) - format: int64 - total: - type: integer - description: Total count of feedback records matching filters (present when offset-based; omitted when using cursor to avoid extra COUNT query) - format: int64 next_cursor: type: string description: Opaque cursor for the next page (keyset paging). Present only when there may be more results. Use as the cursor query param for the next page. @@ -1398,18 +1353,10 @@ components: description: List of webhooks items: $ref: '#/components/schemas/WebhookData' - total: - type: integer - description: Total count matching filters (present when offset-based; omitted when using cursor) - format: int64 limit: type: integer description: Limit used in query format: int64 - offset: - type: integer - description: Offset used in query (present when offset-based; omitted when using cursor) - format: int64 next_cursor: type: string description: Opaque cursor for the next page (keyset paging). Present only when there may be more results. Use as the cursor query param for the next page. diff --git a/tests/integration_test.go b/tests/integration_test.go index 425a628..c181099 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -456,9 +456,6 @@ func TestListFeedbackRecords(t *testing.T) { require.NoError(t, resp2.Body.Close()) assert.GreaterOrEqual(t, len(page2.Data), 1) - // Cursor path omits total and offset - assert.Nil(t, page2.Total) - assert.Nil(t, page2.Offset) // Invalid cursor returns 400 req3, err := http.NewRequestWithContext(context.Background(), http.MethodGet, @@ -1100,8 +1097,6 @@ func TestWebhooksCRUD(t *testing.T) { err = decodeData(listResp, &listResult) require.NoError(t, err) require.NoError(t, listResp.Body.Close()) - require.NotNil(t, listResult.Total) - assert.GreaterOrEqual(t, *listResult.Total, int64(1)) assert.GreaterOrEqual(t, len(listResult.Data), 1) // Test invalid cursor returns 400 From ed825634708dce90dd30e774bc8222abe99e90a0 Mon Sep 17 00:00:00 2001 From: Tiago Farto Date: Wed, 4 Mar 2026 16:20:45 +0200 Subject: [PATCH 4/6] chore: refactoring --- .../repository/feedback_records_repository.go | 50 ++++-- internal/repository/webhooks_repository.go | 153 +++++++++--------- internal/service/feedback_records_service.go | 15 +- internal/service/pagination.go | 7 +- internal/service/webhook_provider_test.go | 8 +- internal/service/webhook_sender_test.go | 8 +- internal/service/webhooks_service.go | 15 +- internal/service/webhooks_service_test.go | 8 +- 8 files changed, 152 insertions(+), 112 deletions(-) diff --git a/internal/repository/feedback_records_repository.go b/internal/repository/feedback_records_repository.go index d16f79d..1ba6d0a 100644 --- a/internal/repository/feedback_records_repository.go +++ b/internal/repository/feedback_records_repository.go @@ -190,7 +190,10 @@ const feedbackRecordsListSelect = ` ` // List retrieves feedback records with optional filters. Embedding is not selected (API reads stay lean). -func (r *FeedbackRecordsRepository) List(ctx context.Context, filters *models.ListFeedbackRecordsFilters) ([]models.FeedbackRecord, error) { +// Fetches limit+1 as sentinel to determine hasMore; returns trimmed slice and hasMore. +func (r *FeedbackRecordsRepository) List( + ctx context.Context, filters *models.ListFeedbackRecordsFilters, +) ([]models.FeedbackRecord, bool, error) { query := feedbackRecordsListSelect whereClause, args := buildFilterConditions(filters) @@ -199,20 +202,34 @@ func (r *FeedbackRecordsRepository) List(ctx context.Context, filters *models.Li query += " ORDER BY collected_at DESC, id ASC" - if filters.Limit > 0 { - query += fmt.Sprintf(" LIMIT $%d", argCount) + limit := filters.Limit + if limit <= 0 { + limit = 100 + } + + query += fmt.Sprintf(" LIMIT $%d", argCount) - args = append(args, filters.Limit) + args = append(args, limit+1) + + records, err := r.fetchFeedbackRecords(ctx, query, args...) + if err != nil { + return nil, false, err } - return r.fetchFeedbackRecords(ctx, query, args...) + hasMore := len(records) > limit + if hasMore { + records = records[:limit] + } + + return records, hasMore, nil } // ListAfterCursor retrieves feedback records after the given keyset cursor (collected_at, id). // Order is collected_at DESC, id ASC. The cursor represents the last row of the previous page. +// Fetches limit+1 as sentinel to determine hasMore; returns trimmed slice and hasMore. func (r *FeedbackRecordsRepository) ListAfterCursor( ctx context.Context, filters *models.ListFeedbackRecordsFilters, cursorCollectedAt time.Time, cursorID uuid.UUID, -) ([]models.FeedbackRecord, error) { +) ([]models.FeedbackRecord, bool, error) { query := feedbackRecordsListSelect whereClause, args := buildFilterConditions(filters) @@ -234,13 +251,26 @@ func (r *FeedbackRecordsRepository) ListAfterCursor( query += " ORDER BY collected_at DESC, id ASC" - if filters.Limit > 0 { - query += fmt.Sprintf(" LIMIT $%d", argCount) + limit := filters.Limit + if limit <= 0 { + limit = 100 + } + + query += fmt.Sprintf(" LIMIT $%d", argCount) + + args = append(args, limit+1) + + records, err := r.fetchFeedbackRecords(ctx, query, args...) + if err != nil { + return nil, false, err + } - args = append(args, filters.Limit) + hasMore := len(records) > limit + if hasMore { + records = records[:limit] } - return r.fetchFeedbackRecords(ctx, query, args...) + return records, hasMore, nil } // buildUpdateQuery builds an UPDATE query with SET clause and arguments. diff --git a/internal/repository/webhooks_repository.go b/internal/repository/webhooks_repository.go index 98a2235..0e0c694 100644 --- a/internal/repository/webhooks_repository.go +++ b/internal/repository/webhooks_repository.go @@ -127,72 +127,51 @@ func buildWebhookFilterConditions(filters *models.ListWebhooksFilters) (whereCla return whereClause, args } -// List retrieves webhooks with optional filters. -func (r *WebhooksRepository) List(ctx context.Context, filters *models.ListWebhooksFilters) ([]models.Webhook, error) { - query := ` +const webhooksListSelect = ` SELECT id, url, signing_key, enabled, tenant_id, created_at, updated_at, event_types, disabled_reason, disabled_at FROM webhooks ` +// List retrieves webhooks with optional filters. +// Fetches limit+1 as sentinel to determine hasMore; returns trimmed slice and hasMore. +func (r *WebhooksRepository) List(ctx context.Context, filters *models.ListWebhooksFilters) ([]models.Webhook, bool, error) { + query := webhooksListSelect + whereClause, args := buildWebhookFilterConditions(filters) query += whereClause argCount := len(args) + 1 query += " ORDER BY created_at DESC, id ASC" - if filters.Limit > 0 { - query += fmt.Sprintf(" LIMIT $%d", argCount) - - args = append(args, filters.Limit) - } - - rows, err := r.db.Query(ctx, query, args...) - if err != nil { - return nil, fmt.Errorf("failed to list webhooks: %w", err) + limit := filters.Limit + if limit <= 0 { + limit = 100 } - defer rows.Close() - webhooks := []models.Webhook{} - - for rows.Next() { - var ( - webhook models.Webhook - dbEventTypes []string - ) + query += fmt.Sprintf(" LIMIT $%d", argCount) - err := rows.Scan( - &webhook.ID, &webhook.URL, &webhook.SigningKey, &webhook.Enabled, - &webhook.TenantID, &webhook.CreatedAt, &webhook.UpdatedAt, &dbEventTypes, - &webhook.DisabledReason, &webhook.DisabledAt, - ) - if err != nil { - return nil, fmt.Errorf("failed to scan webhook: %w", err) - } + args = append(args, limit+1) - webhook.EventTypes, err = parseDBEventTypes(dbEventTypes) - if err != nil { - return nil, err - } - - webhooks = append(webhooks, webhook) + webhooks, err := r.fetchWebhooks(ctx, query, args...) + if err != nil { + return nil, false, err } - if err := rows.Err(); err != nil { - return nil, fmt.Errorf("error iterating webhooks: %w", err) + hasMore := len(webhooks) > limit + if hasMore { + webhooks = webhooks[:limit] } - return webhooks, nil + return webhooks, hasMore, nil } // ListAfterCursor retrieves webhooks after the given keyset cursor (created_at, id). // Order is created_at DESC, id ASC. The cursor represents the last row of the previous page. +// Fetches limit+1 as sentinel to determine hasMore; returns trimmed slice and hasMore. func (r *WebhooksRepository) ListAfterCursor( ctx context.Context, filters *models.ListWebhooksFilters, cursorCreatedAt time.Time, cursorID uuid.UUID, -) ([]models.Webhook, error) { - query := ` - SELECT id, url, signing_key, enabled, tenant_id, created_at, updated_at, event_types, disabled_reason, disabled_at - FROM webhooks - ` +) ([]models.Webhook, bool, error) { + query := webhooksListSelect whereClause, args := buildWebhookFilterConditions(filters) query += whereClause @@ -212,48 +191,26 @@ func (r *WebhooksRepository) ListAfterCursor( query += " ORDER BY created_at DESC, id ASC" - if filters.Limit > 0 { - query += fmt.Sprintf(" LIMIT $%d", argCount) - - args = append(args, filters.Limit) + limit := filters.Limit + if limit <= 0 { + limit = 100 } - rows, err := r.db.Query(ctx, query, args...) - if err != nil { - return nil, fmt.Errorf("failed to list webhooks after cursor: %w", err) - } - defer rows.Close() - - webhooks := []models.Webhook{} - - for rows.Next() { - var ( - webhook models.Webhook - dbEventTypes []string - ) - - err := rows.Scan( - &webhook.ID, &webhook.URL, &webhook.SigningKey, &webhook.Enabled, - &webhook.TenantID, &webhook.CreatedAt, &webhook.UpdatedAt, &dbEventTypes, - &webhook.DisabledReason, &webhook.DisabledAt, - ) - if err != nil { - return nil, fmt.Errorf("failed to scan webhook: %w", err) - } + query += fmt.Sprintf(" LIMIT $%d", argCount) - webhook.EventTypes, err = parseDBEventTypes(dbEventTypes) - if err != nil { - return nil, err - } + args = append(args, limit+1) - webhooks = append(webhooks, webhook) + webhooks, err := r.fetchWebhooks(ctx, query, args...) + if err != nil { + return nil, false, err } - if err := rows.Err(); err != nil { - return nil, fmt.Errorf("error iterating webhooks: %w", err) + hasMore := len(webhooks) > limit + if hasMore { + webhooks = webhooks[:limit] } - return webhooks, nil + return webhooks, hasMore, nil } // Count returns the total count of webhooks matching the filters. @@ -429,7 +386,9 @@ func (r *WebhooksRepository) ListEnabled(ctx context.Context) ([]models.Webhook, }(), } - return r.List(ctx, filters) + webhooks, _, err := r.List(ctx, filters) + + return webhooks, err } // ListEnabledForEventType retrieves all enabled webhooks that should receive a specific event type. @@ -479,3 +438,43 @@ func (r *WebhooksRepository) ListEnabledForEventType(ctx context.Context, eventT return webhooks, nil } + +// fetchWebhooks executes the given query and scans rows into Webhook slices. +func (r *WebhooksRepository) fetchWebhooks(ctx context.Context, query string, args ...any) ([]models.Webhook, error) { + rows, err := r.db.Query(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("failed to list webhooks: %w", err) + } + defer rows.Close() + + webhooks := []models.Webhook{} + + for rows.Next() { + var ( + webhook models.Webhook + dbEventTypes []string + ) + + err := rows.Scan( + &webhook.ID, &webhook.URL, &webhook.SigningKey, &webhook.Enabled, + &webhook.TenantID, &webhook.CreatedAt, &webhook.UpdatedAt, &dbEventTypes, + &webhook.DisabledReason, &webhook.DisabledAt, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan webhook: %w", err) + } + + webhook.EventTypes, err = parseDBEventTypes(dbEventTypes) + if err != nil { + return nil, err + } + + webhooks = append(webhooks, webhook) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating webhooks: %w", err) + } + + return webhooks, nil +} diff --git a/internal/service/feedback_records_service.go b/internal/service/feedback_records_service.go index bc74f4d..9cfc6af 100644 --- a/internal/service/feedback_records_service.go +++ b/internal/service/feedback_records_service.go @@ -28,11 +28,11 @@ const uniqueByPeriodEmbedding = 24 * time.Hour type FeedbackRecordsRepository interface { Create(ctx context.Context, req *models.CreateFeedbackRecordRequest) (*models.FeedbackRecord, error) GetByID(ctx context.Context, id uuid.UUID) (*models.FeedbackRecord, error) - List(ctx context.Context, filters *models.ListFeedbackRecordsFilters) ([]models.FeedbackRecord, error) + List(ctx context.Context, filters *models.ListFeedbackRecordsFilters) ([]models.FeedbackRecord, bool, error) ListAfterCursor( ctx context.Context, filters *models.ListFeedbackRecordsFilters, cursorCollectedAt time.Time, cursorID uuid.UUID, - ) ([]models.FeedbackRecord, error) + ) ([]models.FeedbackRecord, bool, error) Update(ctx context.Context, id uuid.UUID, req *models.UpdateFeedbackRecordRequest) (*models.FeedbackRecord, error) Delete(ctx context.Context, id uuid.UUID) error BulkDelete(ctx context.Context, userIdentifier string, tenantID *string) ([]uuid.UUID, error) @@ -118,6 +118,10 @@ func (s *FeedbackRecordsService) GetFeedbackRecord(ctx context.Context, id uuid. func (s *FeedbackRecordsService) ListFeedbackRecords( ctx context.Context, filters *models.ListFeedbackRecordsFilters, ) (*models.ListFeedbackRecordsResponse, error) { + if filters == nil { + filters = &models.ListFeedbackRecordsFilters{} + } + if filters.Limit <= 0 { filters.Limit = 100 } @@ -126,6 +130,7 @@ func (s *FeedbackRecordsService) ListFeedbackRecords( var ( records []models.FeedbackRecord + hasMore bool err error ) @@ -135,16 +140,16 @@ func (s *FeedbackRecordsService) ListFeedbackRecords( return nil, fmt.Errorf("decode cursor: %w", decErr) } - records, err = s.repo.ListAfterCursor(ctx, filters, collectedAt, id) + records, hasMore, err = s.repo.ListAfterCursor(ctx, filters, collectedAt, id) } else { - records, err = s.repo.List(ctx, filters) + records, hasMore, err = s.repo.List(ctx, filters) } if err != nil { return nil, fmt.Errorf("list feedback records: %w", err) } - meta, err := BuildListPaginationMeta(filters.Limit, len(records), func() (string, error) { + meta, err := BuildListPaginationMeta(filters.Limit, hasMore, func() (string, error) { last := records[len(records)-1] return cursor.Encode(last.CollectedAt, last.ID) diff --git a/internal/service/pagination.go b/internal/service/pagination.go index 61df04a..8a300d9 100644 --- a/internal/service/pagination.go +++ b/internal/service/pagination.go @@ -7,13 +7,14 @@ type ListPaginationMeta struct { } // BuildListPaginationMeta builds pagination metadata for cursor-based list responses. -// encodeLast is called only when there may be more results (recordCount == limit && recordCount > 0). +// hasMore indicates a sentinel row was fetched (limit+1 returned, trimmed to limit). +// encodeLast is called only when hasMore is true to produce next_cursor. func BuildListPaginationMeta( - limit, recordCount int, encodeLast func() (string, error), + limit int, hasMore bool, encodeLast func() (string, error), ) (ListPaginationMeta, error) { meta := ListPaginationMeta{Limit: limit} - if recordCount == limit && recordCount > 0 && encodeLast != nil { + if hasMore && encodeLast != nil { next, err := encodeLast() if err != nil { return meta, err diff --git a/internal/service/webhook_provider_test.go b/internal/service/webhook_provider_test.go index d14775a..f3029f1 100644 --- a/internal/service/webhook_provider_test.go +++ b/internal/service/webhook_provider_test.go @@ -60,14 +60,14 @@ func (m *mockProviderRepo) GetByID(_ context.Context, _ uuid.UUID) (*models.Webh return nil, errors.New("not implemented") } -func (m *mockProviderRepo) List(_ context.Context, _ *models.ListWebhooksFilters) ([]models.Webhook, error) { - return nil, errors.New("not implemented") +func (m *mockProviderRepo) List(_ context.Context, _ *models.ListWebhooksFilters) ([]models.Webhook, bool, error) { + return nil, false, errors.New("not implemented") } func (m *mockProviderRepo) ListAfterCursor( _ context.Context, _ *models.ListWebhooksFilters, _ time.Time, _ uuid.UUID, -) ([]models.Webhook, error) { - return nil, errors.New("not implemented") +) ([]models.Webhook, bool, error) { + return nil, false, errors.New("not implemented") } func (m *mockProviderRepo) Count(_ context.Context, _ *models.ListWebhooksFilters) (int64, error) { diff --git a/internal/service/webhook_sender_test.go b/internal/service/webhook_sender_test.go index 7de0f9b..09789bd 100644 --- a/internal/service/webhook_sender_test.go +++ b/internal/service/webhook_sender_test.go @@ -32,14 +32,14 @@ func (m *mockSenderRepo) GetByID(_ context.Context, _ uuid.UUID) (*models.Webhoo return nil, nil } -func (m *mockSenderRepo) List(_ context.Context, _ *models.ListWebhooksFilters) ([]models.Webhook, error) { - return nil, nil +func (m *mockSenderRepo) List(_ context.Context, _ *models.ListWebhooksFilters) ([]models.Webhook, bool, error) { + return nil, false, nil } func (m *mockSenderRepo) ListAfterCursor( _ context.Context, _ *models.ListWebhooksFilters, _ time.Time, _ uuid.UUID, -) ([]models.Webhook, error) { - return nil, nil +) ([]models.Webhook, bool, error) { + return nil, false, nil } func (m *mockSenderRepo) Count(_ context.Context, _ *models.ListWebhooksFilters) (int64, error) { diff --git a/internal/service/webhooks_service.go b/internal/service/webhooks_service.go index 4069037..33f4a34 100644 --- a/internal/service/webhooks_service.go +++ b/internal/service/webhooks_service.go @@ -21,11 +21,11 @@ import ( type WebhooksRepository interface { Create(ctx context.Context, req *models.CreateWebhookRequest) (*models.Webhook, error) GetByID(ctx context.Context, id uuid.UUID) (*models.Webhook, error) - List(ctx context.Context, filters *models.ListWebhooksFilters) ([]models.Webhook, error) + List(ctx context.Context, filters *models.ListWebhooksFilters) ([]models.Webhook, bool, error) ListAfterCursor( ctx context.Context, filters *models.ListWebhooksFilters, cursorCreatedAt time.Time, cursorID uuid.UUID, - ) ([]models.Webhook, error) + ) ([]models.Webhook, bool, error) Count(ctx context.Context, filters *models.ListWebhooksFilters) (int64, error) Update(ctx context.Context, id uuid.UUID, req *models.UpdateWebhookRequest) (*models.Webhook, error) Delete(ctx context.Context, id uuid.UUID) error @@ -123,6 +123,10 @@ func (s *WebhooksService) GetWebhook(ctx context.Context, id uuid.UUID) (*models // ListWebhooks retrieves a list of webhooks with optional filters. // Uses cursor-based pagination: omit cursor for first page, use next_cursor for subsequent pages. func (s *WebhooksService) ListWebhooks(ctx context.Context, filters *models.ListWebhooksFilters) (*models.ListWebhooksResponse, error) { + if filters == nil { + filters = &models.ListWebhooksFilters{} + } + if filters.Limit <= 0 { filters.Limit = 100 } @@ -131,6 +135,7 @@ func (s *WebhooksService) ListWebhooks(ctx context.Context, filters *models.List var ( webhooks []models.Webhook + hasMore bool err error ) @@ -140,16 +145,16 @@ func (s *WebhooksService) ListWebhooks(ctx context.Context, filters *models.List return nil, fmt.Errorf("decode cursor: %w", decErr) } - webhooks, err = s.repo.ListAfterCursor(ctx, filters, createdAt, id) + webhooks, hasMore, err = s.repo.ListAfterCursor(ctx, filters, createdAt, id) } else { - webhooks, err = s.repo.List(ctx, filters) + webhooks, hasMore, err = s.repo.List(ctx, filters) } if err != nil { return nil, fmt.Errorf("list webhooks: %w", err) } - meta, err := BuildListPaginationMeta(filters.Limit, len(webhooks), func() (string, error) { + meta, err := BuildListPaginationMeta(filters.Limit, hasMore, func() (string, error) { last := webhooks[len(webhooks)-1] return cursor.Encode(last.CreatedAt, last.ID) diff --git a/internal/service/webhooks_service_test.go b/internal/service/webhooks_service_test.go index 3c4fdcf..adcc598 100644 --- a/internal/service/webhooks_service_test.go +++ b/internal/service/webhooks_service_test.go @@ -25,14 +25,14 @@ func (m *mockWebhooksRepo) GetByID(_ context.Context, _ uuid.UUID) (*models.Webh return nil, nil } -func (m *mockWebhooksRepo) List(_ context.Context, _ *models.ListWebhooksFilters) ([]models.Webhook, error) { - return nil, nil +func (m *mockWebhooksRepo) List(_ context.Context, _ *models.ListWebhooksFilters) ([]models.Webhook, bool, error) { + return nil, false, nil } func (m *mockWebhooksRepo) ListAfterCursor( _ context.Context, _ *models.ListWebhooksFilters, _ time.Time, _ uuid.UUID, -) ([]models.Webhook, error) { - return nil, nil +) ([]models.Webhook, bool, error) { + return nil, false, nil } func (m *mockWebhooksRepo) Count(_ context.Context, _ *models.ListWebhooksFilters) (int64, error) { From 6ccef4540a86d5898a065b423d37aa778c8dbb23 Mon Sep 17 00:00:00 2001 From: Tiago Farto Date: Wed, 4 Mar 2026 17:49:49 +0200 Subject: [PATCH 5/6] chore: fix webhook list limit --- internal/repository/webhooks_repository.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/internal/repository/webhooks_repository.go b/internal/repository/webhooks_repository.go index 0e0c694..5a13ffe 100644 --- a/internal/repository/webhooks_repository.go +++ b/internal/repository/webhooks_repository.go @@ -376,19 +376,16 @@ func parseDBEventTypes(ss []string) ([]datatypes.EventType, error) { return out, nil } -// ListEnabled retrieves all enabled webhooks. +// ListEnabled retrieves all enabled webhooks (unbounded; used for delivery fan-out). func (r *WebhooksRepository) ListEnabled(ctx context.Context) ([]models.Webhook, error) { - filters := &models.ListWebhooksFilters{ - Enabled: func() *bool { - b := true + query := webhooksListSelect + ` WHERE enabled = true ORDER BY created_at DESC, id ASC` - return &b - }(), + webhooks, err := r.fetchWebhooks(ctx, query) + if err != nil { + return nil, fmt.Errorf("list enabled webhooks: %w", err) } - webhooks, _, err := r.List(ctx, filters) - - return webhooks, err + return webhooks, nil } // ListEnabledForEventType retrieves all enabled webhooks that should receive a specific event type. From 9dd22e992b2da6f93e7b389d330e1fb699cdd325 Mon Sep 17 00:00:00 2001 From: Tiago Farto Date: Thu, 5 Mar 2026 14:03:11 +0200 Subject: [PATCH 6/6] chore: sorting list --- internal/repository/webhooks_repository.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/repository/webhooks_repository.go b/internal/repository/webhooks_repository.go index 5a13ffe..8509153 100644 --- a/internal/repository/webhooks_repository.go +++ b/internal/repository/webhooks_repository.go @@ -396,6 +396,7 @@ func (r *WebhooksRepository) ListEnabledForEventType(ctx context.Context, eventT FROM webhooks WHERE enabled = true AND (event_types IS NULL OR event_types = '{}' OR event_types @> ARRAY[$1]::VARCHAR(64)[]) + ORDER BY id ` rows, err := r.db.Query(ctx, query, eventType)