@@ -6,8 +6,10 @@ import (
66 "fmt"
77 "log/slog"
88 "net/http"
9+ "strings"
910 "time"
1011
12+ lru "github.com/hashicorp/golang-lru/v2"
1113 "github.com/jackc/pgx/v5"
1214 "github.com/jackc/pgx/v5/pgxpool"
1315 "github.com/riverqueue/river"
@@ -41,7 +43,10 @@ type App struct {
4143 metrics * observability.Metrics
4244}
4345
// Sentinel errors for embedding-provider wiring. Callers wrap them with %w and
// append the offending provider name, so match them with errors.Is.
var (
	// errUnsupportedEmbeddingProvider reports a provider name for which no
	// embedding client can be constructed.
	errUnsupportedEmbeddingProvider = errors.New("unsupported embedding provider")
	// errEmbeddingProviderAPIKeyRequired reports a provider that needs an API key
	// when none is configured.
	errEmbeddingProviderAPIKeyRequired = errors.New("EMBEDDING_PROVIDER_API_KEY is required for this provider")
)
4550
4651const (
4752 embeddingProviderOpenAI = "openai"
@@ -55,22 +60,25 @@ var supportedEmbeddingProviders = map[string]struct{}{
5560
5661const riverQueueDepthInterval = 15 * time .Second
5762
58- // embeddingProviderAndModel returns (provider, model) when embeddings are enabled: EMBEDDING_PROVIDER
59- // is set and supported. Model and API key are optional (e.g. local provider may not need them).
60- // Otherwise returns ("", "") so no embedding provider or jobs run. Embeddings are optional; no defaults.
63+ // embeddingProviderAndModel returns (provider, model) when embeddings are enabled: both EMBEDDING_PROVIDER
64+ // and EMBEDDING_MODEL must be set and the provider must be supported. Otherwise returns ("", "") so no
65+ // embedding provider or jobs run. No default for model; embeddings are disabled if either is unset.
66+ // Provider name is normalized to lowercase so that "OpenAI", "openai", and "OPENAI" behave the same
67+ // (consistent with backfill-embeddings and EmbeddingPrefixForProvider).
6168func embeddingProviderAndModel (cfg * config.Config ) (provider , model string ) {
62- if cfg .EmbeddingProvider == "" {
69+ if cfg .EmbeddingProvider == "" || cfg . EmbeddingModel == "" {
6370 return "" , ""
6471 }
6572
66- if _ , ok := supportedEmbeddingProviders [cfg .EmbeddingProvider ]; ! ok {
73+ providerCanonical := strings .ToLower (strings .TrimSpace (cfg .EmbeddingProvider ))
74+ if _ , ok := supportedEmbeddingProviders [providerCanonical ]; ! ok {
6775 slog .Info ("embeddings disabled: unsupported EMBEDDING_PROVIDER" ,
6876 "provider" , cfg .EmbeddingProvider , "model" , cfg .EmbeddingModel )
6977
7078 return "" , ""
7179 }
7280
73- return cfg . EmbeddingProvider , cfg .EmbeddingModel
81+ return providerCanonical , cfg .EmbeddingModel
7482}
7583
7684// setupMetrics creates meter provider and hub metrics when metrics are enabled.
@@ -165,17 +173,19 @@ func NewApp(cfg *config.Config, db *pgxpool.Pool) (*App, error) {
165173 river .AddWorker (riverWorkers , webhookWorker )
166174
167175 queues := map [string ]river.QueueConfig {
168- river .QueueDefault : {MaxWorkers : cfg .WebhookDeliveryMaxConcurrent },
169- service .EmbeddingsQueueName : {MaxWorkers : cfg .EmbeddingMaxConcurrent },
176+ river .QueueDefault : {MaxWorkers : cfg .WebhookDeliveryMaxConcurrent },
170177 }
171178
172179 feedbackRecordsRepo := repository .NewFeedbackRecordsRepository (db )
173180 embeddingsRepo := repository .NewEmbeddingsRepository (db )
174181 embeddingProviderName , embeddingModel := embeddingProviderAndModel (cfg )
175- // Model for DB/jobs: required for embeddings.model column; use "default" when provider has no model name (e.g. local ).
182+ // Model for DB/jobs: required for embeddings.model column; only set when embeddings are enabled (both provider and model set ).
176183 embeddingModelForDB := embeddingModel
177- if embeddingModelForDB == "" {
178- embeddingModelForDB = "default"
184+
185+ var embeddingDocPrefix string
186+ if embeddingProviderName != "" {
187+ embeddingDocPrefix = service .EmbeddingPrefixForProvider (embeddingProviderName )
188+ queues [service .EmbeddingsQueueName ] = river.QueueConfig {MaxWorkers : cfg .EmbeddingMaxConcurrent }
179189 }
180190
181191 feedbackRecordsService := service .NewFeedbackRecordsService (
@@ -188,17 +198,27 @@ func NewApp(cfg *config.Config, db *pgxpool.Pool) (*App, error) {
188198 cfg .EmbeddingMaxAttempts ,
189199 )
190200
201+ var searchHandler * handlers.SearchHandler
202+
191203 if embeddingProviderName != "" {
204+ // Fail fast when a provider that requires an API key is configured without one (consistent with backfill-embeddings).
205+ if (embeddingProviderName == embeddingProviderOpenAI || embeddingProviderName == embeddingProviderGoogle ) &&
206+ cfg .EmbeddingProviderAPIKey == "" {
207+ return nil , fmt .Errorf ("%w: %s" , errEmbeddingProviderAPIKeyRequired , embeddingProviderName )
208+ }
209+
192210 var embeddingClient service.EmbeddingClient
193211
194212 switch embeddingProviderName {
195213 case embeddingProviderOpenAI :
196214 embeddingClient = openai .NewClient (cfg .EmbeddingProviderAPIKey ,
197215 openai .WithModel (embeddingModel ),
216+ openai .WithNormalize (cfg .EmbeddingNormalize ),
198217 )
199218 case embeddingProviderGoogle :
200219 googleClient , err := googleai .NewClient (context .Background (), cfg .EmbeddingProviderAPIKey ,
201220 googleai .WithModel (embeddingModel ),
221+ googleai .WithNormalize (cfg .EmbeddingNormalize ),
202222 )
203223 if err != nil {
204224 return nil , fmt .Errorf ("create google embedding client: %w" , err )
@@ -209,8 +229,33 @@ func NewApp(cfg *config.Config, db *pgxpool.Pool) (*App, error) {
209229 return nil , fmt .Errorf ("%w: %s" , errUnsupportedEmbeddingProvider , embeddingProviderName )
210230 }
211231
212- embeddingWorker := workers .NewFeedbackEmbeddingWorker (feedbackRecordsService , embeddingClient , embeddingMetrics )
232+ embeddingWorker := workers .NewFeedbackEmbeddingWorker (
233+ feedbackRecordsService , embeddingClient , embeddingDocPrefix , embeddingMetrics )
213234 river .AddWorker (riverWorkers , embeddingWorker )
235+
236+ const searchQueryCacheSize = 1000
237+
238+ queryCache , err := lru.New [string , []float32 ](searchQueryCacheSize )
239+ if err != nil {
240+ return nil , fmt .Errorf ("create search query cache: %w" , err )
241+ }
242+
243+ var cacheMetrics observability.CacheMetrics
244+ if metrics != nil {
245+ cacheMetrics = metrics .Cache
246+ }
247+
248+ searchService := service .NewSearchService (service.SearchServiceParams {
249+ EmbeddingClient : embeddingClient ,
250+ EmbeddingsRepo : embeddingsRepo ,
251+ Model : embeddingModel ,
252+ QueryCache : queryCache ,
253+ CacheMetrics : cacheMetrics ,
254+ Logger : slog .Default (),
255+ })
256+ searchHandler = handlers .NewSearchHandler (searchService )
257+ } else {
258+ searchHandler = handlers .NewSearchHandler (nil ) // 503 when embeddings disabled
214259 }
215260
216261 riverClient , err := river .NewClient (riverpgxv5 .New (db ), & river.Config {
@@ -252,6 +297,7 @@ func NewApp(cfg *config.Config, db *pgxpool.Pool) (*App, error) {
252297 embeddingModelForDB ,
253298 service .EmbeddingsQueueName ,
254299 cfg .EmbeddingMaxAttempts ,
300+ embeddingDocPrefix ,
255301 embeddingMetrics ,
256302 )
257303 messageManager .RegisterProvider (embeddingProv )
@@ -264,7 +310,7 @@ func NewApp(cfg *config.Config, db *pgxpool.Pool) (*App, error) {
264310 healthHandler := handlers .NewHealthHandler ()
265311
266312 server := newHTTPServer (
267- cfg , healthHandler , feedbackRecordsHandler , webhooksHandler ,
313+ cfg , healthHandler , feedbackRecordsHandler , webhooksHandler , searchHandler ,
268314 meterProvider , tracerProvider ,
269315 )
270316
@@ -287,6 +333,7 @@ func newHTTPServer(
287333 health * handlers.HealthHandler ,
288334 feedback * handlers.FeedbackRecordsHandler ,
289335 webhooks * handlers.WebhooksHandler ,
336+ search * handlers.SearchHandler ,
290337 meterProvider * sdkmetric.MeterProvider ,
291338 tracerProvider * sdktrace.TracerProvider ,
292339) * http.Server {
@@ -307,6 +354,10 @@ func newHTTPServer(
307354 protected .HandleFunc ("PATCH /v1/webhooks/{id}" , webhooks .Update )
308355 protected .HandleFunc ("DELETE /v1/webhooks/{id}" , webhooks .Delete )
309356
357+ // Search endpoints are always registered; when embeddings are disabled, the handler returns 503.
358+ protected .HandleFunc ("POST /v1/feedback-records/search/semantic" , search .SemanticSearch )
359+ protected .HandleFunc ("GET /v1/feedback-records/{id}/similar" , search .SimilarFeedback )
360+
310361 protectedWithAuth := middleware .Auth (cfg .APIKey )(protected )
311362 mux := http .NewServeMux ()
312363 mux .Handle ("/v1/" , protectedWithAuth )
0 commit comments