diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go index e8272ecd237..9f8b0349da8 100644 --- a/go/embedded/online_features.go +++ b/go/embedded/online_features.go @@ -311,7 +311,7 @@ func (s *OnlineFeatureService) StartGrpcServerWithLogging(host string, port int, if err != nil { return err } - ser := server.NewGrpcServingServiceServer(s.fs, loggingService) + ser := server.NewGrpcServingServiceServer(s.fs, loggingService, nil, nil) log.Printf("Starting a gRPC server on host %s port %d\n", host, port) lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) if err != nil { @@ -366,7 +366,7 @@ func (s *OnlineFeatureService) StartHttpServerWithLogging(host string, port int, if err != nil { return err } - ser := server.NewHttpServer(s.fs, loggingService) + ser := server.NewHttpServer(s.fs, loggingService, nil, nil) log.Printf("Starting a HTTP server on host %s port %d\n", host, port) go func() { diff --git a/go/internal/feast/metrics/client.go b/go/internal/feast/metrics/client.go new file mode 100644 index 00000000000..f0e38208fc6 --- /dev/null +++ b/go/internal/feast/metrics/client.go @@ -0,0 +1,12 @@ +package metrics + +// StatsdClient wraps DogStatsD so tests can inject a fake. +// The real github.com/DataDog/datadog-go/v5/statsd.Client satisfies this interface. +type StatsdClient interface { + Count(name string, value int64, tags []string, rate float64) error +} + +// NoOpStatsdClient does nothing when metrics are disabled. 
// GetStatsDAddress returns the DogStatsD address ("host:port") built from
// environment variables.
//
// The host is read from DD_AGENT_HOST; when it is unset or empty the function
// returns the empty string. The port is read from DD_DOGSTATSD_PORT and
// defaults to 8125.
//
// A bare IPv6 literal (for example "::1") is wrapped in square brackets so the
// result remains an unambiguous host:port pair. Hosts that are already
// bracketed are left untouched.
func GetStatsDAddress() string {
	host := os.Getenv("DD_AGENT_HOST")
	if host == "" {
		return ""
	}

	port := os.Getenv("DD_DOGSTATSD_PORT")
	if port == "" {
		port = "8125"
	}

	// A colon inside the host means an IPv6 literal; without brackets the
	// final ":port" separator cannot be told apart from the address itself.
	if strings.Contains(host, ":") && !strings.HasPrefix(host, "[") {
		host = "[" + host + "]"
	}

	return host + ":" + port
}
port", + host: "localhost", + port: "", + expected: "localhost:8125", + }, + { + name: "IP address with custom port", + host: "10.0.0.5", + port: "8126", + expected: "10.0.0.5:8126", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Set env vars + if tt.host != "" { + os.Setenv("DD_AGENT_HOST", tt.host) + defer os.Unsetenv("DD_AGENT_HOST") + } else { + os.Unsetenv("DD_AGENT_HOST") + } + + if tt.port != "" { + os.Setenv("DD_DOGSTATSD_PORT", tt.port) + defer os.Unsetenv("DD_DOGSTATSD_PORT") + } else { + os.Unsetenv("DD_DOGSTATSD_PORT") + } + + result := GetStatsDAddress() + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/go/internal/feast/metrics/lookup_metrics.go b/go/internal/feast/metrics/lookup_metrics.go new file mode 100644 index 00000000000..334b1555934 --- /dev/null +++ b/go/internal/feast/metrics/lookup_metrics.go @@ -0,0 +1,140 @@ +package metrics + +import ( + "math/rand" + "os" + "strconv" + "strings" + + "github.com/feast-dev/feast/go/internal/feast/onlineserving" + "github.com/feast-dev/feast/go/protos/feast/serving" +) + +// extractFeatureView extracts the feature view name from a full feature name. 
// extractFeatureView returns the feature view portion of a full feature name.
// Feature names follow the format feature_view__feature_name, so everything
// before the first "__" is the view, e.g. "hotel_fv__price" -> "hotel_fv".
// Names without the separator, or with nothing in front of it, yield
// "unknown" so the emitted feature_view tag never has an empty value.
func extractFeatureView(featureName string) string {
	view, _, found := strings.Cut(featureName, "__")
	if !found || view == "" {
		return "unknown"
	}
	return view
}
status) + } + } + } +} + +func (m *LookupMetricsAggregator) Emit() { + if m == nil || m.client == nil { + return + } + + // Probabilistic sampling: skip this request's metrics based on sample_rate + if m.sampleRate < 1.0 && rand.Float64() > m.sampleRate { + return + } + + // Calculate multiplier to preserve statistical accuracy + // If sampleRate=0.1, we only emit 10% of the time, so multiply counts by 10 + multiplier := 1.0 / m.sampleRate + + baseTags := []string{ + "project:" + m.project, + "online_store_type:" + m.onlineStore, + } + + for featureID, count := range m.notFound { + if count == 0 { + continue + } + // Adjust count to preserve accuracy when sampling + adjustedCount := int64(float64(count) * multiplier) + tags := make([]string, len(baseTags)+2) + copy(tags, baseTags) + tags[len(baseTags)] = "feature:" + featureID + tags[len(baseTags)+1] = "feature_view:" + extractFeatureView(featureID) + m.client.Count("mlpfs.featureserver.feature_lookup_not_found", adjustedCount, tags, 1.0) + } + + for featureID, count := range m.nullOrExpired { + if count == 0 { + continue + } + adjustedCount := int64(float64(count) * multiplier) + tags := make([]string, len(baseTags)+2) + copy(tags, baseTags) + tags[len(baseTags)] = "feature:" + featureID + tags[len(baseTags)+1] = "feature_view:" + extractFeatureView(featureID) + m.client.Count("mlpfs.featureserver.feature_lookup_null_or_expired", adjustedCount, tags, 1.0) + } +} diff --git a/go/internal/feast/metrics/lookup_metrics_test.go b/go/internal/feast/metrics/lookup_metrics_test.go new file mode 100644 index 00000000000..b4f1651bf52 --- /dev/null +++ b/go/internal/feast/metrics/lookup_metrics_test.go @@ -0,0 +1,314 @@ +package metrics + +import ( + "os" + "testing" + + "github.com/feast-dev/feast/go/internal/feast/onlineserving" + "github.com/feast-dev/feast/go/protos/feast/serving" + "github.com/stretchr/testify/assert" +) + +type metricCall struct { + name string + value int64 + tags []string +} + +type fakeStatsdClient 
struct { + calls []metricCall +} + +func (f *fakeStatsdClient) Count(name string, value int64, tags []string, rate float64) error { + f.calls = append(f.calls, metricCall{name: name, value: value, tags: tags}) + return nil +} + +func newTestAggregator(client StatsdClient) *LookupMetricsAggregator { + return NewLookupMetricsAggregator("test_project", "redis", client) +} + +func TestAggregator_AllNotFound(t *testing.T) { + fake := &fakeStatsdClient{} + agg := newTestAggregator(fake) + + agg.Record("user_fv__age", serving.FieldStatus_NOT_FOUND) + agg.Record("user_fv__age", serving.FieldStatus_NOT_FOUND) + agg.Record("user_fv__age", serving.FieldStatus_NOT_FOUND) + agg.Emit() + + assert.Len(t, fake.calls, 1) + assert.Equal(t, "mlpfs.featureserver.feature_lookup_not_found", fake.calls[0].name) + assert.Equal(t, int64(3), fake.calls[0].value) + assert.Contains(t, fake.calls[0].tags, "feature:user_fv__age") +} + +func TestAggregator_AllNullOrExpired(t *testing.T) { + fake := &fakeStatsdClient{} + agg := newTestAggregator(fake) + + agg.Record("order_fv__amt", serving.FieldStatus_NULL_VALUE) + agg.Record("order_fv__amt", serving.FieldStatus_NULL_VALUE) + agg.Record("order_fv__amt", serving.FieldStatus_OUTSIDE_MAX_AGE) + agg.Emit() + + assert.Len(t, fake.calls, 1) + assert.Equal(t, "mlpfs.featureserver.feature_lookup_null_or_expired", fake.calls[0].name) + assert.Equal(t, int64(3), fake.calls[0].value) + assert.Contains(t, fake.calls[0].tags, "feature:order_fv__amt") +} + +func TestAggregator_MixedStatuses(t *testing.T) { + fake := &fakeStatsdClient{} + agg := newTestAggregator(fake) + + agg.Record("fv_a__f1", serving.FieldStatus_PRESENT) + agg.Record("fv_a__f1", serving.FieldStatus_NOT_FOUND) + agg.Record("fv_b__f2", serving.FieldStatus_NULL_VALUE) + agg.Record("fv_b__f2", serving.FieldStatus_PRESENT) + agg.Record("fv_b__f2", serving.FieldStatus_OUTSIDE_MAX_AGE) + agg.Emit() + + assert.Len(t, fake.calls, 2) + + callsByName := map[string]metricCall{} + for _, c := range 
fake.calls { + callsByName[c.name+":"+findTag(c.tags, "feature:")] = c + } + + nf := callsByName["mlpfs.featureserver.feature_lookup_not_found:fv_a__f1"] + assert.Equal(t, int64(1), nf.value) + + ne := callsByName["mlpfs.featureserver.feature_lookup_null_or_expired:fv_b__f2"] + assert.Equal(t, int64(2), ne.value) +} + +func TestAggregator_AllPresent(t *testing.T) { + fake := &fakeStatsdClient{} + agg := newTestAggregator(fake) + + agg.Record("fv__f1", serving.FieldStatus_PRESENT) + agg.Record("fv__f1", serving.FieldStatus_PRESENT) + agg.Emit() + + assert.Len(t, fake.calls, 0) +} + +func TestAggregator_NilSafe(t *testing.T) { + var agg *LookupMetricsAggregator + agg.Record("fv__f1", serving.FieldStatus_NOT_FOUND) + agg.RecordFromFeatureVectors(nil) + agg.RecordFromRangeFeatureVectors(nil) + agg.Emit() +} + +func TestAggregator_NilClient(t *testing.T) { + agg := NewLookupMetricsAggregator("p", "r", nil) + assert.Nil(t, agg) +} + +func TestAggregator_Tags(t *testing.T) { + fake := &fakeStatsdClient{} + agg := NewLookupMetricsAggregator("mlpfs", "eg-valkey", fake) + + agg.Record("hotel_fv__price", serving.FieldStatus_NOT_FOUND) + agg.Emit() + + assert.Len(t, fake.calls, 1) + tags := fake.calls[0].tags + assert.Contains(t, tags, "project:mlpfs") + assert.Contains(t, tags, "online_store_type:eg-valkey") + assert.Contains(t, tags, "feature:hotel_fv__price") + assert.Contains(t, tags, "feature_view:hotel_fv") +} + +func TestRecordFromFeatureVectors(t *testing.T) { + fake := &fakeStatsdClient{} + agg := newTestAggregator(fake) + + vectors := []*onlineserving.FeatureVector{ + { + Name: "fv_a__f1", + Statuses: []serving.FieldStatus{serving.FieldStatus_PRESENT, serving.FieldStatus_NOT_FOUND}, + }, + { + Name: "fv_a__f2", + Statuses: []serving.FieldStatus{serving.FieldStatus_NOT_FOUND, serving.FieldStatus_NOT_FOUND}, + }, + } + + agg.RecordFromFeatureVectors(vectors) + agg.Emit() + + assert.Len(t, fake.calls, 2) + + callsByFeature := map[string]int64{} + for _, c := range 
fake.calls { + callsByFeature[findTag(c.tags, "feature:")] = c.value + } + assert.Equal(t, int64(1), callsByFeature["fv_a__f1"]) + assert.Equal(t, int64(2), callsByFeature["fv_a__f2"]) +} + +func TestRecordFromRangeFeatureVectors(t *testing.T) { + fake := &fakeStatsdClient{} + agg := newTestAggregator(fake) + + vectors := []*onlineserving.RangeFeatureVector{ + { + Name: "sfv__f1", + RangeStatuses: [][]serving.FieldStatus{ + {serving.FieldStatus_PRESENT, serving.FieldStatus_NOT_FOUND}, + {serving.FieldStatus_NOT_FOUND}, + }, + }, + } + + agg.RecordFromRangeFeatureVectors(vectors) + agg.Emit() + + assert.Len(t, fake.calls, 1) + assert.Equal(t, int64(2), fake.calls[0].value) + assert.Equal(t, "mlpfs.featureserver.feature_lookup_not_found", fake.calls[0].name) +} + +func TestIsMissingKeyMetricsEnabled(t *testing.T) { + os.Unsetenv("ENABLE_MISSING_KEY_METRICS") + assert.False(t, IsMissingKeyMetricsEnabled()) + + os.Setenv("ENABLE_MISSING_KEY_METRICS", "true") + assert.True(t, IsMissingKeyMetricsEnabled()) + + os.Setenv("ENABLE_MISSING_KEY_METRICS", "TRUE") + assert.True(t, IsMissingKeyMetricsEnabled()) + + os.Setenv("ENABLE_MISSING_KEY_METRICS", "false") + assert.False(t, IsMissingKeyMetricsEnabled()) + + os.Unsetenv("ENABLE_MISSING_KEY_METRICS") +} + +func TestGetOnlineStoreType(t *testing.T) { +} + +func TestExtractFeatureView(t *testing.T) { + tests := []struct { + name string + input string + expected string + }{ + {"standard format", "hotel_fv__price", "hotel_fv"}, + {"with underscore in feature", "hotel_fv__review_score_avg", "hotel_fv"}, + {"multiple feature views", "user_fv__age", "user_fv"}, + {"long feature view name", "ranking_signals_fv__score", "ranking_signals_fv"}, + {"no double underscore", "age", "unknown"}, + {"colon separator", "hotel_fv:price", "unknown"}, + {"empty string", "", "unknown"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := extractFeatureView(tt.input) + assert.Equal(t, tt.expected, result) + }) + } 
// findTag returns what follows prefix in the first tag that starts with prefix
// and has a non-empty remainder; it returns "" when no tag qualifies.
func findTag(tags []string, prefix string) string {
	n := len(prefix)
	for i := range tags {
		candidate := tags[i]
		if len(candidate) <= n {
			continue
		}
		if candidate[:n] == prefix {
			return candidate[n:]
		}
	}
	return ""
}
emitted = true + // With sample_rate=0.5, count of 2 should become 4 (2 / 0.5) + assert.Equal(t, int64(4), fake.calls[0].value, "Count should be adjusted by 1/sample_rate") + break + } + } + + assert.True(t, emitted, "Should have emitted at least once in 50 tries") +} + +func TestSampling_NoAdjustmentWhenNotSampling(t *testing.T) { + os.Setenv("FEAST_METRICS_SAMPLE_RATE", "1.0") + defer os.Unsetenv("FEAST_METRICS_SAMPLE_RATE") + + fake := &fakeStatsdClient{} + agg := newTestAggregator(fake) + + agg.Record("fv__f1", serving.FieldStatus_NOT_FOUND) + agg.Record("fv__f1", serving.FieldStatus_NOT_FOUND) + agg.Emit() + + assert.Len(t, fake.calls, 1) + assert.Equal(t, int64(2), fake.calls[0].value, "Count should not be adjusted with sample_rate=1.0") +} diff --git a/go/internal/feast/server/grpc_server.go b/go/internal/feast/server/grpc_server.go index 5077683b68e..d9a928c7edb 100644 --- a/go/internal/feast/server/grpc_server.go +++ b/go/internal/feast/server/grpc_server.go @@ -6,6 +6,8 @@ import ( "github.com/DataDog/dd-trace-go/v2/ddtrace/tracer" "github.com/feast-dev/feast/go/internal/feast" "github.com/feast-dev/feast/go/internal/feast/errors" + "github.com/feast-dev/feast/go/internal/feast/metrics" + "github.com/feast-dev/feast/go/internal/feast/registry" "github.com/feast-dev/feast/go/internal/feast/server/logging" "github.com/feast-dev/feast/go/internal/feast/version" "github.com/feast-dev/feast/go/protos/feast/serving" @@ -27,11 +29,13 @@ const feastServerVersion = "0.0.1" type grpcServingServiceServer struct { fs *feast.FeatureStore loggingService *logging.LoggingService + metricsClient metrics.StatsdClient + config *registry.RepoConfig serving.UnimplementedServingServiceServer } -func NewGrpcServingServiceServer(fs *feast.FeatureStore, loggingService *logging.LoggingService) *grpcServingServiceServer { - return &grpcServingServiceServer{fs: fs, loggingService: loggingService} +func NewGrpcServingServiceServer(fs *feast.FeatureStore, loggingService 
*logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) *grpcServingServiceServer { + return &grpcServingServiceServer{fs: fs, loggingService: loggingService, metricsClient: metricsClient, config: config} } func (s *grpcServingServiceServer) GetFeastServingInfo(ctx context.Context, request *serving.GetFeastServingInfoRequest) (*serving.GetFeastServingInfoResponse, error) { @@ -86,6 +90,16 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques return nil, errors.GrpcFromError(err) } + if s.metricsClient != nil && s.config != nil { + agg := metrics.NewLookupMetricsAggregator( + s.config.Project, + metrics.GetOnlineStoreType(s.config), + s.metricsClient, + ) + agg.RecordFromFeatureVectors(featureVectors) + agg.Emit() + } + resp := &serving.GetOnlineFeaturesResponse{ Results: make([]*serving.GetOnlineFeaturesResponse_FeatureVector, 0), Metadata: &serving.GetOnlineFeaturesResponseMetadata{ @@ -168,6 +182,16 @@ func (s *grpcServingServiceServer) GetOnlineFeaturesRange(ctx context.Context, r return nil, errors.GrpcFromError(err) } + if s.metricsClient != nil && s.config != nil { + agg := metrics.NewLookupMetricsAggregator( + s.config.Project, + metrics.GetOnlineStoreType(s.config), + s.metricsClient, + ) + agg.RecordFromRangeFeatureVectors(rangeFeatureVectors) + agg.Emit() + } + entities := request.GetEntities() results := make([]*serving.GetOnlineFeaturesRangeResponse_RangeFeatureVector, 0, len(rangeFeatureVectors)) featureNames := make([]string, 0, len(rangeFeatureVectors)) diff --git a/go/internal/feast/server/http_server.go b/go/internal/feast/server/http_server.go index 68e71e0aabe..73538f43dd8 100644 --- a/go/internal/feast/server/http_server.go +++ b/go/internal/feast/server/http_server.go @@ -18,8 +18,10 @@ import ( httptrace "github.com/DataDog/dd-trace-go/contrib/net/http/v2" "github.com/DataDog/dd-trace-go/v2/ddtrace/tracer" "github.com/feast-dev/feast/go/internal/feast" + 
"github.com/feast-dev/feast/go/internal/feast/metrics" "github.com/feast-dev/feast/go/internal/feast/model" "github.com/feast-dev/feast/go/internal/feast/onlineserving" + "github.com/feast-dev/feast/go/internal/feast/registry" "github.com/feast-dev/feast/go/internal/feast/server/logging" "github.com/feast-dev/feast/go/protos/feast/serving" prototypes "github.com/feast-dev/feast/go/protos/feast/types" @@ -30,6 +32,8 @@ import ( type HttpServer struct { fs *feast.FeatureStore loggingService *logging.LoggingService + metricsClient metrics.StatsdClient + config *registry.RepoConfig server *http.Server } @@ -320,8 +324,8 @@ type getOnlineFeaturesRequest struct { RequestContext map[string]repeatedValue `json:"request_context"` } -func NewHttpServer(fs *feast.FeatureStore, loggingService *logging.LoggingService) *HttpServer { - return &HttpServer{fs: fs, loggingService: loggingService} +func NewHttpServer(fs *feast.FeatureStore, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) *HttpServer { + return &HttpServer{fs: fs, loggingService: loggingService, metricsClient: metricsClient, config: config} } func parseIncludeMetadata(r *http.Request) (bool, error) { @@ -424,6 +428,16 @@ func (s *HttpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { return } + if s.metricsClient != nil && s.config != nil { + agg := metrics.NewLookupMetricsAggregator( + s.config.Project, + metrics.GetOnlineStoreType(s.config), + s.metricsClient, + ) + agg.RecordFromFeatureVectors(featureVectors) + agg.Emit() + } + var featureNames []string var results []map[string]interface{} for _, vector := range featureVectors { @@ -628,6 +642,16 @@ func (s *HttpServer) getOnlineFeaturesRange(w http.ResponseWriter, r *http.Reque return } + if s.metricsClient != nil && s.config != nil { + agg := metrics.NewLookupMetricsAggregator( + s.config.Project, + metrics.GetOnlineStoreType(s.config), + s.metricsClient, + ) + 
agg.RecordFromRangeFeatureVectors(rangeFeatureVectors) + agg.Emit() + } + featureNames, entities, results, err := processFeatureVectors( rangeFeatureVectors, includeMetadata, entitiesProto) if err != nil { diff --git a/go/internal/feast/server/server_test_utils.go b/go/internal/feast/server/server_test_utils.go index 762da7a9dad..a3fffc6ca4f 100644 --- a/go/internal/feast/server/server_test_utils.go +++ b/go/internal/feast/server/server_test_utils.go @@ -45,7 +45,7 @@ func GetClient(ctx context.Context, basePath string, logPath string) (serving.Se if err != nil { panic(err) } - servingServiceServer := NewGrpcServingServiceServer(fs, loggingService) + servingServiceServer := NewGrpcServingServiceServer(fs, loggingService, nil, nil) serving.RegisterServingServiceServer(server, servingServiceServer) go func() { @@ -109,5 +109,5 @@ func GetHttpServer(basePath string, logPath string) *HttpServer { if err != nil { panic(err) } - return NewHttpServer(fs, loggingService) + return NewHttpServer(fs, loggingService, nil, config) } diff --git a/go/main.go b/go/main.go index f6d1662c3d3..91a4af5d9cd 100644 --- a/go/main.go +++ b/go/main.go @@ -12,6 +12,7 @@ import ( "syscall" "github.com/feast-dev/feast/go/internal/feast" + "github.com/feast-dev/feast/go/internal/feast/metrics" "github.com/feast-dev/feast/go/internal/feast/registry" "github.com/feast-dev/feast/go/internal/feast/server" "github.com/feast-dev/feast/go/internal/feast/server/logging" @@ -24,26 +25,23 @@ import ( ) type ServerStarter interface { - StartHttpServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService) error - StartGrpcServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService) error - StartHybridServer(fs *feast.FeatureStore, host string, httpPort int, grpcPort int, loggingService *logging.LoggingService) error - // TODO: MERGE-CONFLICT resolve different logging setups - //StartHttpServer(fs *feast.FeatureStore, host string, port int, 
writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error - //StartGrpcServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error + StartHttpServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error + StartGrpcServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error + StartHybridServer(fs *feast.FeatureStore, host string, httpPort int, grpcPort int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error } type RealServerStarter struct{} -func (s *RealServerStarter) StartHttpServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService) error { - return StartHttpServer(fs, host, port, loggingService) +func (s *RealServerStarter) StartHttpServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error { + return StartHttpServer(fs, host, port, loggingService, metricsClient, config) } -func (s *RealServerStarter) StartGrpcServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService) error { - return StartGrpcServer(fs, host, port, loggingService) +func (s *RealServerStarter) StartGrpcServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error { + return StartGrpcServer(fs, host, port, loggingService, metricsClient, config) } -func (s *RealServerStarter) StartHybridServer(fs *feast.FeatureStore, host string, httpPort int, grpcPort int, loggingService *logging.LoggingService) error { - return 
StartHybridServer(fs, host, httpPort, grpcPort, loggingService) +func (s *RealServerStarter) StartHybridServer(fs *feast.FeatureStore, host string, httpPort int, grpcPort int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error { + return StartHybridServer(fs, host, httpPort, grpcPort, loggingService, metricsClient, config) } func main() { @@ -107,16 +105,29 @@ func main() { log.Fatal().Stack().Err(err).Msg("Failed to create loggingService") } - // TODO: writeLoggedFeaturesCallback is defaulted to nil. write_logged_features functionality needs to be - // implemented in Golang specific to OfflineStoreSink. Python Feature Server doesn't support this. + var metricsClient metrics.StatsdClient + if metrics.IsMissingKeyMetricsEnabled() { + if addr := metrics.GetStatsDAddress(); addr != "" { + client, clientErr := statsd.New(addr) + if clientErr != nil { + log.Error().Err(clientErr).Msg("Failed to create statsd client for missing key metrics") + } else { + metricsClient = client + defer client.Close() + log.Info().Msg("Missing key metrics enabled") + } + } else { + log.Warn().Msg("ENABLE_MISSING_KEY_METRICS is true but DD_AGENT_HOST is not set") + } + } + switch serverType { case "http": - err = serverStarter.StartHttpServer(fs, host, port, loggingService) + err = serverStarter.StartHttpServer(fs, host, port, loggingService, metricsClient, repoConfig) case "grpc": - err = serverStarter.StartGrpcServer(fs, host, port, loggingService) + err = serverStarter.StartGrpcServer(fs, host, port, loggingService, metricsClient, repoConfig) case "hybrid": - // hybrid starts both gRPC(on gRPC port) & http(on port) - err = serverStarter.StartHybridServer(fs, host, port, grpcPort, loggingService) + err = serverStarter.StartHybridServer(fs, host, port, grpcPort, loggingService, metricsClient, repoConfig) default: fmt.Println("Unknown serverStarter type. 
Please specify 'http', 'grpc', or 'hybrid'.") } @@ -133,8 +144,8 @@ func datadogTracingEnabled() bool { func publishVersionInfoToDatadog(info *version.Info) { if datadogTracingEnabled() { - if statsdHost, ok := os.LookupEnv("DD_AGENT_HOST"); ok { - var client, err = statsd.New(fmt.Sprintf("%s:8125", statsdHost)) + if addr := metrics.GetStatsDAddress(); addr != "" { + var client, err = statsd.New(addr) if err != nil { log.Error().Err(err).Msg("Failed to connect to statsd") return @@ -189,14 +200,14 @@ func constructLoggingService(fs *feast.FeatureStore, writeLoggedFeaturesCallback return loggingService, nil } -// StartGrpcServerWithLogging creates a gRPC server with enabled feature logging -func StartGrpcServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService) error { +// StartGrpcServer creates a gRPC server with enabled feature logging +func StartGrpcServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error { if datadogTracingEnabled() { tracer.Start(tracer.WithRuntimeMetrics()) defer tracer.Stop() } - ser := server.NewGrpcServingServiceServer(fs, loggingService) + ser := server.NewGrpcServingServiceServer(fs, loggingService, metricsClient, config) log.Info().Msgf("Starting a gRPC server on host %s port %d", host, port) lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) if err != nil { @@ -229,11 +240,9 @@ func StartGrpcServer(fs *feast.FeatureStore, host string, port int, loggingServi return grpcServer.Serve(lis) } -// StartHttpServerWithLogging creates an HTTP server with enabled feature logging -// Go does not allow direct assignment to package-level functions as a way to -// mock them for tests -func StartHttpServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService) error { - ser := server.NewHttpServer(fs, loggingService) +// StartHttpServer creates an HTTP server with 
enabled feature logging +func StartHttpServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error { + ser := server.NewHttpServer(fs, loggingService, metricsClient, config) log.Info().Msgf("Starting a HTTP server on host %s, port %d", host, port) stop := make(chan os.Signal, 1) @@ -257,15 +266,13 @@ func StartHttpServer(fs *feast.FeatureStore, host string, port int, loggingServi } // StartHybridServer creates a gRPC Server and HTTP server -// Handlers for these are defined in hybrid_server.go -// Stops both servers if a stop signal is received. -func StartHybridServer(fs *feast.FeatureStore, host string, httpPort int, grpcPort int, loggingService *logging.LoggingService) error { +func StartHybridServer(fs *feast.FeatureStore, host string, httpPort int, grpcPort int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error { if datadogTracingEnabled() { tracer.Start(tracer.WithRuntimeMetrics()) defer tracer.Stop() } - ser := server.NewGrpcServingServiceServer(fs, loggingService) + ser := server.NewGrpcServingServiceServer(fs, loggingService, metricsClient, config) log.Info().Msgf("Starting a gRPC server on host %s port %d", host, grpcPort) lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, grpcPort)) if err != nil { @@ -274,7 +281,7 @@ func StartHybridServer(fs *feast.FeatureStore, host string, httpPort int, grpcPo grpcSer := ser.RegisterServices() - httpSer := server.NewHttpServer(fs, loggingService) + httpSer := server.NewHttpServer(fs, loggingService, metricsClient, config) log.Info().Msgf("Starting a HTTP server on host %s, port %d", host, httpPort) stop := make(chan os.Signal, 1) diff --git a/go/main_test.go b/go/main_test.go index 04a99cac010..eb8553206e8 100644 --- a/go/main_test.go +++ b/go/main_test.go @@ -6,6 +6,8 @@ import ( "testing" "github.com/feast-dev/feast/go/internal/feast" + 
"github.com/feast-dev/feast/go/internal/feast/metrics" + "github.com/feast-dev/feast/go/internal/feast/registry" "github.com/feast-dev/feast/go/internal/feast/server/logging" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -16,13 +18,18 @@ type MockServerStarter struct { mock.Mock } -func (m *MockServerStarter) StartHttpServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { - args := m.Called(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) +func (m *MockServerStarter) StartHttpServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error { + args := m.Called(fs, host, port, loggingService, metricsClient, config) return args.Error(0) } -func (m *MockServerStarter) StartGrpcServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { - args := m.Called(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) +func (m *MockServerStarter) StartGrpcServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error { + args := m.Called(fs, host, port, loggingService, metricsClient, config) + return args.Error(0) +} + +func (m *MockServerStarter) StartHybridServer(fs *feast.FeatureStore, host string, httpPort int, grpcPort int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error { + args := m.Called(fs, host, httpPort, grpcPort, loggingService, metricsClient, config) return args.Error(0) } @@ -32,13 +39,10 @@ func TestStartHttpServer(t *testing.T) { fs := &feast.FeatureStore{} host := "localhost" port := 8080 - var writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback 
- loggingOpts := &logging.LoggingOptions{} - - mockServerStarter.On("StartHttpServer", fs, host, port, mock.AnythingOfType("logging.OfflineStoreWriteCallback"), loggingOpts).Return(nil) + mockServerStarter.On("StartHttpServer", fs, host, port, (*logging.LoggingService)(nil), (metrics.StatsdClient)(nil), (*registry.RepoConfig)(nil)).Return(nil) - err := mockServerStarter.StartHttpServer(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) + err := mockServerStarter.StartHttpServer(fs, host, port, nil, nil, nil) assert.NoError(t, err) mockServerStarter.AssertExpectations(t) } @@ -49,12 +53,10 @@ func TestStartGrpcServer(t *testing.T) { fs := &feast.FeatureStore{} host := "localhost" port := 9090 - var writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback - loggingOpts := &logging.LoggingOptions{} - mockServerStarter.On("StartGrpcServer", fs, host, port, mock.AnythingOfType("logging.OfflineStoreWriteCallback"), loggingOpts).Return(nil) + mockServerStarter.On("StartGrpcServer", fs, host, port, (*logging.LoggingService)(nil), (metrics.StatsdClient)(nil), (*registry.RepoConfig)(nil)).Return(nil) - err := mockServerStarter.StartGrpcServer(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) + err := mockServerStarter.StartGrpcServer(fs, host, port, nil, nil, nil) assert.NoError(t, err) mockServerStarter.AssertExpectations(t) } diff --git a/sdk/python/feast/_missing_key_metrics.py b/sdk/python/feast/_missing_key_metrics.py new file mode 100644 index 00000000000..4a9d1b756f7 --- /dev/null +++ b/sdk/python/feast/_missing_key_metrics.py @@ -0,0 +1,106 @@ +import logging +import os +import random +from collections import Counter +from typing import List + +from feast.protos.feast.serving.ServingService_pb2 import FieldStatus + +logger = logging.getLogger(__name__) + + +def _extract_feature_view(feature_name: str) -> str: + """Extract feature view name from full feature name. 
+ + Feature names follow the format: feature_view__feature_name + Example: "hotel_fv__price" -> "hotel_fv" + + Args: + feature_name: Full feature name with feature view prefix + + Returns: + Feature view name, or "unknown" if format doesn't match + """ + if "__" in feature_name: + return feature_name.split("__", 1)[0] + return "unknown" + + +class LookupMetricsAggregator: + def __init__( + self, + project: str, + online_store_type: str, + metrics_client, + ): + self.project = project + self.online_store_type = online_store_type + self.metrics_client = metrics_client + self.not_found: Counter = Counter() + self.null_or_expired: Counter = Counter() + + # Read sampling rate from environment (default: 1.0 = no sampling) + sample_rate_str = os.getenv("FEAST_METRICS_SAMPLE_RATE", "1.0") + try: + self.sample_rate = float(sample_rate_str) + # Validate: must be between 0 and 1 + if not 0 < self.sample_rate <= 1.0: + logger.warning( + f"Invalid FEAST_METRICS_SAMPLE_RATE={sample_rate_str}, using 1.0" + ) + self.sample_rate = 1.0 + except ValueError: + logger.warning( + f"Invalid FEAST_METRICS_SAMPLE_RATE={sample_rate_str}, using 1.0" + ) + self.sample_rate = 1.0 + + def record(self, feature_id: str, status: int) -> None: + if status == FieldStatus.NOT_FOUND: + self.not_found[feature_id] += 1 + elif status in (FieldStatus.NULL_VALUE, FieldStatus.OUTSIDE_MAX_AGE): + self.null_or_expired[feature_id] += 1 + + def emit(self) -> None: + if self.metrics_client is None: + return + + # Probabilistic sampling: skip this request's metrics based on sample_rate + if self.sample_rate < 1.0 and random.random() > self.sample_rate: + return + + # Calculate multiplier to preserve statistical accuracy + # If sample_rate=0.1, we only emit 10% of the time, so multiply counts by 10 + multiplier = 1.0 / self.sample_rate + + base_tags: List[str] = [ + f"project:{self.project}", + f"online_store_type:{self.online_store_type}", + ] + + for feat, cnt in self.not_found.items(): + if cnt: + # Adjust 
count to preserve accuracy when sampling + adjusted_count = int(cnt * multiplier) + self.metrics_client.increment( + "mlpfs.featureserver.feature_lookup_not_found", + adjusted_count, + tags=base_tags + + [ + f"feature:{feat}", + f"feature_view:{_extract_feature_view(feat)}", + ], + ) + + for feat, cnt in self.null_or_expired.items(): + if cnt: + adjusted_count = int(cnt * multiplier) + self.metrics_client.increment( + "mlpfs.featureserver.feature_lookup_null_or_expired", + adjusted_count, + tags=base_tags + + [ + f"feature:{feat}", + f"feature_view:{_extract_feature_view(feat)}", + ], + ) diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index b77185229d5..01b272b7bbc 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -12,17 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio +import logging +import os from abc import ABC, abstractmethod from datetime import datetime from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Tuple, Union from feast import Entity, utils +from feast._missing_key_metrics import LookupMetricsAggregator from feast.batch_feature_view import BatchFeatureView from feast.feature_service import FeatureService from feast.feature_view import FeatureView from feast.infra.infra_object import InfraObject from feast.infra.registry.base_registry import BaseRegistry from feast.infra.supported_async_methods import SupportedAsyncMethods +from feast.metrics_client import get_metrics_client from feast.online_response import OnlineResponse from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -229,6 +233,13 @@ def get_online_features( utils._drop_unneeded_columns( online_features_response, requested_result_row_names ) 
+ + if _is_missing_key_metrics_enabled(): + try: + _emit_missing_key_metrics(config, project, online_features_response) + except Exception: + logger.debug("Failed to emit missing key metrics", exc_info=True) + return OnlineResponse(online_features_response) async def get_online_features_async( @@ -472,3 +483,31 @@ async def initialize(self, config: RepoConfig) -> None: async def close(self) -> None: pass + + +logger = logging.getLogger(__name__) + + +def _is_missing_key_metrics_enabled() -> bool: + return os.getenv("ENABLE_MISSING_KEY_METRICS", "false").lower() == "true" + + +def _emit_missing_key_metrics(config, project, response_proto): + online_store_type = ( + config.online_store.type if hasattr(config.online_store, "type") else "unknown" + ) + + agg = LookupMetricsAggregator( + project=project, + online_store_type=online_store_type, + metrics_client=get_metrics_client(), + ) + + feature_names = list(response_proto.metadata.feature_names.val) + for i, feature_ref in enumerate(feature_names): + if i < len(response_proto.results): + feature_vector = response_proto.results[i] + for status in feature_vector.statuses: + agg.record(feature_ref, status) + + agg.emit() diff --git a/sdk/python/feast/metrics.py b/sdk/python/feast/metrics.py new file mode 100644 index 00000000000..cd16e4240a6 --- /dev/null +++ b/sdk/python/feast/metrics.py @@ -0,0 +1,5 @@ +def datadog_client(**kwargs): + """Create a Datadog DogStatsd client. Requires feast[metrics] extra.""" + from datadog import DogStatsd + + return DogStatsd(**kwargs) diff --git a/sdk/python/feast/metrics_client.py b/sdk/python/feast/metrics_client.py new file mode 100644 index 00000000000..9c7f44d6d2d --- /dev/null +++ b/sdk/python/feast/metrics_client.py @@ -0,0 +1,34 @@ +from typing import List, Optional, Protocol, runtime_checkable + + +@runtime_checkable +class StatsdClient(Protocol): + def increment( + self, metric: str, value: int = 1, tags: Optional[List[str]] = None + ) -> None: ... 
+ + +class NoOpStatsdClient: + def increment( + self, metric: str, value: int = 1, tags: Optional[List[str]] = None + ) -> None: + pass + + +_global_client: StatsdClient = NoOpStatsdClient() + + +def set_metrics_client(client: StatsdClient) -> None: + """Register a StatsD-compatible metrics client for Feast SDK. + + Call once at process startup. Example with Datadog: + from datadog import DogStatsd + from feast.metrics_client import set_metrics_client + set_metrics_client(DogStatsd(host="localhost", port=8125)) + """ + global _global_client + _global_client = client + + +def get_metrics_client() -> StatsdClient: + return _global_client diff --git a/sdk/python/tests/unit/test_missing_key_metrics.py b/sdk/python/tests/unit/test_missing_key_metrics.py new file mode 100644 index 00000000000..fee6b166fd6 --- /dev/null +++ b/sdk/python/tests/unit/test_missing_key_metrics.py @@ -0,0 +1,284 @@ +import os +from unittest.mock import MagicMock + +from feast._missing_key_metrics import LookupMetricsAggregator, _extract_feature_view +from feast.metrics_client import ( + NoOpStatsdClient, + get_metrics_client, + set_metrics_client, +) +from feast.protos.feast.serving.ServingService_pb2 import ( + FeatureList, + FieldStatus, + GetOnlineFeaturesResponse, + GetOnlineFeaturesResponseMetadata, +) + + +class FakeMetricsClient: + def __init__(self): + self.calls = [] + + def increment(self, metric, value=1, tags=None): + self.calls.append({"metric": metric, "value": value, "tags": tags or []}) + + +class TestLookupMetricsAggregator: + def test_not_found_only(self): + fake = FakeMetricsClient() + agg = LookupMetricsAggregator("proj", "redis", fake) + + agg.record("user_fv__age", FieldStatus.NOT_FOUND) + agg.record("user_fv__age", FieldStatus.NOT_FOUND) + agg.record("user_fv__age", FieldStatus.NOT_FOUND) + agg.emit() + + assert len(fake.calls) == 1 + assert fake.calls[0]["metric"] == "mlpfs.featureserver.feature_lookup_not_found" + assert fake.calls[0]["value"] == 3 + assert 
"feature:user_fv__age" in fake.calls[0]["tags"] + + def test_null_or_expired(self): + fake = FakeMetricsClient() + agg = LookupMetricsAggregator("proj", "redis", fake) + + agg.record("order_fv__amt", FieldStatus.NULL_VALUE) + agg.record("order_fv__amt", FieldStatus.OUTSIDE_MAX_AGE) + agg.emit() + + assert len(fake.calls) == 1 + assert ( + fake.calls[0]["metric"] + == "mlpfs.featureserver.feature_lookup_null_or_expired" + ) + assert fake.calls[0]["value"] == 2 + + def test_mixed_statuses(self): + fake = FakeMetricsClient() + agg = LookupMetricsAggregator("proj", "redis", fake) + + agg.record("fv_a__f1", FieldStatus.PRESENT) + agg.record("fv_a__f1", FieldStatus.NOT_FOUND) + agg.record("fv_b__f2", FieldStatus.NULL_VALUE) + agg.record("fv_b__f2", FieldStatus.PRESENT) + agg.record("fv_b__f2", FieldStatus.OUTSIDE_MAX_AGE) + agg.emit() + + assert len(fake.calls) == 2 + metrics_by_feature = {} + for call in fake.calls: + feature_tag = [t for t in call["tags"] if t.startswith("feature:")] + key = (call["metric"], feature_tag[0] if feature_tag else "") + metrics_by_feature[key] = call["value"] + + assert ( + metrics_by_feature[ + ( + "mlpfs.featureserver.feature_lookup_not_found", + "feature:fv_a__f1", + ) + ] + == 1 + ) + assert ( + metrics_by_feature[ + ( + "mlpfs.featureserver.feature_lookup_null_or_expired", + "feature:fv_b__f2", + ) + ] + == 2 + ) + + def test_all_present(self): + fake = FakeMetricsClient() + agg = LookupMetricsAggregator("proj", "redis", fake) + + agg.record("fv__f1", FieldStatus.PRESENT) + agg.record("fv__f1", FieldStatus.PRESENT) + agg.emit() + + assert len(fake.calls) == 0 + + def test_none_client(self): + agg = LookupMetricsAggregator("proj", "redis", None) + agg.record("fv__f1", FieldStatus.NOT_FOUND) + agg.emit() + + def test_tags(self): + fake = FakeMetricsClient() + agg = LookupMetricsAggregator("mlpfs", "eg-valkey", fake) + + agg.record("hotel_fv__price", FieldStatus.NOT_FOUND) + agg.emit() + + tags = fake.calls[0]["tags"] + assert 
"project:mlpfs" in tags + assert "online_store_type:eg-valkey" in tags + assert not any(t.startswith("service:") for t in tags) + assert not any(t.startswith("env:") for t in tags) + assert "feature:hotel_fv__price" in tags + assert "feature_view:hotel_fv" in tags + + +class TestMetricsClientRegistry: + def test_default_is_noop(self): + set_metrics_client(NoOpStatsdClient()) + client = get_metrics_client() + assert isinstance(client, NoOpStatsdClient) + + def test_set_and_get(self): + fake = FakeMetricsClient() + set_metrics_client(fake) + assert get_metrics_client() is fake + set_metrics_client(NoOpStatsdClient()) + + +class TestFeatureViewExtraction: + def test_standard_format(self): + assert _extract_feature_view("hotel_fv__price") == "hotel_fv" + assert _extract_feature_view("user_fv__age") == "user_fv" + assert ( + _extract_feature_view("ranking_signals_fv__score") == "ranking_signals_fv" + ) + + def test_multiple_underscores_in_feature_name(self): + # Only split on first __ occurrence + assert _extract_feature_view("hotel_fv__review_score_avg") == "hotel_fv" + + def test_no_double_underscore(self): + # Fallback to "unknown" for non-standard format + assert _extract_feature_view("age") == "unknown" + assert _extract_feature_view("hotel_fv:price") == "unknown" + + def test_empty_string(self): + assert _extract_feature_view("") == "unknown" + + +class TestIsMissingKeyMetricsEnabled: + def test_disabled_by_default(self): + os.environ.pop("ENABLE_MISSING_KEY_METRICS", None) + from feast.infra.online_stores.online_store import ( + _is_missing_key_metrics_enabled, + ) + + assert _is_missing_key_metrics_enabled() is False + + def test_enabled(self): + os.environ["ENABLE_MISSING_KEY_METRICS"] = "true" + from feast.infra.online_stores.online_store import ( + _is_missing_key_metrics_enabled, + ) + + try: + assert _is_missing_key_metrics_enabled() is True + finally: + os.environ.pop("ENABLE_MISSING_KEY_METRICS", None) + + +class TestEmitMissingKeyMetricsIntegration: + 
def test_with_proto_response(self): + from feast.infra.online_stores.online_store import _emit_missing_key_metrics + + fake = FakeMetricsClient() + set_metrics_client(fake) + + response = GetOnlineFeaturesResponse() + response.metadata.CopyFrom( + GetOnlineFeaturesResponseMetadata( + feature_names=FeatureList(val=["fv_a__f1", "fv_a__f2"]) + ) + ) + fv1 = response.results.add() + fv1.statuses.extend([FieldStatus.PRESENT, FieldStatus.NOT_FOUND]) + fv2 = response.results.add() + fv2.statuses.extend([FieldStatus.NOT_FOUND, FieldStatus.NOT_FOUND]) + + config = MagicMock() + config.online_store.type = "redis" + + _emit_missing_key_metrics(config, "test_project", response) + + assert len(fake.calls) == 2 + calls_by_feature = { + [t for t in c["tags"] if t.startswith("feature:")][0]: c for c in fake.calls + } + assert calls_by_feature["feature:fv_a__f1"]["value"] == 1 + assert calls_by_feature["feature:fv_a__f2"]["value"] == 2 + + set_metrics_client(NoOpStatsdClient()) + + +class TestSamplingFeature: + def test_default_no_sampling(self): + """Default sample_rate should be 1.0 (no sampling)""" + os.environ.pop("FEAST_METRICS_SAMPLE_RATE", None) + fake = FakeMetricsClient() + agg = LookupMetricsAggregator("proj", "redis", fake) + + assert agg.sample_rate == 1.0 + + def test_sample_rate_from_env(self): + """Should read sample_rate from environment""" + os.environ["FEAST_METRICS_SAMPLE_RATE"] = "0.5" + fake = FakeMetricsClient() + agg = LookupMetricsAggregator("proj", "redis", fake) + + assert agg.sample_rate == 0.5 + os.environ.pop("FEAST_METRICS_SAMPLE_RATE") + + def test_invalid_sample_rate_uses_default(self): + """Invalid sample rates should default to 1.0""" + test_cases = ["-0.5", "1.5", "abc", ""] + + for invalid_value in test_cases: + os.environ["FEAST_METRICS_SAMPLE_RATE"] = invalid_value + fake = FakeMetricsClient() + agg = LookupMetricsAggregator("proj", "redis", fake) + assert agg.sample_rate == 1.0, f"Failed for value: {invalid_value}" + + 
os.environ.pop("FEAST_METRICS_SAMPLE_RATE") + + def test_sampling_adjusts_counts(self): + """When sampling, counts should be multiplied by 1/sample_rate""" + os.environ["FEAST_METRICS_SAMPLE_RATE"] = "0.5" + fake = FakeMetricsClient() + + # Force emit by seeding random to always emit + import random + + random.seed(0) # This makes random.random() predictable + + agg = LookupMetricsAggregator("proj", "redis", fake) + agg.record("fv__f1", FieldStatus.NOT_FOUND) + agg.record("fv__f1", FieldStatus.NOT_FOUND) # 2 times + + # Try multiple times to ensure at least one emit happens + emitted = False + for _ in range(20): + fake.calls.clear() + agg.emit() + if fake.calls: + emitted = True + # With sample_rate=0.5, count of 2 should become 4 (2 / 0.5) + assert fake.calls[0]["value"] == 4 + break + + assert emitted, "Should have emitted at least once in 20 tries" + + os.environ.pop("FEAST_METRICS_SAMPLE_RATE") + + def test_no_sampling_keeps_original_counts(self): + """With sample_rate=1.0, counts should not be adjusted""" + os.environ["FEAST_METRICS_SAMPLE_RATE"] = "1.0" + fake = FakeMetricsClient() + agg = LookupMetricsAggregator("proj", "redis", fake) + + agg.record("fv__f1", FieldStatus.NOT_FOUND) + agg.record("fv__f1", FieldStatus.NOT_FOUND) + agg.emit() + + assert len(fake.calls) == 1 + assert fake.calls[0]["value"] == 2 # Original count, no adjustment + + os.environ.pop("FEAST_METRICS_SAMPLE_RATE")