Skip to content

Commit 0f4e36f

Browse files
authored
feat: add tenant settings for language enrichment (#89)
* feat: add tenant settings for language enrichment Add a tenant-scoped settings store so language enrichment can be configured per tenant, starting with target_language (a normalized BCP-47 locale, e.g. en-US). First task in the language consolidation chain; unblocks per-tenant translated enrichment (ENG-1255). - migration 012: tenant_settings keyed by tenant_id (natural PK) with an open-ended settings JSONB, so new settings are added as struct fields without a schema migration - repository: tenant-scoped Get and a lock-serialized upsert (INSERT ... ON CONFLICT (tenant_id)) reusing the tenant write lock - service: GetSettings returns documented defaults when unset (never 404); target_language is validated/normalized via golang.org/x/text - HTTP: GET/PUT /v1/tenants/{tenant_id}/settings; tenant_id is path-only so a request can only ever address its own tenant - tenant data purge now also removes tenant_settings - OpenAPI spec, unit tests, and integration tests (round-trip, defaults, tenant isolation, purge cleanup) ENG-1254 * feat: cap tenant settings request body at 8KB (413) Cap PUT /v1/tenants/{tenant_id}/settings request bodies at 8 KiB via http.MaxBytesReader; an oversized body is rejected with 413 before it is read into memory. Add a dedicated content_too_large problem code and type to the shared response layer so a 413 is distinguishable from a malformed-body 400 via the machine-readable `code` clients branch on, and document it in the OpenAPI ErrorModel enum and the PUT responses. - handler: 8 KiB MaxBytesReader -> 413; malformed JSON still -> 400 - response: CodeContentTooLarge / ProblemTypeContentTooLarge + 413 arms in codeForStatus/problemTypeForStatus (+ unit test) - openapi: 413 response on PUT settings; content_too_large in the ErrorModel code enum - tests: integration test asserts the 413 status and content_too_large code ENG-1254 * feat: add PATCH endpoint for partial tenant settings updates Add PATCH /v1/tenants/{tenant_id}/settings for partial updates: only the fields present in the body change, omitted fields are left untouched, and an empty string clears a field. PUT remains a full replace. Implemented as an atomic top-level JSONB merge (settings = settings || patch) inside the existing shared tenant write lock, so concurrent writes don't race and settings the patch doesn't mention survive. Pointer fields in the request distinguish "absent" (leave unchanged) from "set" (including empty = clear). - repository: Patch (|| merge) + a shared writeSettings helper used by both Upsert and Patch - handler: Patch + a shared decodeSettingsBody helper (8KB cap, 413, unknown-field rejection) used by both Update and Patch - openapi: PATCH operation + PatchTenantSettingsInputBody schema - tests: service unit tests + integration tests, including a merge test proving a key the patch omits is preserved ENG-1254 * test: cover PATCH creating settings for a new tenant Add an integration test for the PATCH insert branch (a tenant with no settings row yet): PATCH creates the row with the patched field and a subsequent GET returns it. The other PATCH tests all seed a row first, so this is the only coverage of the advertised create-on-new-tenant path. ENG-1254 * test: cover PATCH invalid-locale rejection Extend TestTenantSettings_InvalidLocaleRejected to drive both PUT and PATCH. Both verbs normalize target_language through the same path, so both must reject an invalid locale with 400; previously only PUT was exercised, leaving the PATCH handler's error-response branch (77.8%, below the 80% bar) untested. The PATCH handler is now fully covered. ENG-1254 * style(migrations): use lowercase goose annotations AGENTS.md documents `-- +goose up` / `-- +goose down` (lowercase) as the migration annotation style; 012 used the capitalized form. goose treats the keywords case-insensitively, so this is style-only — no re-run, the applied schema is unchanged. ENG-1254 * test: assert content_too_large code on oversized PATCH body The PATCH oversized-body test only checked the 413 status, while the PUT test also asserts the RFC 9457 `content_too_large` problem code. Mirror it so a PATCH returning the wrong code under a correct 413 is caught. ENG-1254 * test: drop dead tenant_settings cleanup line CleanupTestData has no callers anywhere in the repo (there is no TestMain, and setupTestServer's cleanup never invokes it), so the DELETE FROM tenant_settings line it was extended with never ran — dead code giving false confidence of cleanup. The settings tests isolate via UUID-unique tenant ids (testTenantID), like the rest of the suite, so no cleanup is needed. Reverts tests/helpers.go to its prior state. ENG-1254 * refactor: make tenant settings PATCH an RFC 7396 merge patch PATCH now follows JSON Merge Patch (RFC 7396): a member with a value sets that setting, an explicit JSON null removes the key, and an omitted member is left unchanged. Previously an empty string cleared a field and there was no way to remove a key (the JSONB `||` merge could only add or overwrite). - models: add a small generic Optional[T] that distinguishes absent vs null vs value when decoding a merge-patch body; PatchTenantSettingsRequest uses it instead of *string. - service: translate the typed request into keys-to-set and keys-to-remove; reject an explicit empty string (clear via null), keeping the value contract "a valid BCP-47 locale, or null to remove". - repository: Patch applies (settings || set) - removeKeys::text[], so null members delete keys while unmentioned keys survive; writeSettings takes the extra bind. Upsert (full replace) is unchanged. - openapi: declare application/merge-patch+json (application/json still accepted), make target_language nullable, document the null-removes rule. - tests: Optional tri-state unit test; service set/remove/empty-rejected cases; integration PatchNullRemovesField (key deleted, siblings kept) and PatchEmptyStringRejected. ENG-1254 * fix: avoid spectral crash on null example in PATCH settings spec Spectral 6.16.0 (nimma) crashes with "Cannot read properties of null (reading 'enum')" when a lint rule traverses a literal null leaf in an example value — which the merge-patch `remove` example introduced (target_language: null). Drop the structured null example and show the null-removes form in the operation description instead. SDK codegen still learns nullability from the schema (target_language type: [string, "null"]), so the contract is unchanged. ENG-1254 * fix: restore target_language validation on RFC 7396 PATCH Switching the PATCH field to Optional[string] dropped the struct-tag validation (no_null_bytes, max=35) — go-playground's validator can't reach through the custom type — so PATCH accepted null bytes and over-long values that PUT and the OpenAPI maxLength:35 still reject. Enforce the same bounds in the PATCH value path (normalizeProvidedTargetLanguage), restoring PUT/PATCH parity and honoring the contract. Also pin settingKeyTargetLanguage to the EnrichmentSettings json tag with a unit test, so a tag rename can't silently break PATCH null-removal. ENG-1254 * feat: add translated-text columns to feedback records ENG-1255 (persistence): scaffold language-enrichment storage. Adds the value_text_translated + translation_lang_key columns (migration 013), surfaces them as read-only fields on FeedbackRecord and the OpenAPI output schema, includes them in every feedback-record read path, and adds a tenant-write-locked SetTranslation repo method that does not publish a domain event (so writing a translation can't loop the enrichment pipeline). The async translation worker that populates them follows. ENG-1255 * feat: add translation config + tenant-settings cache ENG-1255: add TranslationConfig (TRANSLATION_* env; translation stays disabled unless provider and model are set) and a per-process TTL+LRU tenant-settings cache — the cache deferred from ENG-1254. The cache wraps the settings accessor so the translation enqueue gate and worker can resolve a tenant's target language without a DB read per feedback event; staleness is TTL-bounded and self-corrects because the worker records the target it actually used. Registers a bounded "tenant_settings" cache metric label. TRANSLATION_BASE_URL is normalized/validated like EMBEDDING_BASE_URL. ENG-1255 * refactor: dedupe feedback-record scan + harden settings cache Re-review follow-ups on the ENG-1255 foundation: - Unify the feedback-record column/scan duplication onto one feedbackRecordColumns const + the existing scanFeedbackRecord helper. The repo previously inlined 4 column lists + 4 scans while taxonomy_repository.go had its own scanner; that scanner is now the single one (extended to the two translation columns) and the taxonomy node-records query selects them too. Collapses 8 sites to 2 so column and scan order cannot drift (a silent runtime scan error otherwise). - config: default TENANT_SETTINGS_CACHE_TTL in applyDefaults too, mirroring the other nested DurationSec defaults, so a dropped env-default tag can't silently disable the cache; disable explicitly via TENANT_SETTINGS_CACHE_SIZE=0. - observability: cache metric descriptions now list tenant_settings as a label. - tests: add a SetTranslation integration test (persist, clear, NotFound) — the highest-risk new write path, covering the value/NULL round-trip. ENG-1255 * feat: add TranslationClient + OpenAI/Google factory ENG-1255: the LLM seam for translation enrichment. Adds a TranslationClient interface (Translate(TranslateRequest) -> text) and a provider-agnostic factory mirroring the embedding factory — openai / google / google-gemini, explicit TRANSLATION_PROVIDER (no default), validating API key, base URL, and Gemini project+location. The OpenAI and Google SDK wrappers gain a low-level Translate (chat completions / generate-content at temperature 0); a prompt adapter builds a Formbricks-style "professional translator" prompt using human-readable language names (x/text/language/display), falling back to "original language" when the source is unknown. Unit tests cover config validation, the language-name helper, and prompt rendering. ENG-1255 * feat: add translation job args + enqueue provider ENG-1255: the enqueue side of translation enrichment, mirroring the embedding provider. Adds FeedbackTranslationArgs (River job, unique by record + target + value_text hash) and TranslationProvider — on a feedback-record create (non-empty open text) or an update that changed value_text, it enqueues a job, but only for text fields with non-empty value_text whose tenant has a target language configured (read via the settings cache). Failures are logged and swallowed so record ingestion is never blocked. Consolidates the two identical per-feature River inserter interfaces into one shared RiverJobInserter, and reuses TenantSettingsReader for the target lookup (the iface linter flagged the duplicates). Unit tests cover enqueue eligibility, target gating, update-changed-fields, and the error/skip paths. ENG-1255 * feat: add feedback translation worker ENG-1255: the worker side. FeedbackTranslationWorker loads the record, translates value_text into the job's target language via the TranslationClient — or copies value_text verbatim when the source language already shares the target's base language (no LLM call) — and persists it through FeedbackRecordsService.SetTranslation. Mirrors the embedding worker's error handling: a missing record completes the job, a tenant write conflict retries, a provider error retries then fails on the final attempt; value_text that became empty since enqueue is skipped. Registered in the River wiring, gated on a configured TranslationClient. Adds FeedbackRecordsService.SetTranslation (+ the repository interface method) and 8 worker unit tests (translate, source==target copy, skip-empty, not-found, provider retry/fail, tenant-write-conflict, record-gone-on-write). ENG-1255 * fix: clear stale translations + compare script in source==target Address review findings on the translation pipeline: - Clear-on-empty was unwired: editing value_text to empty left a stale value_text_translated forever. The provider now enqueues on an update that empties value_text (mirroring the embedding provider) and the worker clears the translation (SetTranslation nil) instead of skipping; the repository nulls both translation columns when the translation is nil. - The source==target short-circuit compared only the base language, so it would copy zh-Hans text as a zh-Hant "translation". It now compares base AND script (sameLanguageAndScript): different scripts translate, regional variants (en-US/en-GB) still copy. - Update the openai/googleai package docs to mention chat/generate-content. Adds provider (update-to-empty enqueues a clear) and worker (clears on empty, translates across scripts) tests. ENG-1255 * fix: translation source-language handling + env docs Address the full-feature review: - Undetermined source ("und") was coerced to a guessed base by likely-subtags, so the source==target short-circuit copied text untranslated. Guard against language.Und in sameLanguageAndScript so an undetermined tag always translates. - A source-language correction never re-translated: the provider only re-enqueued on value_text changes and the dedup hash ignored the source language. The provider now also enqueues on a language change, and the dedup hash folds in the source language (translationContentHash), so a correction produces a fresh job. - Drop the stray "backfill" mention from the job-args doc (no translation backfill exists) and document the TRANSLATION_* and TENANT_SETTINGS_CACHE_* env vars in .env.example, mirroring EMBEDDING_*. Tests: provider re-enqueues on a language change + the content hash varies by source language; worker translates (not copies) an "und" source. ENG-1255 * feat: wire translation enrichment into the api and worker ENG-1255: activate the translation pipeline, gated on TRANSLATION_PROVIDER + TRANSLATION_MODEL (disabled otherwise), mirroring the embedding wiring. - cmd/api: register the FeedbackTranslationWorker + translations queue (so the combined api process also translates, like embeddings) and register the TranslationProvider with the message manager, resolving the tenant target language through a short-TTL CachedTenantSettings over tenant settings. - cmd/worker: build the TranslationClient and populate RiverDeps so the dedicated worker process translates jobs. End to end: the provider enqueues on a feedback-record create/update for a text field whose tenant has a target language; the worker translates value_text (or copies when source==target) and persists it. ENG-1255 * feat: add translation backfill (command + service) ENG-1255: re-translate existing records when translation is first enabled or a tenant changes its target language. - repository.ListTranslationBackfillTargets joins tenant_settings to find text records with non-empty value_text whose tenant has a target language and whose stored translation_lang_key differs from it (never translated, or stale). - FeedbackRecordsService.BackfillTranslations enqueues a "backfill" job per target; the inserter/queue/attempts are caller-provided (a one-off command), so the shared service keeps no backfill-only dependency. - cmd/backfill-translations wires the client + a River producer and runs it, mirroring cmd/backfill-embeddings. Gated on TRANSLATION_PROVIDER+MODEL. Tests: service unit (enqueues one job per target; repo error propagates) and a Postgres integration test for the backfill query (untranslated/stale included, already-current and no-target excluded). ENG-1255 * test: add translation worker pipeline integration tests Drive FeedbackTranslationWorker end to end against Postgres with a fake TranslationClient: translate + persist (source value_text preserved), copy verbatim when source base+script matches the target (no provider call), and clear a stale translation when value_text is empty. Complements the repo-level SetTranslation and backfill-query integration tests. Also clarify the cmd/api comment: the API registers the translation worker only to satisfy River's insert-time validation; jobs are processed by hub-worker, not in the API process (mirrors the embedding wiring). ENG-1255 * feat: add OpenTelemetry metrics to the translation pipeline Mirror the embedding pipeline's metrics on the translation provider and worker so the new enrichment path is observable in production: hub_translation_jobs_enqueued_total hub_translation_provider_errors_total{reason} hub_translation_outcomes_total{status} hub_translation_worker_errors_total{reason} hub_translation_duration_seconds{status} TranslationMetrics mirrors EmbeddingMetrics (nil when metrics are disabled) and is wired through NewMetrics, RiverDeps, cmd/api, and cmd/worker. The provider records enqueue counts plus settings-read/enqueue errors; the worker records outcome + duration + worker errors at every branch (success, skip, clear, retry, failed_final). Backfill stays producer-only (nil metrics). Bounded label sets gate every reason/status to "other". Unlike the embedding worker — which emits "tenant_write_conflict" without listing it, so it buckets to "other" — the translation worker's reason set includes it, so the conflict metric is labeled accurately. Covered by observability metric tests (real SDK manual reader), provider/worker recording tests, and the existing pipeline tests. ENG-1255 * fix: address review — not-found classification, cache disable, langKey guard Worker: a not-found GetFeedbackRecord is a benign delete/purge race, not a terminal failure. Record it as "skipped" (consistent with the not-found-on-write path) instead of "failed_final" plus a worker error, so it no longer trips failure alerts. Config: honor an explicit TENANT_SETTINGS_CACHE_SIZE=0 to disable the cache (the documented behavior; NewCachedTenantSettings already treats size <= 0 as "no caching"). The previous `<= 0` default reset 0 to 2048, so disable could never take effect — now the size is defaulted only when the env var is unset. Service: SetTranslation rejects a (translated, "") pair via ErrTranslationLangKeyRequired so a translation can never persist without the locale it was produced in; clearing (nil translation) still passes through. Also: clear-path integration tests now assert translation_lang_key is nulled too, and the NewMetrics doc comment lists CacheMetrics. ENG-1255 * feat: re-translate a tenant's records when its target_language changes Changing a tenant's target_language previously had no effect on existing feedback records — only newly created/edited records were translated, and refreshing the backlog required the global backfill CLI. A settings change now automatically triggers a per-tenant re-translation backfill. Design (clean separation; generalizes to future enrichment settings): - TenantSettingsService gains a translation-free SettingsChangeListener port and fires it after a successful write with the keys that changed (PUT: all settable keys; PATCH: keys present, incl. a null removal; skipped on a no-op PATCH). A listener issue never fails the settings write. - EnrichmentSettingsListener (adapter) maps a changed key to the enrichment backfill it triggers via a config-built handler map: target_language enqueues a durable TenantTranslationBackfillArgs job. Not routed through the webhook-coupled, lossy event bus. Adding a future setting (e.g. sentiment_enabled) is one map entry. - TenantTranslationBackfillWorker keyset-paginates the tenant's stale records and enqueues the existing per-record FeedbackTranslationArgs jobs, off the request path. Unique by TenantID (one in-flight backfill per tenant); crash-safe via the idempotent "translation_lang_key IS DISTINCT FROM target_language" query. - Repo ListTranslationBackfillTargetsForTenant (keyset, shares SQL with the global query) + service BackfillTranslationsForTenant. Clearing target_language leaves existing translations in place (the query's non-empty guard makes the triggered backfill a no-op). The global backfill CLI remains the guaranteed recovery path. ENG-1255 * perf: stream the global translation backfill in keyset pages BackfillTranslations materialized every eligible target across all tenants in one slice; on a large deployment that is a memory spike when the CLI runs. It now keyset- paginates (id > cursor LIMIT n) like the per-tenant backfill, with the shared loop factored into backfillTranslationsPaged so both paths stream identically. ENG-1255 * feat: snooze translation jobs on provider rate limits (429) A provider 429 was treated as a generic error, so the translation worker burned through River's retry attempts on the fast backoff and dropped the work as failed_final, ignoring the provider's Retry-After / RetryInfo hint. The openai and google clients now classify a 429 (RESOURCE_EXHAUSTED) as a shared huberrors.RateLimitError carrying the provider's retry-after hint, and the worker snoozes for that delay (river.JobSnooze) instead of failing. Snoozing re-queues without consuming an attempt, so a burst against a rate-limited model defers rather than drops work. The delay is clamped (5s-5min, default 30s) and a per-job window (1h) bounds indefinite snoozing against a standing quota; past it the job fails normally and a backfill recovers it. A rate_limited worker-error metric records the deferral. * fix(translation): guard against stale-target overwrites in SetTranslation A feedback translation job carries the tenant's target language captured at enqueue time. If the target changed (or was read from a stale settings cache), an out-of-order older job could finish after a newer-target job and overwrite value_text_translated / translation_lang_key with the stale target. SetTranslation now persists only while the tenant's current target_language still matches the job's target (atomic UPDATE ... FROM tenant_settings); a stale write matches no row and returns huberrors.ErrTranslationSuperseded, which the worker records as a benign skip. The clear path stays unconditional. Adds a worker unit test, an end-to-end worker/repo regression test, and a distinct "superseded" worker-error metric label. * fix(translation): refresh settings cache on change and clear stale translation on edit Addresses two review findings on the tenant-settings / translation enqueue path: - Settings cache staleness: CachedTenantSettings now implements SettingsChangeListener and is composed with the backfill listener, so a settings write evicts the tenant's cached entry. A freshly enabled/changed target is visible to the enqueue gate immediately instead of after TTL, so records created in the former staleness window are no longer skipped. (Eviction is per-process; cross-replica stays TTL-bounded.) - Stale translation after a content edit: an Update that changes value_text or language now clears value_text_translated / translation_lang_key, but only when the value actually changes (CASE ... IS DISTINCT FROM the pre-update column). The row falls back to the original and becomes a backfill target, while an unchanged re-send keeps the valid translation (avoiding a deduped re-translation stranding it). Adds buildUpdateQuery + cache-eviction unit tests and an Update integration test. * feat: add TRANSLATION_DEFAULT_LANGUAGE fallback target Tenants with no target_language of their own fall back to a deployment-wide default (TRANSLATION_DEFAULT_LANGUAGE); an empty default keeps translation per-tenant opt-in. The fallback is applied at the enqueue gate and the global backfill. The SetTranslation write guard now also accepts a default-resolved write for a tenant with no stored target, while still rejecting stale writes for tenants that set an explicit target. * fix: make translation write-guard and per-tenant backfill default-aware The SetTranslation write-guard and the settings-change (per-tenant) backfill now resolve the tenant's effective target as COALESCE(its own target_language, TRANSLATION_DEFAULT_LANGUAGE), threaded through the service. A stale job carrying a tenant's former explicit target no longer writes once the tenant falls back to the default, and clearing a target re-translates existing records to the default instead of no-opping.
1 parent 0b66636 commit 0f4e36f

65 files changed

Lines changed: 6893 additions & 152 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.env.example

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,25 @@ WEBHOOK_MAX_COUNT=500
108108
# GOOGLE_APPLICATION_CREDENTIALS= (optional; for google-gemini when outside Google Cloud: path to service account key JSON)
109109
# EMBEDDING_MODEL=<model name> (required to enable embeddings; no default)
110110

111+
# Translation (language enrichment) is optional. To enable, set both TRANSLATION_PROVIDER and TRANSLATION_MODEL; if either is unset, translation is disabled and no translation jobs run.
112+
# Open-text feedback (value_text) is translated into each tenant's configured target_language (Hub tenant settings), falling back to TRANSLATION_DEFAULT_LANGUAGE when a tenant has none. Same providers/auth model as embeddings.
113+
# Configure identically in the API and worker processes (the API enqueues, the worker translates); otherwise jobs pile up unprocessed.
114+
# TRANSLATION_PROVIDER=openai
115+
# TRANSLATION_PROVIDER=google-gemini
116+
# TRANSLATION_PROVIDER_API_KEY=sk-... (required for openai and google; not used for google-gemini)
117+
# TRANSLATION_BASE_URL=https://llm.example.com/v1 (optional; only supported with TRANSLATION_PROVIDER=openai)
118+
# TRANSLATION_GOOGLE_CLOUD_PROJECT= (required for google-gemini; or use GOOGLE_CLOUD_PROJECT)
119+
# TRANSLATION_GOOGLE_CLOUD_LOCATION= (required for google-gemini, e.g. europe-west3; or use GOOGLE_CLOUD_LOCATION)
120+
# TRANSLATION_MODEL=<model name> (required to enable translation; no default; e.g. gemini-2.5-flash)
121+
# TRANSLATION_DEFAULT_LANGUAGE= (optional BCP-47 fallback target, e.g. en-US; used when a tenant has no target_language of its own. Empty = per-tenant opt-in only)
122+
# TRANSLATION_MAX_CONCURRENT=5 (worker concurrency; default 5)
123+
# TRANSLATION_MAX_ATTEMPTS=3 (River job retries before failing; default 3)
124+
125+
# Tenant settings cache (optional): an in-process LRU+TTL over tenant target_language reads on the translation enqueue path.
126+
# Set TENANT_SETTINGS_CACHE_SIZE=0 to disable the cache. Defaults: size 2048, TTL 60s.
127+
# TENANT_SETTINGS_CACHE_SIZE=2048
128+
# TENANT_SETTINGS_CACHE_TTL_SECONDS=60
129+
111130
# Tenant data purge (optional). How long DELETE /v1/tenants/{tenant_id}/data waits for in-flight
112131
# tenant-owned writes to drain before returning a retryable 409 (code tenant_write_conflict).
113132
# Must be a positive integer (seconds); non-positive values fall back to the default. Default: 5

cmd/api/app.go

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,14 +174,16 @@ func NewApp(cfg *config.Config, db *pgxpool.Pool) (*App, error) {
174174
}
175175

176176
var (
177-
eventMetrics observability.EventMetrics
178-
webhookMetrics observability.WebhookMetrics
179-
embeddingMetrics observability.EmbeddingMetrics
177+
eventMetrics observability.EventMetrics
178+
webhookMetrics observability.WebhookMetrics
179+
embeddingMetrics observability.EmbeddingMetrics
180+
translationMetrics observability.TranslationMetrics
180181
)
181182
if metrics != nil {
182183
eventMetrics = metrics.Events
183184
webhookMetrics = metrics.Webhooks
184185
embeddingMetrics = metrics.Embeddings
186+
translationMetrics = metrics.Translation
185187
}
186188

187189
var tracerProvider *sdktrace.TracerProvider
@@ -241,6 +243,7 @@ func NewApp(cfg *config.Config, db *pgxpool.Pool) (*App, error) {
241243
nil, // riverClient set below after creation
242244
service.EmbeddingsQueueName,
243245
cfg.Embedding.MaxAttempts,
246+
cfg.Translation.DefaultLanguage,
244247
)
245248

246249
// Shared worker/queue registration first (webhook + optional embedding added below).
@@ -277,6 +280,40 @@ func NewApp(cfg *config.Config, db *pgxpool.Pool) (*App, error) {
277280
searchHandler = handlers.NewSearchHandler(nil) // 503 when embeddings disabled
278281
}
279282

283+
// Register the translation worker and declare its queue so the River client can
284+
// enqueue translation jobs (River requires the job kind registered and the queue
285+
// declared at insert time); the jobs are processed by hub-worker, not in this
286+
// process. Gated on TRANSLATION_PROVIDER+MODEL like embeddings; the enqueue
287+
// provider is registered below, after the River client and tenant settings exist.
288+
if cfg.Translation.Provider != "" && cfg.Translation.Model != "" {
289+
translationCfg := service.TranslationClientConfig{
290+
Provider: cfg.Translation.Provider,
291+
ProviderAPIKey: cfg.Translation.ProviderAPIKey,
292+
Model: cfg.Translation.Model,
293+
BaseURL: cfg.Translation.BaseURL,
294+
GoogleCloudProject: cfg.Translation.GoogleCloudProject,
295+
GoogleCloudLocation: cfg.Translation.GoogleCloudLocation,
296+
}
297+
298+
translationClient, translationErr := service.NewTranslationClient(context.Background(), translationCfg)
299+
if translationErr != nil {
300+
cleanupNewAppStartupFailure(context.Background(), messageManager, nil, tracerProvider, meterProvider)
301+
302+
return nil, fmt.Errorf("translation config: %w", translationErr)
303+
}
304+
305+
river.AddWorker(riverWorkers, workers.NewFeedbackTranslationWorker(feedbackRecordsService, translationClient, translationMetrics))
306+
307+
queues[service.TranslationsQueueName] = river.QueueConfig{MaxWorkers: 1}
308+
309+
// Per-tenant re-translation backfill, enqueued by the settings-change listener
310+
// below. Registered here only so the River client can validate the kind and queue
311+
// at insert time; the fan-out is processed by hub-worker.
312+
river.AddWorker(riverWorkers, workers.NewTenantTranslationBackfillWorker(feedbackRecordsService, cfg.Translation.MaxAttempts))
313+
314+
queues[service.TranslationBackfillsQueueName] = river.QueueConfig{MaxWorkers: 1}
315+
}
316+
280317
riverClient, err := river.NewClient(riverpgxv5.New(db), &river.Config{
281318
Queues: queues,
282319
Workers: riverWorkers,
@@ -321,6 +358,39 @@ func NewApp(cfg *config.Config, db *pgxpool.Pool) (*App, error) {
321358
tenantDataService := service.NewTenantDataService(tenantDataRepo)
322359
tenantDataHandler := handlers.NewTenantDataHandler(tenantDataService)
323360

361+
tenantSettingsRepo := repository.NewTenantSettingsRepository(db)
362+
tenantSettingsService := service.NewTenantSettingsService(tenantSettingsRepo)
363+
tenantSettingsHandler := handlers.NewTenantSettingsHandler(tenantSettingsService)
364+
365+
// Translation enqueue provider: on a feedback-record create/update it resolves the
366+
// tenant's target language (through a short-TTL cache over tenant settings) and
367+
// enqueues a translation job. Gated on TRANSLATION_PROVIDER+MODEL.
368+
if cfg.Translation.Provider != "" && cfg.Translation.Model != "" {
369+
var translationCacheMetrics observability.CacheMetrics
370+
if metrics != nil {
371+
translationCacheMetrics = metrics.Cache
372+
}
373+
374+
translationCache := service.NewCachedTenantSettings(
375+
tenantSettingsService,
376+
cfg.TenantSettingsCache.Size, cfg.TenantSettingsCache.TTL.Duration(),
377+
translationCacheMetrics,
378+
)
379+
messageManager.RegisterProvider(service.NewTranslationProvider(
380+
riverClient, translationCache, service.TranslationsQueueName, cfg.Translation.MaxAttempts,
381+
cfg.Translation.DefaultLanguage, translationMetrics))
382+
383+
// On a settings write: evict the tenant's cached settings (so a changed/enabled
384+
// target is visible to the enqueue gate immediately, not after TTL expiry) and
385+
// enqueue a per-tenant re-translation backfill (so existing records pick up a new
386+
// target, not only newly ingested ones).
387+
tenantSettingsService.SetSettingsChangeListener(service.NewCompositeSettingsChangeListener(
388+
translationCache,
389+
service.NewTranslationSettingsListener(
390+
riverClient, service.TranslationBackfillsQueueName, cfg.Translation.MaxAttempts),
391+
))
392+
}
393+
324394
taxonomyRepo := repository.NewTaxonomyRepository(db)
325395

326396
var taxonomyStarter service.TaxonomyRunStarter
@@ -358,7 +428,8 @@ func NewApp(cfg *config.Config, db *pgxpool.Pool) (*App, error) {
358428
}
359429

360430
server := newHTTPServer(
361-
cfg, healthHandler, openapiHandler, feedbackRecordsHandler, webhooksHandler, tenantDataHandler, searchHandler,
431+
cfg, healthHandler, openapiHandler, feedbackRecordsHandler, webhooksHandler, tenantDataHandler,
432+
tenantSettingsHandler, searchHandler,
362433
taxonomyHandler, taxonomyInternalHandler,
363434
meterProvider, tracerProvider,
364435
)
@@ -385,6 +456,7 @@ func newHTTPServer(
385456
feedback *handlers.FeedbackRecordsHandler,
386457
webhooks *handlers.WebhooksHandler,
387458
tenantData *handlers.TenantDataHandler,
459+
tenantSettings *handlers.TenantSettingsHandler,
388460
search *handlers.SearchHandler,
389461
taxonomy *handlers.TaxonomyHandler,
390462
taxonomyInternal *handlers.TaxonomyInternalHandler,
@@ -410,6 +482,9 @@ func newHTTPServer(
410482
protected.HandleFunc("PATCH /v1/webhooks/{id}", webhooks.Update)
411483
protected.HandleFunc("DELETE /v1/webhooks/{id}", webhooks.Delete)
412484
protected.HandleFunc("DELETE /v1/tenants/{tenant_id}/data", tenantData.Delete)
485+
protected.HandleFunc("GET /v1/tenants/{tenant_id}/settings", tenantSettings.Get)
486+
protected.HandleFunc("PUT /v1/tenants/{tenant_id}/settings", tenantSettings.Update)
487+
protected.HandleFunc("PATCH /v1/tenants/{tenant_id}/settings", tenantSettings.Patch)
413488

414489
// Search endpoints are always registered; when embeddings are disabled, the handler returns 503.
415490
protected.HandleFunc("POST /v1/feedback-records/search/semantic", search.SemanticSearch)

cmd/api/app_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,7 @@ func newTestHTTPServerWithConfig(t *testing.T, publicBaseURL string, taxonomy co
374374
handlers.NewFeedbackRecordsHandler(nil),
375375
handlers.NewWebhooksHandler(nil),
376376
handlers.NewTenantDataHandler(nil),
377+
handlers.NewTenantSettingsHandler(nil),
377378
handlers.NewSearchHandler(nil),
378379
handlers.NewTaxonomyHandler(nil),
379380
handlers.NewTaxonomyInternalHandler(),

cmd/backfill-embeddings/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ func run() int {
110110
nil, // inserter set below after River client is created
111111
service.EmbeddingsQueueName,
112112
maxAttempts,
113+
"", // translation default unused: embeddings backfill only
113114
)
114115

115116
embeddingClient, err := service.NewEmbeddingClient(ctx, embeddingCfg)

cmd/backfill-translations/main.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// backfill-translations enqueues River translation jobs for feedback records whose
2+
// tenant has a target language configured and whose value_text is not yet translated
3+
// to it (missing or stale). Run this one-off after enabling translation or changing a
4+
// tenant's target language; hub-worker (or the API process) runs the jobs.
5+
package main
6+
7+
import (
8+
"context"
9+
"errors"
10+
"fmt"
11+
"log/slog"
12+
"os"
13+
14+
"github.com/riverqueue/river"
15+
"github.com/riverqueue/river/riverdriver/riverpgxv5"
16+
17+
"github.com/formbricks/hub/internal/config"
18+
"github.com/formbricks/hub/internal/repository"
19+
"github.com/formbricks/hub/internal/service"
20+
"github.com/formbricks/hub/internal/workers"
21+
"github.com/formbricks/hub/pkg/database"
22+
)
23+
24+
var (
25+
errTranslationProviderRequired = errors.New("TRANSLATION_PROVIDER is required")
26+
errTranslationModelRequired = errors.New("TRANSLATION_MODEL is required")
27+
)
28+
29+
const (
30+
defaultTranslationMaxAttempts = 3
31+
exitSuccess = 0
32+
exitFailure = 1
33+
)
34+
35+
func main() {
36+
os.Exit(run())
37+
}
38+
39+
func run() int {
40+
cfg, err := config.Load()
41+
if err != nil {
42+
slog.Error("Failed to load configuration", "error", err)
43+
44+
return exitFailure
45+
}
46+
47+
if cfg.Database.URL == "" || cfg.Database.URL == config.DefaultDatabaseURL {
48+
slog.Error("DATABASE_URL must be set explicitly for this binary (do not use the default test URL)")
49+
50+
return exitFailure
51+
}
52+
53+
if cfg.Translation.Provider == "" {
54+
slog.Error(errTranslationProviderRequired.Error())
55+
56+
return exitFailure
57+
}
58+
59+
if cfg.Translation.Model == "" {
60+
slog.Error(errTranslationModelRequired.Error())
61+
62+
return exitFailure
63+
}
64+
65+
maxAttempts := cfg.Translation.MaxAttempts
66+
if maxAttempts <= 0 {
67+
maxAttempts = defaultTranslationMaxAttempts
68+
}
69+
70+
ctx := context.Background()
71+
72+
db, err := database.NewPostgresPool(ctx, cfg.Database.URL,
73+
database.WithPoolConfig(cfg.Database.PoolConfig()),
74+
)
75+
if err != nil {
76+
slog.Error("Failed to connect to database", "error", err)
77+
78+
return exitFailure
79+
}
80+
defer db.Close()
81+
82+
translationCfg := service.TranslationClientConfig{
83+
Provider: cfg.Translation.Provider,
84+
ProviderAPIKey: cfg.Translation.ProviderAPIKey,
85+
Model: cfg.Translation.Model,
86+
BaseURL: cfg.Translation.BaseURL,
87+
GoogleCloudProject: cfg.Translation.GoogleCloudProject,
88+
GoogleCloudLocation: cfg.Translation.GoogleCloudLocation,
89+
}
90+
91+
translationClient, err := service.NewTranslationClient(ctx, translationCfg)
92+
if err != nil {
93+
slog.Error("Failed to create translation client", "error", err)
94+
95+
return exitFailure
96+
}
97+
98+
repo := repository.NewFeedbackRecordsRepository(db)
99+
feedbackRecordsService := service.NewFeedbackRecordsService(repo, nil, "", nil, nil, "", 0, cfg.Translation.DefaultLanguage)
100+
101+
// Producer-only: we only enqueue jobs; workers run in hub-worker (or the API process).
102+
// River requires the job kind registered (worker added) and MaxWorkers > 0 for a declared queue.
103+
riverWorkers := river.NewWorkers()
104+
river.AddWorker(riverWorkers, workers.NewFeedbackTranslationWorker(feedbackRecordsService, translationClient, nil))
105+
106+
riverClient, err := river.NewClient(riverpgxv5.New(db), &river.Config{
107+
Queues: map[string]river.QueueConfig{
108+
service.TranslationsQueueName: {MaxWorkers: 1},
109+
},
110+
Workers: riverWorkers,
111+
})
112+
if err != nil {
113+
slog.Error("Failed to create River client", "error", err)
114+
115+
return exitFailure
116+
}
117+
118+
enqueued, err := feedbackRecordsService.BackfillTranslations(
119+
ctx, riverClient, service.TranslationsQueueName, maxAttempts)
120+
if err != nil {
121+
slog.Error("Backfill failed", "error", err)
122+
123+
return exitFailure
124+
}
125+
126+
slog.Info("Backfill complete", "enqueued", enqueued)
127+
128+
fmt.Printf("Enqueued %d translation job(s).\n", enqueued)
129+
130+
return exitSuccess
131+
}

cmd/worker/app.go

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,15 @@ func NewWorkerApp(cfg *config.Config, db *pgxpool.Pool) (*WorkerApp, error) {
6868
webhooksRepo := repository.NewWebhooksRepository(db)
6969

7070
var (
71-
webhookMetrics observability.WebhookMetrics
72-
embeddingMetrics observability.EmbeddingMetrics
71+
webhookMetrics observability.WebhookMetrics
72+
embeddingMetrics observability.EmbeddingMetrics
73+
translationMetrics observability.TranslationMetrics
7374
)
7475

7576
if metrics != nil {
7677
webhookMetrics = metrics.Webhooks
7778
embeddingMetrics = metrics.Embeddings
79+
translationMetrics = metrics.Translation
7880
}
7981

8082
webhookSender := service.NewWebhookSenderImpl(
@@ -121,6 +123,7 @@ func NewWorkerApp(cfg *config.Config, db *pgxpool.Pool) (*WorkerApp, error) {
121123
nil,
122124
service.EmbeddingsQueueName,
123125
cfg.Embedding.MaxAttempts,
126+
"", // translation default unused: this service handles embeddings only
124127
)
125128
docPrefix := service.EmbeddingPrefixForProvider(providerName)
126129

@@ -130,6 +133,36 @@ func NewWorkerApp(cfg *config.Config, db *pgxpool.Pool) (*WorkerApp, error) {
130133
deps.EmbeddingMetrics = embeddingMetrics
131134
}
132135

136+
if cfg.Translation.Provider != "" && cfg.Translation.Model != "" {
137+
translationCfg := service.TranslationClientConfig{
138+
Provider: cfg.Translation.Provider,
139+
ProviderAPIKey: cfg.Translation.ProviderAPIKey,
140+
Model: cfg.Translation.Model,
141+
BaseURL: cfg.Translation.BaseURL,
142+
GoogleCloudProject: cfg.Translation.GoogleCloudProject,
143+
GoogleCloudLocation: cfg.Translation.GoogleCloudLocation,
144+
}
145+
146+
translationClient, err := service.NewTranslationClient(context.Background(), translationCfg)
147+
if err != nil {
148+
shutdownObservability(context.Background(), meterProvider, tracerProvider)
149+
150+
return nil, fmt.Errorf("translation config: %w", err)
151+
}
152+
153+
// The translation worker only reads the record and writes the translation, so
154+
// the embedding-specific service params are unused here.
155+
translationRecordsRepo := repository.NewFeedbackRecordsRepository(db)
156+
translationRecordsService := service.NewFeedbackRecordsService(
157+
translationRecordsRepo, nil, "", nil, nil, "", 0, cfg.Translation.DefaultLanguage)
158+
159+
deps.TranslationService = translationRecordsService
160+
deps.TranslationClient = translationClient
161+
deps.TranslationMetrics = translationMetrics
162+
deps.TranslationBackfillService = translationRecordsService
163+
deps.TranslationMaxAttempts = cfg.Translation.MaxAttempts
164+
}
165+
133166
riverWorkers, queues := workers.NewRiverWorkersAndQueues(cfg, deps, 0)
134167

135168
riverCfg := &river.Config{

0 commit comments

Comments
 (0)