feat(dashboard-api): supabase auth users sync background runner #2247
ben-fornefeld wants to merge 62 commits into main from feature/supabase-users-sync-worker
Conversation
PR Summary (Medium Risk) — Reviewed by Cursor Bugbot for commit fbb8835.
packages/db/pkg/auth/sql_queries/supabase_auth_user_sync/get_auth_user.sql
);

CREATE INDEX auth_user_sync_queue_pending_idx
ON auth.user_sync_queue (id)
The partial index only covers dead_lettered_at IS NULL AND locked_at IS NULL, so every unlocked non-dead-lettered row satisfies it — including rows with a future next_attempt_at. The claim query then discards those rows in a post-index filter. Under sustained retry load (many rows waiting with next_attempt_at > now()), the index returns a large candidate set that must be filtered in-memory before the LIMIT is applied. Adding next_attempt_at as the leading column — ON auth.user_sync_queue (next_attempt_at, id) WHERE dead_lettered_at IS NULL AND locked_at IS NULL — lets PostgreSQL range-scan only rows that are actually ready to be claimed.
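For concreteness, the suggested index shape would be (a sketch; table and column names taken from the diff above):

```sql
-- Lead with next_attempt_at so the claim query's range predicate
-- (next_attempt_at <= now()) is satisfied by the index scan itself,
-- leaving only rows that are actually ready for the LIMIT to consume.
CREATE INDEX auth_user_sync_queue_pending_idx
    ON auth.user_sync_queue (next_attempt_at, id)
    WHERE dead_lettered_at IS NULL AND locked_at IS NULL;
```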
r.l.Debug(ctx, "claimed queue batch", zap.Int("count", len(items)))

for _, item := range items {
Items in a batch are processed sequentially. Each item does 2–3 synchronous DB round-trips (GetAuthUser + UpsertPublicUser/DeletePublicUser + Ack/Retry), so batch throughput is proportional to batchSize × DB latency. With the default BatchSize=50 and typical Postgres latency, a full batch can take several seconds — longer than PollInterval=2s. If sustained write volume is expected, consider processing items in a small goroutine pool.
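A minimal sketch of that bounded-concurrency idea, using only the standard library; the `item` type and `process` callback are stand-ins for the real queue types in the sync package:

```go
package main

import (
	"fmt"
	"sync"
)

// item stands in for a claimed queue row (hypothetical simplification).
type item struct{ ID int }

// processBatch fans a claimed batch out to `workers` goroutines so the
// per-item DB round-trips overlap instead of running back to back.
func processBatch(items []item, workers int, process func(item) error) []error {
	var (
		mu   sync.Mutex
		errs []error
		wg   sync.WaitGroup
	)
	ch := make(chan item)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for it := range ch {
				if err := process(it); err != nil {
					mu.Lock()
					errs = append(errs, err)
					mu.Unlock()
				}
			}
		}()
	}
	for _, it := range items {
		ch <- it
	}
	close(ch)
	wg.Wait()
	return errs
}

func main() {
	items := make([]item, 50)
	for i := range items {
		items[i] = item{ID: i}
	}
	var processed sync.Map
	errs := processBatch(items, 8, func(it item) error {
		processed.Store(it.ID, true)
		return nil
	})
	count := 0
	processed.Range(func(_, _ any) bool { count++; return true })
	fmt.Println(count, len(errs)) // prints: 50 0
}
```

Per-item retry/ack bookkeeping stays correct because each item is still handled by exactly one goroutine; only the batch-level error collection needs the mutex.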
… while enqueuing for processing
…om/e2b-dev/infra into feature/supabase-users-sync-worker
…d recovery mechanisms
- Updated the sync runner to use `RunWithRestart` for improved error recovery.
- Introduced a new `UserSyncQueue` model to manage user synchronization tasks.
- Added SQL migration for creating the `user_sync_queue` table with necessary triggers.
- Implemented tests for the processor and supervisor to ensure robust handling of retries and panics.
- Refactored existing queries to target the new `public.user_sync_queue` table.
packages/dashboard-api/internal/supabaseauthusersync/runner_test.go
…agement
- Introduced `supabase_auth_user_sync_enabled` variable to control user synchronization.
- Updated Nomad job configuration to include the new sync setting.
- Added Google Secret Manager resources for managing the sync configuration securely.
- Enhanced the dashboard API to utilize the new sync configuration in processing logic.
- Refactored related components to improve error handling and logging for the sync process.
packages/dashboard-api/internal/supabaseauthusersync/runner_test.go
- Replaced the previous `Store` implementation with a new structure that integrates both authentication and main database queries.
- Updated the `Runner` and `NewRunner` functions to accommodate the new database client structure.
- Removed obsolete SQL queries and migration files related to the `user_sync_queue` table.
- Enhanced the test suite to reflect changes in the runner's initialization and database interactions.
- Updated `TestSupabaseAuthUserSyncRunner_EndToEnd` to apply necessary database migrations before running tests.
- Refactored the `SetupDatabase` function to include a new method `ApplyMigrations` for better migration management.
Made-with: Cursor
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3f56979748
-- +goose Up
-- +goose StatementBegin
CREATE TABLE public.user_sync_queue (
Ensure auth queue migration is executed by db-migrator
The queue/table trigger migration was added under packages/db/pkg/auth/migrations, but the deployed migrator workflow only runs ./migrations from packages/db/scripts/migrator.go (and the db Dockerfile only copies db/migrations). That means the standard prestart migration job will never create public.user_sync_queue or the new triggers, so enabling the sync worker will fail at runtime when it tries to claim from a non-existent table.
we want to execute this manually + it should not be tied to main db migrations since authDB will be separate going forward
- Introduced a new process outcome `ready_to_ack` to streamline acknowledgment handling.
- Refactored the `process` method to prepare for batch acknowledgment of processed items.
- Added a new `AckBatch` method in the store to handle multiple acknowledgments efficiently.
- Updated the `Runner` to process items in batches and finalize acknowledgments accordingly.
- Removed obsolete SQL query for single item acknowledgment as part of the refactor.
- Enhanced tests to cover new deletion logic and acknowledgment scenarios.
-- name: GetAuthUserByID :one
SELECT id, email
FROM auth.users
WHERE id = sqlc.arg(user_id)::uuid;
🔴 The SQL query selects email from auth.users without null handling, and the generated AuthUser.Email field is a non-nullable string; Supabase supports phone-only and anonymous auth where email is NULL, so pgx returns a scan error (not pgx.ErrNoRows) for those users. In reconcile(), that error triggers the generic retry path, causing indefinite retries until the item is dead-lettered and the user is never synced to public.users. Fix by changing the query to COALESCE(email, '') or by mapping Email to *string and skipping the upsert when absent.
Extended reasoning...
What the bug is and how it manifests
The file packages/db/pkg/auth/sql_queries/supabase_auth_user_sync/get_auth_user.sql (lines 1-4) contains:
-- name: GetAuthUserByID :one
SELECT id, email
FROM auth.users
WHERE id = sqlc.arg(user_id)::uuid;

The generated code in get_auth_user.sql.go scans the result into AuthUser, whose Email field is declared as a non-nullable string in models.go. In pgx v5, scanning a SQL NULL into a plain Go string returns a scan error; it does not return pgx.ErrNoRows.
The specific code path that triggers it
In processor.go, reconcile() calls p.store.GetAuthUser(ctx, item.UserID). It checks errors.Is(err, pgx.ErrNoRows) to decide whether to delete the public user. A NULL-email scan error is not pgx.ErrNoRows, so it falls into the generic if err != nil { return "", fmt.Errorf(...) } branch, which propagates back to process(). Since the item is not yet at maxAttempts, process() calls store.Retry(...), scheduling the item for another attempt with exponential backoff.
Why existing code does not prevent it
The reconcile() function only has two exit paths for errors from GetAuthUser: the pgx.ErrNoRows special case (leading to a delete) and a generic retry path. There is no handling for scan errors caused by NULL columns, and no defensive COALESCE or nullable type in the generated SQL/struct.
Impact
Any Supabase deployment using phone-only or anonymous authentication will have users with auth.users.email = NULL. Every such user that enters the sync queue will be retried up to MaxAttempts (default 20) times with growing backoffs and then dead-lettered. Those users are permanently excluded from public.users, causing any feature that relies on public.users to silently omit them.
How to fix it
The simplest fix is to add COALESCE in the SQL query:
SELECT id, COALESCE(email, '') AS email
FROM auth.users
WHERE id = sqlc.arg(user_id)::uuid;

Alternatively, change the sqlc config so email maps to *string, update AuthUser.Email to *string, and skip the upsert (or pass an empty string) when Email is nil.
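The nullable-mapping alternative could be sketched like this (hypothetical names; the real struct is sqlc-generated):

```go
package main

import "fmt"

// AuthUser as sqlc would generate it if the email column were mapped to a
// nullable type (*string) instead of string (hypothetical sketch).
type AuthUser struct {
	ID    string
	Email *string
}

// emailForUpsert returns the email to sync and whether an upsert should
// happen at all; phone-only/anonymous users (NULL email) are skipped
// instead of erroring into the retry path.
func emailForUpsert(u AuthUser) (string, bool) {
	if u.Email == nil {
		return "", false
	}
	return *u.Email, true
}

func main() {
	addr := "user@example.com"
	_, ok1 := emailForUpsert(AuthUser{ID: "a", Email: &addr})
	_, ok2 := emailForUpsert(AuthUser{ID: "b", Email: nil})
	fmt.Println(ok1, ok2) // prints: true false
}
```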
Step-by-step proof
1. A Supabase phone-only user signs up. The auth.users row has id = <uuid>, email = NULL.
2. The sync_insert_auth_users_to_public_users_trigger fires, inserting (user_id, 'upsert') into public.user_sync_queue.
3. The worker calls ClaimBatch, picks up the item, and increments attempt_count to 1.
4. processor.process() -> processOnce() -> reconcile() -> store.GetAuthUser(ctx, userID).
5. GetAuthUserByID executes SELECT id, email FROM auth.users WHERE id = $1. The row exists, so pgx does not return pgx.ErrNoRows. Instead row.Scan(&i.ID, &i.Email) attempts to scan NULL into string and returns a pgx scan error.
6. Back in reconcile(), errors.Is(err, pgx.ErrNoRows) is false. The error is returned as fmt.Errorf("get auth.users %s: %w", ...).
7. process() sees item.AttemptCount (1) < maxAttempts (20), so it calls store.Retry(...) with a 5s backoff.
8. Steps 3-7 repeat until attempt_count >= 20, at which point DeadLetter is called.
9. The user is now dead-lettered and will never be synced to public.users.
fine, we require emails for auth
%{ for key, val in env }
${ key } = "${ val }"
%{ endfor }
🟡 The %{ for key, val in env } loop in dashboard-api.hcl renders env var values directly inside double-quoted HCL strings without escaping. A value containing a double-quote produces syntactically invalid HCL (Nomad job submission fails), and a value containing ${ causes Terraform templatefile() to attempt expression evaluation at render time (plan-time error). DASHBOARD_API_ENV_VARS is documented as accepting arbitrary operator-supplied values, making these characters plausible; fix by escaping double-quotes in values or passing extra env vars via a Nomad template stanza instead.
Extended reasoning...
What the bug is and how it manifests
In iac/modules/job-dashboard-api/jobs/dashboard-api.hcl lines 86-88, a Terraform templatefile() directive renders arbitrary env var values directly inside double-quoted HCL strings:
%{ for key, val in env }
${ key } = "${ val }"
%{ endfor }
Two distinct injection vectors exist. First, if val contains a double-quote character, the rendered output becomes syntactically invalid HCL (e.g. MY_VAR = "conn"string" breaks the HCL parser), causing Nomad to reject the job spec at submit time. Second, if val contains ${, Terraform's templatefile() function interprets it as an expression interpolation during template rendering and produces a plan-time error before the job is ever submitted.
The specific code path that triggers it
The value flows through: DASHBOARD_API_ENV_VARS env var -> parsed as map(string) in iac/provider-gcp/variables.tf -> passed to module.nomad as dashboard_api_env_vars -> passed to module.dashboard_api as env -> filtered in local.env in main.tf (removes null/empty only) -> passed to templatefile() -> rendered at line 87 with no escaping.
Why existing code does not prevent it
The locals block in main.tf only filters out null/empty values (if value != null && value != ""); it does not escape or sanitize values. Terraform's templatefile() does not automatically escape template variable contents when they are interpolated. The HCL string delimiters are hard-coded double quotes with no escaping mechanism.
What the impact would be
For a double-quote injection (e.g. a PostgreSQL DSN like postgresql://user:p"ass@host/db), the Nomad job submission fails at terraform apply time with an HCL parse error -- the deployment is blocked. For a ${ injection (e.g. a JWT secret or template string containing ${), Terraform fails during plan or apply with an evaluation error. Neither failure is silent, but both are surprising and difficult to debug without knowing the root cause.
How to fix it
The safest fix is to use replace(val, "\"", "\\\"") to escape double-quotes within the current templatefile() approach, or pass extra env vars through a Nomad template stanza which has proper escaping semantics. The ${ vector can be addressed by replacing ${ with $${ (Terraform's escape sequence for a literal ${).
Step-by-step proof
1. Operator sets DASHBOARD_API_ENV_VARS='{"DB_URL":"postgresql://user:p\"ass@host/db"}' in their env file.
2. Terraform receives var.dashboard_api_env_vars = {DB_URL = "postgresql://user:p\"ass@host/db"}.
3. local.env passes it through unchanged (non-null, non-empty).
4. templatefile() renders line 87 as DB_URL = "postgresql://user:p"ass@host/db" -- the embedded " terminates the HCL string prematurely.
5. Nomad's job spec parser receives invalid HCL and rejects the job with a parse error.
6. Alternatively, for a value containing ${SOME_VAR}: templatefile() evaluates ${SOME_VAR} as a Terraform expression, returning an undefined variable error during terraform apply.
Note: the identical ${ key } = "${ value }" pattern exists in job-api and job-orchestrator for their job_env_vars, but those maps contain hardcoded infrastructure constants (VOLUME_TOKEN_ISSUER, etc.) unlikely to contain special characters. The new DASHBOARD_API_ENV_VARS is explicitly documented as a freeform map for arbitrary operator-supplied values, removing that safety assumption.
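One way to sketch the quote-escaping side of the fix in Terraform (variable and local names assumed; a literal ${ in a value would additionally need to be rewritten with Nomad's $${ escape):

```hcl
locals {
  # Escape embedded double quotes before the values reach templatefile(),
  # so the rendered Nomad HCL strings stay well-formed.
  env_escaped = {
    for k, v in var.env : k => replace(v, "\"", "\\\"")
  }
}
```

Passing extra env vars through a Nomad template stanza sidesteps the problem entirely, since Nomad handles its own escaping there.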
packages/dashboard-api/internal/backgroundworker/auth_user_sync.go
-- +goose StatementBegin

DROP ROLE IF EXISTS trigger_user;
DROP SCHEMA IF EXISTS auth_custom;
Down migration drops schema without CASCADE, will fail
Medium Severity
The down migration uses DROP SCHEMA IF EXISTS auth_custom without CASCADE, but the schema will contain River library tables (created by the programmatic RunRiverMigrations call between goose migration steps 1 and 3). Since goose has no knowledge of the interleaved River migrations, running the down migration will fail with a "schema is not empty" error because auth_custom.river_job and other River tables still exist.
Reviewed by Cursor Bugbot for commit 4862139.
desired, we have other services potentially depend on the schema and don't want to cascade
})
}

return g.Wait()
Stopping already-stopped River client causes spurious shutdown error
Low Severity
When the River client stops unexpectedly, waitForServiceStop returns via riverStoppedCh. Then shutdownService calls riverClient.Stop(ctx) on the already-stopped client, which returns ErrClientAlreadyStopped. This causes shutdownService to return an error and the process exits with code 1 with a misleading "shutdown River client" error, even though the HTTP server shutdown succeeded. The shutdownService function lacks a guard for the already-stopped case.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 29bbf59.
invalid, the channel returns nil either way
Replace the broad backlog case with smaller tests that cover stale upsert cleanup and same-email trigger behavior. This keeps the worker coverage focused on the branch's critical correctness signals without extra soak-style runtime.
Allow dashboard-api to use a dedicated SUPABASE_DB_CONNECTION_STRING while keeping the existing fallback to POSTGRES_CONNECTION_STRING. Thread the new setting through Terraform so deployments can configure the worker without reusing the auth DB connection string.
NODE_ID = "$${node.unique.id}"
PORT = "$${NOMAD_PORT_api}"
this should stay in the dashboard-api.hcl config
.env.gcp.template
# Additional dashboard-api env vars passed directly to the Nomad job (default: {})
# Values here are merged into the job env and can override module defaults.
# Example: '{"ENABLE_AUTH_USER_SYNC_BACKGROUND_WORKER":"true"}'
DASHBOARD_API_ENV_VARS=
can we remove this altogether and just use the newly added TF_vars env variables?
iac/provider-gcp/Makefile
$(call tfvar, LOKI_BOOT_DISK_TYPE) \
$(call tfvar, LOKI_USE_V13_SCHEMA_FROM) \
$(call tfvar, DASHBOARD_API_COUNT) \
$(call tfvar, DASHBOARD_API_ENV_VARS) \
we should be able to not add this anymore now
Keep dashboard-api specific env wiring explicit, restore NODE_ID and PORT in the Nomad job spec, and remove generic dashboard-api env passthrough. Also trim unrelated lint churn from the branch and scope the remaining test/migration helpers to the auth user sync worker flow.
packages/db/pkg/supabase/migrations/20260401000003_river_auth_user_sync_triggers.sql
packages/db/pkg/supabase/migrations/20260401000001_river_auth_custom_schema.sql
Remove the copied trigger_user scaffolding from the Supabase River migrations because these SECURITY DEFINER functions never use that role. Keep the test bootstrap aligned with the simplified migration flow.
Replace direct pgx no-rows checks in dashboard-api handlers with dberrors.IsNotFoundError so the package uses the shared database error handling consistently.
packages/dashboard-api/Makefile
define export_extra_env
export $$(printf '%s' "$$DASHBOARD_API_ENV_VARS" | jq -r '(if .=="" then empty elif type=="string" then (fromjson? // empty) else . end) | to_entries? // [] | map("\(.key)=\(.value|tostring)") | .[]' 2>/dev/null) 2>/dev/null;
endef
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 3 total unresolved issues (including 2 from previous reviews).
Reviewed by Cursor Bugbot for commit fbb8835.
riverClient, err = backgroundworker.StartAuthUserSyncWorker(
	ctx,
	ctx,
River client started with wrong context for lifecycle
Medium Severity
StartAuthUserSyncWorker accepts separate setupCtx and runCtx parameters, but main.go passes the same base ctx for both. The runCtx controls the River client's lifecycle via riverClient.Start(runCtx). Since this is the base context (only cancelled by defer cancel() when run() returns), the riverClient.Stopped() channel used in waitForServiceStop will effectively never fire on its own — River won't self-stop because its context is never cancelled independently. If signalCtx were passed as runCtx, River would begin graceful shutdown as soon as a signal arrives, in parallel with the explicit shutdownService call, improving shutdown responsiveness.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit fbb8835.


Motivation
We're migrating our database and need to decouple the current database triggers between the auth and core schemas. This PR adds a sync between the two schemas that supports a separate DB configuration for each.