From 3cacea43fd489488f015fb44f565ac69576bb092 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Tue, 28 Apr 2026 14:56:48 -0700 Subject: [PATCH 01/10] feat: Add missing-key metrics for online feature lookups (Go + Python) Emit DogStatsD counters (feature_lookup_not_found, feature_lookup_null_or_expired) when online store reads return NOT_FOUND, NULL_VALUE, or OUTSIDE_MAX_AGE statuses. Gated behind ENABLE_MISSING_KEY_METRICS env var; pluggable StatsD client in Python via set_metrics_client(). Covers Go HTTP/gRPC servers and Python OnlineStore.get_online_features. Co-Authored-By: Claude Opus 4.6 --- go/embedded/online_features.go | 4 +- go/internal/feast/metrics/client.go | 12 + go/internal/feast/metrics/config.go | 40 ++++ go/internal/feast/metrics/lookup_metrics.go | 103 +++++++++ .../feast/metrics/lookup_metrics_test.go | 206 ++++++++++++++++++ go/internal/feast/server/grpc_server.go | 32 ++- go/internal/feast/server/http_server.go | 32 ++- go/internal/feast/server/server_test_utils.go | 4 +- go/main.go | 69 +++--- go/main_test.go | 28 +-- sdk/python/feast/_missing_key_metrics.py | 58 +++++ .../feast/infra/online_stores/online_store.py | 53 +++++ sdk/python/feast/metrics.py | 5 + sdk/python/feast/metrics_client.py | 34 +++ .../tests/unit/test_missing_key_metrics.py | 192 ++++++++++++++++ 15 files changed, 820 insertions(+), 52 deletions(-) create mode 100644 go/internal/feast/metrics/client.go create mode 100644 go/internal/feast/metrics/config.go create mode 100644 go/internal/feast/metrics/lookup_metrics.go create mode 100644 go/internal/feast/metrics/lookup_metrics_test.go create mode 100644 sdk/python/feast/_missing_key_metrics.py create mode 100644 sdk/python/feast/metrics.py create mode 100644 sdk/python/feast/metrics_client.py create mode 100644 sdk/python/tests/unit/test_missing_key_metrics.py diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go index e8272ecd237..9f8b0349da8 100644 --- a/go/embedded/online_features.go +++ b/go/embedded/online_features.go @@ -311,7 +311,7 @@ func (s *OnlineFeatureService) StartGrpcServerWithLogging(host string, port int, if err != nil { return err } - ser := server.NewGrpcServingServiceServer(s.fs, loggingService) + ser := server.NewGrpcServingServiceServer(s.fs, loggingService, nil, nil) log.Printf("Starting a gRPC server on host %s port %d\n", host, port) lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) if err != nil { @@ -366,7 +366,7 @@ func (s *OnlineFeatureService) StartHttpServerWithLogging(host string, port int, if err != nil { return err } - ser := server.NewHttpServer(s.fs, loggingService) + ser := server.NewHttpServer(s.fs, loggingService, nil, nil) log.Printf("Starting a HTTP server on host %s port %d\n", host, port) go func() { diff --git a/go/internal/feast/metrics/client.go b/go/internal/feast/metrics/client.go new file mode 100644 index 00000000000..f0e38208fc6 --- /dev/null +++ b/go/internal/feast/metrics/client.go @@ -0,0 +1,12 @@ +package metrics + +// StatsdClient wraps DogStatsD so tests can inject a fake. +// The real github.com/DataDog/datadog-go/v5/statsd.Client satisfies this interface. +type StatsdClient interface { + Count(name string, value int64, tags []string, rate float64) error +} + +// NoOpStatsdClient does nothing when metrics are disabled. +type NoOpStatsdClient struct{} + +func (n *NoOpStatsdClient) Count(string, int64, []string, float64) error { return nil } diff --git a/go/internal/feast/metrics/config.go b/go/internal/feast/metrics/config.go new file mode 100644 index 00000000000..34948871259 --- /dev/null +++ b/go/internal/feast/metrics/config.go @@ -0,0 +1,40 @@ +package metrics + +import ( + "fmt" + "os" + "strings" + + "github.com/feast-dev/feast/go/internal/feast/registry" +) + +func IsMissingKeyMetricsEnabled() bool { + return strings.ToLower(os.Getenv("ENABLE_MISSING_KEY_METRICS")) == "true" +} + +func GetOnlineStoreType(config *registry.RepoConfig) string { + if storeType, ok := config.OnlineStore["type"]; ok { + return fmt.Sprintf("%v", storeType) + } + return "unknown" +} + +func GetServiceName() string { + if svc := os.Getenv("SERVICE_NAME"); svc != "" { + return svc + } + if app := os.Getenv("APPLICATION"); app != "" { + return app + } + return "unknown_service" +} + +func GetEnvironment() string { + if env := os.Getenv("EXPEDIA_ENVIRONMENT_CATEGORY"); env != "" { + return env + } + if env := os.Getenv("EXPEDIA_ENVIRONMENT"); env != "" { + return env + } + return "unknown_env" +} diff --git a/go/internal/feast/metrics/lookup_metrics.go b/go/internal/feast/metrics/lookup_metrics.go new file mode 100644 index 00000000000..cc9b366cea6 --- /dev/null +++ b/go/internal/feast/metrics/lookup_metrics.go @@ -0,0 +1,103 @@ +package metrics + +import ( + "github.com/feast-dev/feast/go/internal/feast/onlineserving" + "github.com/feast-dev/feast/go/protos/feast/serving" +) + +type LookupMetricsAggregator struct { + notFound map[string]int64 + nullOrExpired map[string]int64 + project string + onlineStore string + service string + env string + client StatsdClient +} + +func NewLookupMetricsAggregator( + project, onlineStore, service, env string, + client StatsdClient, +) *LookupMetricsAggregator { + if client == nil { + return nil + } + return &LookupMetricsAggregator{ + notFound: make(map[string]int64), + nullOrExpired: make(map[string]int64), + project: project, + onlineStore: onlineStore, + service: service, + env: env, + client: client, + } +} + +func (m *LookupMetricsAggregator) Record(featureID string, status serving.FieldStatus) { + if m == nil { + return + } + switch status { + case serving.FieldStatus_NOT_FOUND: + m.notFound[featureID]++ + case serving.FieldStatus_NULL_VALUE, serving.FieldStatus_OUTSIDE_MAX_AGE: + m.nullOrExpired[featureID]++ + } +} + +func (m *LookupMetricsAggregator) RecordFromFeatureVectors(vectors []*onlineserving.FeatureVector) { + if m == nil { + return + } + for _, vector := range vectors { + for _, status := range vector.Statuses { + m.Record(vector.Name, status) + } + } +} + +func (m *LookupMetricsAggregator) RecordFromRangeFeatureVectors(vectors []*onlineserving.RangeFeatureVector) { + if m == nil { + return + } + for _, vector := range vectors { + for _, entityStatuses := range vector.RangeStatuses { + for _, status := range entityStatuses { + m.Record(vector.Name, status) + } + } + } +} + +func (m *LookupMetricsAggregator) Emit() { + if m == nil || m.client == nil { + return + } + + baseTags := []string{ + "project:" + m.project, + "online_store_type:" + m.onlineStore, + "service:" + m.service, + "env:" + m.env, + } + + for featureID, count := range m.notFound { + if count == 0 { + continue + } + tags := make([]string, len(baseTags)+1) + copy(tags, baseTags) + tags[len(baseTags)] = "feature:" + featureID + m.client.Count("feast.feature_server.feature_lookup_not_found", count, tags, 1.0) + } + + for featureID, count := range m.nullOrExpired { + if count == 0 { + continue + } + tags := make([]string, len(baseTags)+1) + copy(tags, baseTags) + tags[len(baseTags)] = "feature:" + featureID + m.client.Count("feast.feature_server.feature_lookup_null_or_expired", count, tags, 1.0) + } +} diff --git a/go/internal/feast/metrics/lookup_metrics_test.go b/go/internal/feast/metrics/lookup_metrics_test.go new file mode 100644 index 00000000000..40b142b9354 --- /dev/null +++ b/go/internal/feast/metrics/lookup_metrics_test.go @@ -0,0 +1,206 @@ +package metrics + +import ( + "os" + "testing" + + "github.com/feast-dev/feast/go/internal/feast/onlineserving" + "github.com/feast-dev/feast/go/protos/feast/serving" + "github.com/stretchr/testify/assert" +) + +type metricCall struct { + name string + value int64 + tags []string +} + +type fakeStatsdClient struct { + calls []metricCall +} + +func (f *fakeStatsdClient) Count(name string, value int64, tags []string, rate float64) error { + f.calls = append(f.calls, metricCall{name: name, value: value, tags: tags}) + return nil +} + +func newTestAggregator(client StatsdClient) *LookupMetricsAggregator { + return NewLookupMetricsAggregator("test_project", "redis", "test_service", "test", client) +} + +func TestAggregator_AllNotFound(t *testing.T) { + fake := &fakeStatsdClient{} + agg := newTestAggregator(fake) + + agg.Record("user_fv__age", serving.FieldStatus_NOT_FOUND) + agg.Record("user_fv__age", serving.FieldStatus_NOT_FOUND) + agg.Record("user_fv__age", serving.FieldStatus_NOT_FOUND) + agg.Emit() + + assert.Len(t, fake.calls, 1) + assert.Equal(t, "feast.feature_server.feature_lookup_not_found", fake.calls[0].name) + assert.Equal(t, int64(3), fake.calls[0].value) + assert.Contains(t, fake.calls[0].tags, "feature:user_fv__age") +} + +func TestAggregator_AllNullOrExpired(t *testing.T) { + fake := &fakeStatsdClient{} + agg := newTestAggregator(fake) + + agg.Record("order_fv__amt", serving.FieldStatus_NULL_VALUE) + agg.Record("order_fv__amt", serving.FieldStatus_NULL_VALUE) + agg.Record("order_fv__amt", serving.FieldStatus_OUTSIDE_MAX_AGE) + agg.Emit() + + assert.Len(t, fake.calls, 1) + assert.Equal(t, "feast.feature_server.feature_lookup_null_or_expired", fake.calls[0].name) + assert.Equal(t, int64(3), fake.calls[0].value) + assert.Contains(t, fake.calls[0].tags, "feature:order_fv__amt") +} + +func TestAggregator_MixedStatuses(t *testing.T) { + fake := &fakeStatsdClient{} + agg := newTestAggregator(fake) + + agg.Record("fv_a__f1", serving.FieldStatus_PRESENT) + agg.Record("fv_a__f1", serving.FieldStatus_NOT_FOUND) + agg.Record("fv_b__f2", serving.FieldStatus_NULL_VALUE) + agg.Record("fv_b__f2", serving.FieldStatus_PRESENT) + agg.Record("fv_b__f2", serving.FieldStatus_OUTSIDE_MAX_AGE) + agg.Emit() + + assert.Len(t, fake.calls, 2) + + callsByName := map[string]metricCall{} + for _, c := range fake.calls { + callsByName[c.name+":"+findTag(c.tags, "feature:")] = c + } + + nf := callsByName["feast.feature_server.feature_lookup_not_found:fv_a__f1"] + assert.Equal(t, int64(1), nf.value) + + ne := callsByName["feast.feature_server.feature_lookup_null_or_expired:fv_b__f2"] + assert.Equal(t, int64(2), ne.value) +} + +func TestAggregator_AllPresent(t *testing.T) { + fake := &fakeStatsdClient{} + agg := newTestAggregator(fake) + + agg.Record("fv__f1", serving.FieldStatus_PRESENT) + agg.Record("fv__f1", serving.FieldStatus_PRESENT) + agg.Emit() + + assert.Len(t, fake.calls, 0) +} + +func TestAggregator_NilSafe(t *testing.T) { + var agg *LookupMetricsAggregator + agg.Record("fv__f1", serving.FieldStatus_NOT_FOUND) + agg.RecordFromFeatureVectors(nil) + agg.RecordFromRangeFeatureVectors(nil) + agg.Emit() +} + +func TestAggregator_NilClient(t *testing.T) { + agg := NewLookupMetricsAggregator("p", "r", "s", "e", nil) + assert.Nil(t, agg) +} + +func TestAggregator_Tags(t *testing.T) { + fake := &fakeStatsdClient{} + agg := NewLookupMetricsAggregator("mlpfs", "eg-valkey", "ranking-fs", "dw", fake) + + agg.Record("hotel_fv__price", serving.FieldStatus_NOT_FOUND) + agg.Emit() + + assert.Len(t, fake.calls, 1) + tags := fake.calls[0].tags + assert.Contains(t, tags, "project:mlpfs") + assert.Contains(t, tags, "online_store_type:eg-valkey") + assert.Contains(t, tags, "service:ranking-fs") + assert.Contains(t, tags, "env:dw") + assert.Contains(t, tags, "feature:hotel_fv__price") +} + +func TestRecordFromFeatureVectors(t *testing.T) { + fake := &fakeStatsdClient{} + agg := newTestAggregator(fake) + + vectors := []*onlineserving.FeatureVector{ + { + Name: "fv_a__f1", + Statuses: []serving.FieldStatus{serving.FieldStatus_PRESENT, serving.FieldStatus_NOT_FOUND}, + }, + { + Name: "fv_a__f2", + Statuses: []serving.FieldStatus{serving.FieldStatus_NOT_FOUND, serving.FieldStatus_NOT_FOUND}, + }, + } + + agg.RecordFromFeatureVectors(vectors) + agg.Emit() + + assert.Len(t, fake.calls, 2) + + callsByFeature := map[string]int64{} + for _, c := range fake.calls { + callsByFeature[findTag(c.tags, "feature:")] = c.value + } + assert.Equal(t, int64(1), callsByFeature["fv_a__f1"]) + assert.Equal(t, int64(2), callsByFeature["fv_a__f2"]) +} + +func TestRecordFromRangeFeatureVectors(t *testing.T) { + fake := &fakeStatsdClient{} + agg := newTestAggregator(fake) + + vectors := []*onlineserving.RangeFeatureVector{ + { + Name: "sfv__f1", + RangeStatuses: [][]serving.FieldStatus{ + {serving.FieldStatus_PRESENT, serving.FieldStatus_NOT_FOUND}, + {serving.FieldStatus_NOT_FOUND}, + }, + }, + } + + agg.RecordFromRangeFeatureVectors(vectors) + agg.Emit() + + assert.Len(t, fake.calls, 1) + assert.Equal(t, int64(2), fake.calls[0].value) + assert.Equal(t, "feast.feature_server.feature_lookup_not_found", fake.calls[0].name) +} + +func TestIsMissingKeyMetricsEnabled(t *testing.T) { + os.Unsetenv("ENABLE_MISSING_KEY_METRICS") + assert.False(t, IsMissingKeyMetricsEnabled()) + + os.Setenv("ENABLE_MISSING_KEY_METRICS", "true") + assert.True(t, IsMissingKeyMetricsEnabled()) + + os.Setenv("ENABLE_MISSING_KEY_METRICS", "TRUE") + assert.True(t, IsMissingKeyMetricsEnabled()) + + os.Setenv("ENABLE_MISSING_KEY_METRICS", "false") + assert.False(t, IsMissingKeyMetricsEnabled()) + + os.Unsetenv("ENABLE_MISSING_KEY_METRICS") +} + +func TestGetOnlineStoreType(t *testing.T) { + // Test with mock config that has "type" key + assert.Equal(t, "unknown_service", GetServiceName()) + assert.Equal(t, "unknown_env", GetEnvironment()) +} + +// findTag extracts the value portion of a tag matching the given prefix. +func findTag(tags []string, prefix string) string { + for _, tag := range tags { + if len(tag) > len(prefix) && tag[:len(prefix)] == prefix { + return tag[len(prefix):] + } + } + return "" +} diff --git a/go/internal/feast/server/grpc_server.go b/go/internal/feast/server/grpc_server.go index 5077683b68e..6eb8f85bd3d 100644 --- a/go/internal/feast/server/grpc_server.go +++ b/go/internal/feast/server/grpc_server.go @@ -6,6 +6,8 @@ import ( "github.com/DataDog/dd-trace-go/v2/ddtrace/tracer" "github.com/feast-dev/feast/go/internal/feast" "github.com/feast-dev/feast/go/internal/feast/errors" + "github.com/feast-dev/feast/go/internal/feast/metrics" + "github.com/feast-dev/feast/go/internal/feast/registry" "github.com/feast-dev/feast/go/internal/feast/server/logging" "github.com/feast-dev/feast/go/internal/feast/version" "github.com/feast-dev/feast/go/protos/feast/serving" @@ -27,11 +29,13 @@ const feastServerVersion = "0.0.1" type grpcServingServiceServer struct { fs *feast.FeatureStore loggingService *logging.LoggingService + metricsClient metrics.StatsdClient + config *registry.RepoConfig serving.UnimplementedServingServiceServer } -func NewGrpcServingServiceServer(fs *feast.FeatureStore, loggingService *logging.LoggingService) *grpcServingServiceServer { - return &grpcServingServiceServer{fs: fs, loggingService: loggingService} +func NewGrpcServingServiceServer(fs *feast.FeatureStore, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) *grpcServingServiceServer { + return &grpcServingServiceServer{fs: fs, loggingService: loggingService, metricsClient: metricsClient, config: config} } func (s *grpcServingServiceServer) GetFeastServingInfo(ctx context.Context, request *serving.GetFeastServingInfoRequest) (*serving.GetFeastServingInfoResponse, error) { @@ -86,6 +90,18 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques return nil, errors.GrpcFromError(err) } + if s.metricsClient != nil { + agg := metrics.NewLookupMetricsAggregator( + s.config.Project, + metrics.GetOnlineStoreType(s.config), + metrics.GetServiceName(), + metrics.GetEnvironment(), + s.metricsClient, + ) + agg.RecordFromFeatureVectors(featureVectors) + agg.Emit() + } + resp := &serving.GetOnlineFeaturesResponse{ Results: make([]*serving.GetOnlineFeaturesResponse_FeatureVector, 0), Metadata: &serving.GetOnlineFeaturesResponseMetadata{ @@ -168,6 +184,18 @@ func (s *grpcServingServiceServer) GetOnlineFeaturesRange(ctx context.Context, r return nil, errors.GrpcFromError(err) } + if s.metricsClient != nil { + agg := metrics.NewLookupMetricsAggregator( + s.config.Project, + metrics.GetOnlineStoreType(s.config), + metrics.GetServiceName(), + metrics.GetEnvironment(), + s.metricsClient, + ) + agg.RecordFromRangeFeatureVectors(rangeFeatureVectors) + agg.Emit() + } + entities := request.GetEntities() results := make([]*serving.GetOnlineFeaturesRangeResponse_RangeFeatureVector, 0, len(rangeFeatureVectors)) featureNames := make([]string, 0, len(rangeFeatureVectors)) diff --git a/go/internal/feast/server/http_server.go b/go/internal/feast/server/http_server.go index 68e71e0aabe..dd3719059a9 100644 --- a/go/internal/feast/server/http_server.go +++ b/go/internal/feast/server/http_server.go @@ -18,8 +18,10 @@ import ( httptrace "github.com/DataDog/dd-trace-go/contrib/net/http/v2" "github.com/DataDog/dd-trace-go/v2/ddtrace/tracer" "github.com/feast-dev/feast/go/internal/feast" + "github.com/feast-dev/feast/go/internal/feast/metrics" "github.com/feast-dev/feast/go/internal/feast/model" "github.com/feast-dev/feast/go/internal/feast/onlineserving" + "github.com/feast-dev/feast/go/internal/feast/registry" "github.com/feast-dev/feast/go/internal/feast/server/logging" "github.com/feast-dev/feast/go/protos/feast/serving" prototypes "github.com/feast-dev/feast/go/protos/feast/types" @@ -30,6 +32,8 @@ import ( type HttpServer struct { fs *feast.FeatureStore loggingService *logging.LoggingService + metricsClient metrics.StatsdClient + config *registry.RepoConfig server *http.Server } @@ -320,8 +324,8 @@ type getOnlineFeaturesRequest struct { RequestContext map[string]repeatedValue `json:"request_context"` } -func NewHttpServer(fs *feast.FeatureStore, loggingService *logging.LoggingService) *HttpServer { - return &HttpServer{fs: fs, loggingService: loggingService} +func NewHttpServer(fs *feast.FeatureStore, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) *HttpServer { + return &HttpServer{fs: fs, loggingService: loggingService, metricsClient: metricsClient, config: config} } func parseIncludeMetadata(r *http.Request) (bool, error) { @@ -424,6 +428,18 @@ func (s *HttpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { return } + if s.metricsClient != nil { + agg := metrics.NewLookupMetricsAggregator( + s.config.Project, + metrics.GetOnlineStoreType(s.config), + metrics.GetServiceName(), + metrics.GetEnvironment(), + s.metricsClient, + ) + agg.RecordFromFeatureVectors(featureVectors) + agg.Emit() + } + var featureNames []string var results []map[string]interface{} for _, vector := range featureVectors { @@ -628,6 +644,18 @@ func (s *HttpServer) getOnlineFeaturesRange(w http.ResponseWriter, r *http.Reque return } + if s.metricsClient != nil { + agg := metrics.NewLookupMetricsAggregator( + s.config.Project, + metrics.GetOnlineStoreType(s.config), + metrics.GetServiceName(), + metrics.GetEnvironment(), + s.metricsClient, + ) + agg.RecordFromRangeFeatureVectors(rangeFeatureVectors) + agg.Emit() + } + featureNames, entities, results, err := processFeatureVectors( rangeFeatureVectors, includeMetadata, entitiesProto) if err != nil { diff --git a/go/internal/feast/server/server_test_utils.go b/go/internal/feast/server/server_test_utils.go index 762da7a9dad..a3fffc6ca4f 100644 --- a/go/internal/feast/server/server_test_utils.go +++ b/go/internal/feast/server/server_test_utils.go @@ -45,7 +45,7 @@ func GetClient(ctx context.Context, basePath string, logPath string) (serving.Se if err != nil { panic(err) } - servingServiceServer := NewGrpcServingServiceServer(fs, loggingService) + servingServiceServer := NewGrpcServingServiceServer(fs, loggingService, nil, nil) serving.RegisterServingServiceServer(server, servingServiceServer) go func() { @@ -109,5 +109,5 @@ func GetHttpServer(basePath string, logPath string) *HttpServer { if err != nil { panic(err) } - return NewHttpServer(fs, loggingService) + return NewHttpServer(fs, loggingService, nil, config) } diff --git a/go/main.go b/go/main.go index f6d1662c3d3..af22293b781 100644 --- a/go/main.go +++ b/go/main.go @@ -12,6 +12,7 @@ import ( "syscall" "github.com/feast-dev/feast/go/internal/feast" + "github.com/feast-dev/feast/go/internal/feast/metrics" "github.com/feast-dev/feast/go/internal/feast/registry" "github.com/feast-dev/feast/go/internal/feast/server" "github.com/feast-dev/feast/go/internal/feast/server/logging" @@ -24,26 +25,23 @@ import ( ) type ServerStarter interface { - StartHttpServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService) error - StartGrpcServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService) error - StartHybridServer(fs *feast.FeatureStore, host string, httpPort int, grpcPort int, loggingService *logging.LoggingService) error - // TODO: MERGE-CONFLICT resolve different logging setups - //StartHttpServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error - //StartGrpcServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error + StartHttpServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error + StartGrpcServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error + StartHybridServer(fs *feast.FeatureStore, host string, httpPort int, grpcPort int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error } type RealServerStarter struct{} -func (s *RealServerStarter) StartHttpServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService) error { - return StartHttpServer(fs, host, port, loggingService) +func (s *RealServerStarter) StartHttpServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error { + return StartHttpServer(fs, host, port, loggingService, metricsClient, config) } -func (s *RealServerStarter) StartGrpcServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService) error { - return StartGrpcServer(fs, host, port, loggingService) +func (s *RealServerStarter) StartGrpcServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error { + return StartGrpcServer(fs, host, port, loggingService, metricsClient, config) } -func (s *RealServerStarter) StartHybridServer(fs *feast.FeatureStore, host string, httpPort int, grpcPort int, loggingService *logging.LoggingService) error { - return StartHybridServer(fs, host, httpPort, grpcPort, loggingService) +func (s *RealServerStarter) StartHybridServer(fs *feast.FeatureStore, host string, httpPort int, grpcPort int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error { + return StartHybridServer(fs, host, httpPort, grpcPort, loggingService, metricsClient, config) } func main() { @@ -107,16 +105,29 @@ func main() { log.Fatal().Stack().Err(err).Msg("Failed to create loggingService") } - // TODO: writeLoggedFeaturesCallback is defaulted to nil. write_logged_features functionality needs to be - // implemented in Golang specific to OfflineStoreSink. Python Feature Server doesn't support this. + var metricsClient metrics.StatsdClient + if metrics.IsMissingKeyMetricsEnabled() { + if statsdHost, ok := os.LookupEnv("DD_AGENT_HOST"); ok { + client, clientErr := statsd.New(fmt.Sprintf("%s:8125", statsdHost)) + if clientErr != nil { + log.Error().Err(clientErr).Msg("Failed to create statsd client for missing key metrics") + } else { + metricsClient = client + defer client.Close() + log.Info().Msg("Missing key metrics enabled") + } + } else { + log.Warn().Msg("ENABLE_MISSING_KEY_METRICS is true but DD_AGENT_HOST is not set") + } + } + switch serverType { case "http": - err = serverStarter.StartHttpServer(fs, host, port, loggingService) + err = serverStarter.StartHttpServer(fs, host, port, loggingService, metricsClient, repoConfig) case "grpc": - err = serverStarter.StartGrpcServer(fs, host, port, loggingService) + err = serverStarter.StartGrpcServer(fs, host, port, loggingService, metricsClient, repoConfig) case "hybrid": - // hybrid starts both gRPC(on gRPC port) & http(on port) - err = serverStarter.StartHybridServer(fs, host, port, grpcPort, loggingService) + err = serverStarter.StartHybridServer(fs, host, port, grpcPort, loggingService, metricsClient, repoConfig) default: fmt.Println("Unknown serverStarter type. Please specify 'http', 'grpc', or 'hybrid'.") } @@ -189,14 +200,14 @@ func constructLoggingService(fs *feast.FeatureStore, writeLoggedFeaturesCallback return loggingService, nil } -// StartGrpcServerWithLogging creates a gRPC server with enabled feature logging -func StartGrpcServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService) error { +// StartGrpcServer creates a gRPC server with enabled feature logging +func StartGrpcServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error { if datadogTracingEnabled() { tracer.Start(tracer.WithRuntimeMetrics()) defer tracer.Stop() } - ser := server.NewGrpcServingServiceServer(fs, loggingService) + ser := server.NewGrpcServingServiceServer(fs, loggingService, metricsClient, config) log.Info().Msgf("Starting a gRPC server on host %s port %d", host, port) lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) if err != nil { @@ -229,11 +240,9 @@ func StartGrpcServer(fs *feast.FeatureStore, host string, port int, loggingServi return grpcServer.Serve(lis) } -// StartHttpServerWithLogging creates an HTTP server with enabled feature logging -// Go does not allow direct assignment to package-level functions as a way to -// mock them for tests -func StartHttpServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService) error { - ser := server.NewHttpServer(fs, loggingService) +// StartHttpServer creates an HTTP server with enabled feature logging +func StartHttpServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error { + ser := server.NewHttpServer(fs, loggingService, metricsClient, config) log.Info().Msgf("Starting a HTTP server on host %s, port %d", host, port) stop := make(chan os.Signal, 1) @@ -257,15 +266,13 @@ func StartHttpServer(fs *feast.FeatureStore, host string, port int, loggingServi } // StartHybridServer creates a gRPC Server and HTTP server -// Handlers for these are defined in hybrid_server.go -// Stops both servers if a stop signal is received. -func StartHybridServer(fs *feast.FeatureStore, host string, httpPort int, grpcPort int, loggingService *logging.LoggingService) error { +func StartHybridServer(fs *feast.FeatureStore, host string, httpPort int, grpcPort int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error { if datadogTracingEnabled() { tracer.Start(tracer.WithRuntimeMetrics()) defer tracer.Stop() } - ser := server.NewGrpcServingServiceServer(fs, loggingService) + ser := server.NewGrpcServingServiceServer(fs, loggingService, metricsClient, config) log.Info().Msgf("Starting a gRPC server on host %s port %d", host, grpcPort) lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, grpcPort)) if err != nil { @@ -274,7 +281,7 @@ func StartHybridServer(fs *feast.FeatureStore, host string, httpPort int, grpcPo grpcSer := ser.RegisterServices() - httpSer := server.NewHttpServer(fs, loggingService) + httpSer := server.NewHttpServer(fs, loggingService, metricsClient, config) log.Info().Msgf("Starting a HTTP server on host %s, port %d", host, httpPort) stop := make(chan os.Signal, 1) diff --git a/go/main_test.go b/go/main_test.go index 04a99cac010..eb8553206e8 100644 --- a/go/main_test.go +++ b/go/main_test.go @@ -6,6 +6,8 @@ import ( "testing" "github.com/feast-dev/feast/go/internal/feast" + "github.com/feast-dev/feast/go/internal/feast/metrics" + "github.com/feast-dev/feast/go/internal/feast/registry" "github.com/feast-dev/feast/go/internal/feast/server/logging" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -16,13 +18,18 @@ type MockServerStarter struct { mock.Mock } -func (m *MockServerStarter) StartHttpServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { - args := m.Called(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) +func (m *MockServerStarter) StartHttpServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error { + args := m.Called(fs, host, port, loggingService, metricsClient, config) return args.Error(0) } -func (m *MockServerStarter) StartGrpcServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { - args := m.Called(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) +func (m *MockServerStarter) StartGrpcServer(fs *feast.FeatureStore, host string, port int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error { + args := m.Called(fs, host, port, loggingService, metricsClient, config) + return args.Error(0) +} + +func (m *MockServerStarter) StartHybridServer(fs *feast.FeatureStore, host string, httpPort int, grpcPort int, loggingService *logging.LoggingService, metricsClient metrics.StatsdClient, config *registry.RepoConfig) error { + args := m.Called(fs, host, httpPort, grpcPort, loggingService, metricsClient, config) return args.Error(0) } @@ -32,13 +39,10 @@ func TestStartHttpServer(t *testing.T) { fs := &feast.FeatureStore{} host := "localhost" port := 8080 - var writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback - loggingOpts := &logging.LoggingOptions{} - - mockServerStarter.On("StartHttpServer", fs, host, port, mock.AnythingOfType("logging.OfflineStoreWriteCallback"), loggingOpts).Return(nil) + mockServerStarter.On("StartHttpServer", fs, host, port, (*logging.LoggingService)(nil), (metrics.StatsdClient)(nil), (*registry.RepoConfig)(nil)).Return(nil) - err := mockServerStarter.StartHttpServer(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) + err := mockServerStarter.StartHttpServer(fs, host, port, nil, nil, nil) assert.NoError(t, err) mockServerStarter.AssertExpectations(t) } @@ -49,12 +53,10 @@ func TestStartGrpcServer(t *testing.T) { fs := &feast.FeatureStore{} host := "localhost" port := 9090 - var writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback - loggingOpts := &logging.LoggingOptions{} - mockServerStarter.On("StartGrpcServer", fs, host, port, mock.AnythingOfType("logging.OfflineStoreWriteCallback"), loggingOpts).Return(nil) + mockServerStarter.On("StartGrpcServer", fs, host, port, (*logging.LoggingService)(nil), (metrics.StatsdClient)(nil), (*registry.RepoConfig)(nil)).Return(nil) - err := mockServerStarter.StartGrpcServer(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) + err := mockServerStarter.StartGrpcServer(fs, host, port, nil, nil, nil) assert.NoError(t, err) mockServerStarter.AssertExpectations(t) } diff --git a/sdk/python/feast/_missing_key_metrics.py b/sdk/python/feast/_missing_key_metrics.py new file mode 100644 index 00000000000..3825abeb653 --- /dev/null +++ b/sdk/python/feast/_missing_key_metrics.py @@ -0,0 +1,58 @@ +import logging +from collections import Counter +from typing import List, Optional + +from feast.protos.feast.serving.ServingService_pb2 import FieldStatus + +logger = logging.getLogger(__name__) + + +class LookupMetricsAggregator: + def __init__( + self, + project: str, + online_store_type: str, + service: str, + env: str, + metrics_client, + ): + self.project = project + self.online_store_type = online_store_type + self.service = service or "unknown_service" + self.env = env or "unknown_env" + self.metrics_client = metrics_client + self.not_found: Counter = Counter() + self.null_or_expired: Counter = Counter() + + def record(self, feature_id: str, status: int) -> None: + if status == FieldStatus.NOT_FOUND: + self.not_found[feature_id] += 1 + elif status in (FieldStatus.NULL_VALUE, FieldStatus.OUTSIDE_MAX_AGE): + self.null_or_expired[feature_id] += 1 + + def emit(self) -> None: + if self.metrics_client is None: + return + + base_tags: List[str] = [ + f"project:{self.project}", + f"online_store_type:{self.online_store_type}", + f"service:{self.service}", + f"env:{self.env}", + ] + + for feat, cnt in self.not_found.items(): + if cnt: + self.metrics_client.increment( + "feast.feature_server.feature_lookup_not_found", + cnt, + tags=base_tags + [f"feature:{feat}"], + ) + + for feat, cnt in self.null_or_expired.items(): + if cnt: + self.metrics_client.increment( + "feast.feature_server.feature_lookup_null_or_expired", + cnt, + tags=base_tags + [f"feature:{feat}"], + ) diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index b77185229d5..a0026ea1878 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio +import logging +import os from abc import ABC, abstractmethod from datetime import datetime from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Tuple, Union @@ -229,6 +231,17 @@ def get_online_features( utils._drop_unneeded_columns( online_features_response, requested_result_row_names ) + + if _is_missing_key_metrics_enabled(): + try: + _emit_missing_key_metrics( + config, project, online_features_response + ) + except Exception: + logger.debug( + "Failed to emit missing key metrics", exc_info=True + ) + return OnlineResponse(online_features_response) async def get_online_features_async( @@ -472,3 +485,43 @@ async def initialize(self, config: RepoConfig) -> None: async def close(self) -> None: pass + + +logger = logging.getLogger(__name__) + + +def _is_missing_key_metrics_enabled() -> bool: + return os.getenv("ENABLE_MISSING_KEY_METRICS", "false").lower() == "true" + + +def _emit_missing_key_metrics(config, project, response_proto): + from feast._missing_key_metrics import LookupMetricsAggregator + from feast.metrics_client import get_metrics_client + + online_store_type = ( + config.online_store.type + if hasattr(config.online_store, "type") + else "unknown" + ) + service = os.getenv("SERVICE_NAME", os.getenv("APPLICATION", "unknown_service")) + env = os.getenv( + "EXPEDIA_ENVIRONMENT_CATEGORY", + os.getenv("EXPEDIA_ENVIRONMENT", "unknown_env"), + ) + + agg = LookupMetricsAggregator( + project=project, + online_store_type=online_store_type, + service=service, + env=env, + metrics_client=get_metrics_client(), + ) + + feature_names = list(response_proto.metadata.feature_names.val) + for i, feature_ref in enumerate(feature_names): + if i < len(response_proto.results): + feature_vector = response_proto.results[i] + for status in feature_vector.statuses: + agg.record(feature_ref, status) + + agg.emit() diff --git a/sdk/python/feast/metrics.py b/sdk/python/feast/metrics.py new file mode 100644 index 00000000000..cd16e4240a6 --- /dev/null +++ b/sdk/python/feast/metrics.py @@ -0,0 +1,5 @@ +def datadog_client(**kwargs): + """Create a Datadog DogStatsd client. Requires feast[metrics] extra.""" + from datadog import DogStatsd + + return DogStatsd(**kwargs) diff --git a/sdk/python/feast/metrics_client.py b/sdk/python/feast/metrics_client.py new file mode 100644 index 00000000000..9c7f44d6d2d --- /dev/null +++ b/sdk/python/feast/metrics_client.py @@ -0,0 +1,34 @@ +from typing import List, Optional, Protocol, runtime_checkable + + +@runtime_checkable +class StatsdClient(Protocol): + def increment( + self, metric: str, value: int = 1, tags: Optional[List[str]] = None + ) -> None: ... + + +class NoOpStatsdClient: + def increment( + self, metric: str, value: int = 1, tags: Optional[List[str]] = None + ) -> None: + pass + + +_global_client: StatsdClient = NoOpStatsdClient() + + +def set_metrics_client(client: StatsdClient) -> None: + """Register a StatsD-compatible metrics client for Feast SDK. + + Call once at process startup. Example with Datadog: + from datadog import DogStatsd + from feast.metrics_client import set_metrics_client + set_metrics_client(DogStatsd(host="localhost", port=8125)) + """ + global _global_client + _global_client = client + + +def get_metrics_client() -> StatsdClient: + return _global_client diff --git a/sdk/python/tests/unit/test_missing_key_metrics.py b/sdk/python/tests/unit/test_missing_key_metrics.py new file mode 100644 index 00000000000..26a14f97f42 --- /dev/null +++ b/sdk/python/tests/unit/test_missing_key_metrics.py @@ -0,0 +1,192 @@ +import os +from unittest.mock import MagicMock + +import pytest + +from feast._missing_key_metrics import LookupMetricsAggregator +from feast.metrics_client import ( + NoOpStatsdClient, + get_metrics_client, + set_metrics_client, +) +from feast.protos.feast.serving.ServingService_pb2 import ( + FieldStatus, + GetOnlineFeaturesResponse, + GetOnlineFeaturesResponseMetadata, +) +from feast.protos.feast.serving.ServingService_pb2 import ( + FeatureList, +) + + +class FakeMetricsClient: + def __init__(self): + self.calls = [] + + def increment(self, metric, value=1, tags=None): + self.calls.append({"metric": metric, "value": value, "tags": tags or []}) + + +class TestLookupMetricsAggregator: + def test_not_found_only(self): + fake = FakeMetricsClient() + agg = LookupMetricsAggregator("proj", "redis", "svc", "test", fake) + + agg.record("user_fv__age", FieldStatus.NOT_FOUND) + agg.record("user_fv__age", FieldStatus.NOT_FOUND) + agg.record("user_fv__age", FieldStatus.NOT_FOUND) + agg.emit() + + assert len(fake.calls) == 1 + assert fake.calls[0]["metric"] == "feast.feature_server.feature_lookup_not_found" + assert fake.calls[0]["value"] == 3 + assert "feature:user_fv__age" in fake.calls[0]["tags"] + + def test_null_or_expired(self): + fake = FakeMetricsClient() + agg = LookupMetricsAggregator("proj", "redis", "svc", "test", fake) + + agg.record("order_fv__amt", FieldStatus.NULL_VALUE) + agg.record("order_fv__amt", FieldStatus.OUTSIDE_MAX_AGE) + agg.emit() + + assert len(fake.calls) == 1 + assert ( + fake.calls[0]["metric"] + == "feast.feature_server.feature_lookup_null_or_expired" + ) + assert fake.calls[0]["value"] == 2 + + def test_mixed_statuses(self): + fake = FakeMetricsClient() + agg = LookupMetricsAggregator("proj", "redis", "svc", "test", fake) + + agg.record("fv_a__f1", FieldStatus.PRESENT) + agg.record("fv_a__f1", FieldStatus.NOT_FOUND) + agg.record("fv_b__f2", FieldStatus.NULL_VALUE) + agg.record("fv_b__f2", FieldStatus.PRESENT) + agg.record("fv_b__f2", FieldStatus.OUTSIDE_MAX_AGE) + agg.emit() + + assert len(fake.calls) == 2 + metrics_by_feature = {} + for call in fake.calls: + feature_tag = [t for t in call["tags"] if t.startswith("feature:")] + key = (call["metric"], feature_tag[0] if feature_tag else "") + metrics_by_feature[key] = call["value"] + + assert ( + metrics_by_feature[ + ( + "feast.feature_server.feature_lookup_not_found", + "feature:fv_a__f1", + ) + ] + == 1 + ) + assert ( + metrics_by_feature[ + ( + "feast.feature_server.feature_lookup_null_or_expired", + "feature:fv_b__f2", + ) + ] + == 2 + ) + + def test_all_present(self): + fake = FakeMetricsClient() + agg = LookupMetricsAggregator("proj", "redis", "svc", "test", fake) + + agg.record("fv__f1", FieldStatus.PRESENT) + agg.record("fv__f1", FieldStatus.PRESENT) + agg.emit() + + assert len(fake.calls) == 0 + + def test_none_client(self): + agg = LookupMetricsAggregator("proj", "redis", "svc", "test", None) + agg.record("fv__f1", FieldStatus.NOT_FOUND) + agg.emit() + + def test_tags(self): + fake = FakeMetricsClient() + agg = LookupMetricsAggregator("mlpfs", "eg-valkey", "ranking-fs", "dw", fake) + + agg.record("hotel_fv__price", FieldStatus.NOT_FOUND) + agg.emit() + + tags = fake.calls[0]["tags"] + assert "project:mlpfs" in tags + assert "online_store_type:eg-valkey" in tags + assert "service:ranking-fs" in tags + assert "env:dw" in tags + assert "feature:hotel_fv__price" in tags + + +class TestMetricsClientRegistry: + def test_default_is_noop(self): + set_metrics_client(NoOpStatsdClient()) + client = get_metrics_client() + assert isinstance(client, NoOpStatsdClient) + + def test_set_and_get(self): + fake = FakeMetricsClient() + set_metrics_client(fake) + assert get_metrics_client() is fake + set_metrics_client(NoOpStatsdClient()) + + +class TestIsMissingKeyMetricsEnabled: + def test_disabled_by_default(self): + os.environ.pop("ENABLE_MISSING_KEY_METRICS", None) + from feast.infra.online_stores.online_store import ( + _is_missing_key_metrics_enabled, + ) + + assert _is_missing_key_metrics_enabled() is False + + def test_enabled(self): + os.environ["ENABLE_MISSING_KEY_METRICS"] = "true" + from feast.infra.online_stores.online_store import ( + _is_missing_key_metrics_enabled, + ) + + try: + assert _is_missing_key_metrics_enabled() is True + finally: + os.environ.pop("ENABLE_MISSING_KEY_METRICS", None) + + +class TestEmitMissingKeyMetricsIntegration: + def test_with_proto_response(self): + from feast.infra.online_stores.online_store import _emit_missing_key_metrics + + fake = FakeMetricsClient() + set_metrics_client(fake) + + response = GetOnlineFeaturesResponse() + response.metadata.CopyFrom( + GetOnlineFeaturesResponseMetadata( + feature_names=FeatureList(val=["fv_a__f1", "fv_a__f2"]) + ) + ) + fv1 = response.results.add() + fv1.statuses.extend([FieldStatus.PRESENT, FieldStatus.NOT_FOUND]) + fv2 = response.results.add() + fv2.statuses.extend([FieldStatus.NOT_FOUND, FieldStatus.NOT_FOUND]) + + config = MagicMock() + config.online_store.type = "redis" + + _emit_missing_key_metrics(config, "test_project", response) + + assert len(fake.calls) == 2 + calls_by_feature = { + [t for t in c["tags"] if t.startswith("feature:")][0]: c + for c in fake.calls + } + assert calls_by_feature["feature:fv_a__f1"]["value"] == 1 + assert calls_by_feature["feature:fv_a__f2"]["value"] == 2 + + set_metrics_client(NoOpStatsdClient()) From c3434a338255cb497e1aef14827ba2f4fad07009 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Tue, 28 Apr 2026 15:34:19 -0700 Subject: [PATCH 02/10] fix: Remove unused imports and fix import sorting Remove unused Optional import and pytest import, consolidate protobuf imports to fix ruff linting errors. Co-Authored-By: Claude Sonnet 4.5 --- sdk/python/feast/_missing_key_metrics.py | 2 +- sdk/python/tests/unit/test_missing_key_metrics.py | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/_missing_key_metrics.py b/sdk/python/feast/_missing_key_metrics.py index 3825abeb653..7331bb8d080 100644 --- a/sdk/python/feast/_missing_key_metrics.py +++ b/sdk/python/feast/_missing_key_metrics.py @@ -1,6 +1,6 @@ import logging from collections import Counter -from typing import List, Optional +from typing import List from feast.protos.feast.serving.ServingService_pb2 import FieldStatus diff --git a/sdk/python/tests/unit/test_missing_key_metrics.py b/sdk/python/tests/unit/test_missing_key_metrics.py index 26a14f97f42..253f20c46c5 100644 --- a/sdk/python/tests/unit/test_missing_key_metrics.py +++ b/sdk/python/tests/unit/test_missing_key_metrics.py @@ -1,8 +1,6 @@ import os from unittest.mock import MagicMock -import pytest - from feast._missing_key_metrics import LookupMetricsAggregator from feast.metrics_client import ( NoOpStatsdClient, @@ -10,13 +8,11 @@ set_metrics_client, ) from feast.protos.feast.serving.ServingService_pb2 import ( + FeatureList, FieldStatus, GetOnlineFeaturesResponse, GetOnlineFeaturesResponseMetadata, ) -from feast.protos.feast.serving.ServingService_pb2 import ( - FeatureList, -) class FakeMetricsClient: From 05ebd768715b1a2d973957b3dd0defa57ff7b3b9 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Tue, 28 Apr 2026 15:45:53 -0700 Subject: [PATCH 03/10] style: Apply ruff code formatting Format code to match project style guidelines using ruff format. Co-Authored-By: Claude Sonnet 4.5 --- sdk/python/feast/infra/online_stores/online_store.py | 12 +++--------- sdk/python/tests/unit/test_missing_key_metrics.py | 7 ++++--- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index a0026ea1878..cefdbaccea2 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -234,13 +234,9 @@ def get_online_features( if _is_missing_key_metrics_enabled(): try: - _emit_missing_key_metrics( - config, project, online_features_response - ) + _emit_missing_key_metrics(config, project, online_features_response) except Exception: - logger.debug( - "Failed to emit missing key metrics", exc_info=True - ) + logger.debug("Failed to emit missing key metrics", exc_info=True) return OnlineResponse(online_features_response) @@ -499,9 +495,7 @@ def _emit_missing_key_metrics(config, project, response_proto): from feast.metrics_client import get_metrics_client online_store_type = ( - config.online_store.type - if hasattr(config.online_store, "type") - else "unknown" + config.online_store.type if hasattr(config.online_store, "type") else "unknown" ) service = os.getenv("SERVICE_NAME", os.getenv("APPLICATION", "unknown_service")) env = os.getenv( diff --git a/sdk/python/tests/unit/test_missing_key_metrics.py b/sdk/python/tests/unit/test_missing_key_metrics.py index 253f20c46c5..1dc535879ed 100644 --- a/sdk/python/tests/unit/test_missing_key_metrics.py +++ b/sdk/python/tests/unit/test_missing_key_metrics.py @@ -34,7 +34,9 @@ def test_not_found_only(self): agg.emit() assert len(fake.calls) == 1 - assert fake.calls[0]["metric"] == "feast.feature_server.feature_lookup_not_found" + assert ( + fake.calls[0]["metric"] == "feast.feature_server.feature_lookup_not_found" + ) assert fake.calls[0]["value"] == 3 assert "feature:user_fv__age" in fake.calls[0]["tags"] @@ -179,8 +181,7 @@ def test_with_proto_response(self): assert len(fake.calls) == 2 calls_by_feature = { - [t for t in c["tags"] if t.startswith("feature:")][0]: c - for c in fake.calls + [t for t in c["tags"] if t.startswith("feature:")][0]: c for c in fake.calls } assert calls_by_feature["feature:fv_a__f1"]["value"] == 1 assert calls_by_feature["feature:fv_a__f2"]["value"] == 2 From a517ac540bf4639fa07902e156bef73ec3ac8761 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Wed, 29 Apr 2026 12:26:07 -0700 Subject: [PATCH 04/10] feat: Add feature_view tag to missing-key metrics Extract and add feature_view tag alongside feature tag for better aggregation and filtering in Datadog dashboards. Changes: - Add _extract_feature_view() function in Python and extractFeatureView() in Go - Parse feature_view from feature name (format: feature_view__feature_name) - Add feature_view tag to both not_found and null_or_expired metrics - Add comprehensive tests for extraction function and tag presence - No cardinality increase - feature_view is derived from existing feature tag Tags now include: - project, online_store_type, service, env (existing) - feature (existing, high cardinality) - feature_view (new, low cardinality ~10-50 per project) Enables Datadog queries like: sum:feast.feature_server.feature_lookup_not_found by {feature_view} Co-Authored-By: Claude Sonnet 4.5 --- go/internal/feast/metrics/lookup_metrics.go | 19 ++++++++++-- .../feast/metrics/lookup_metrics_test.go | 24 +++++++++++++++ sdk/python/feast/_missing_key_metrics.py | 29 +++++++++++++++++-- .../tests/unit/test_missing_key_metrics.py | 24 ++++++++++++++- 4 files changed, 91 insertions(+), 5 deletions(-) diff --git a/go/internal/feast/metrics/lookup_metrics.go b/go/internal/feast/metrics/lookup_metrics.go index cc9b366cea6..a7694226849 100644 --- a/go/internal/feast/metrics/lookup_metrics.go +++ b/go/internal/feast/metrics/lookup_metrics.go @@ -1,10 +1,23 @@ package metrics import ( + "strings" + "github.com/feast-dev/feast/go/internal/feast/onlineserving" "github.com/feast-dev/feast/go/protos/feast/serving" ) +// extractFeatureView extracts the feature view name from a full feature name. +// Feature names follow the format: feature_view__feature_name +// Example: "hotel_fv__price" -> "hotel_fv" +func extractFeatureView(featureName string) string { + parts := strings.SplitN(featureName, "__", 2) + if len(parts) == 2 { + return parts[0] + } + return "unknown" +} + type LookupMetricsAggregator struct { notFound map[string]int64 nullOrExpired map[string]int64 @@ -85,9 +98,10 @@ func (m *LookupMetricsAggregator) Emit() { if count == 0 { continue } - tags := make([]string, len(baseTags)+1) + tags := make([]string, len(baseTags)+2) copy(tags, baseTags) tags[len(baseTags)] = "feature:" + featureID + tags[len(baseTags)+1] = "feature_view:" + extractFeatureView(featureID) m.client.Count("feast.feature_server.feature_lookup_not_found", count, tags, 1.0) } @@ -95,9 +109,10 @@ func (m *LookupMetricsAggregator) Emit() { if count == 0 { continue } - tags := make([]string, len(baseTags)+1) + tags := make([]string, len(baseTags)+2) copy(tags, baseTags) tags[len(baseTags)] = "feature:" + featureID + tags[len(baseTags)+1] = "feature_view:" + extractFeatureView(featureID) m.client.Count("feast.feature_server.feature_lookup_null_or_expired", count, tags, 1.0) } } diff --git a/go/internal/feast/metrics/lookup_metrics_test.go b/go/internal/feast/metrics/lookup_metrics_test.go index 40b142b9354..fdb7256239b 100644 --- a/go/internal/feast/metrics/lookup_metrics_test.go +++ b/go/internal/feast/metrics/lookup_metrics_test.go @@ -121,6 +121,7 @@ func TestAggregator_Tags(t *testing.T) { assert.Contains(t, tags, "service:ranking-fs") assert.Contains(t, tags, "env:dw") assert.Contains(t, tags, "feature:hotel_fv__price") + assert.Contains(t, tags, "feature_view:hotel_fv") } func TestRecordFromFeatureVectors(t *testing.T) { @@ -195,6 +196,29 @@ func TestGetOnlineStoreType(t *testing.T) { assert.Equal(t, "unknown_env", GetEnvironment()) } +func TestExtractFeatureView(t *testing.T) { + tests := []struct { + name string + input string + expected string + }{ + {"standard format", "hotel_fv__price", "hotel_fv"}, + {"with underscore in feature", "hotel_fv__review_score_avg", "hotel_fv"}, + {"multiple feature views", "user_fv__age", "user_fv"}, + {"long feature view name", "ranking_signals_fv__score", "ranking_signals_fv"}, + {"no double underscore", "age", "unknown"}, + {"colon separator", "hotel_fv:price", "unknown"}, + {"empty string", "", "unknown"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := extractFeatureView(tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + // findTag extracts the value portion of a tag matching the given prefix. func findTag(tags []string, prefix string) string { for _, tag := range tags { diff --git a/sdk/python/feast/_missing_key_metrics.py b/sdk/python/feast/_missing_key_metrics.py index 7331bb8d080..0ec36344c87 100644 --- a/sdk/python/feast/_missing_key_metrics.py +++ b/sdk/python/feast/_missing_key_metrics.py @@ -7,6 +7,23 @@ logger = logging.getLogger(__name__) +def _extract_feature_view(feature_name: str) -> str: + """Extract feature view name from full feature name. + + Feature names follow the format: feature_view__feature_name + Example: "hotel_fv__price" -> "hotel_fv" + + Args: + feature_name: Full feature name with feature view prefix + + Returns: + Feature view name, or "unknown" if format doesn't match + """ + if "__" in feature_name: + return feature_name.split("__", 1)[0] + return "unknown" + + class LookupMetricsAggregator: def __init__( self, @@ -46,7 +63,11 @@ def emit(self) -> None: self.metrics_client.increment( "feast.feature_server.feature_lookup_not_found", cnt, - tags=base_tags + [f"feature:{feat}"], + tags=base_tags + + [ + f"feature:{feat}", + f"feature_view:{_extract_feature_view(feat)}", + ], ) for feat, cnt in self.null_or_expired.items(): @@ -54,5 +75,9 @@ def emit(self) -> None: self.metrics_client.increment( "feast.feature_server.feature_lookup_null_or_expired", cnt, - tags=base_tags + [f"feature:{feat}"], + tags=base_tags + + [ + f"feature:{feat}", + f"feature_view:{_extract_feature_view(feat)}", + ], ) diff --git a/sdk/python/tests/unit/test_missing_key_metrics.py b/sdk/python/tests/unit/test_missing_key_metrics.py index 1dc535879ed..b7f04a312b7 100644 --- a/sdk/python/tests/unit/test_missing_key_metrics.py +++ b/sdk/python/tests/unit/test_missing_key_metrics.py @@ -1,7 +1,7 @@ import os from unittest.mock import MagicMock -from feast._missing_key_metrics import LookupMetricsAggregator +from feast._missing_key_metrics import LookupMetricsAggregator, _extract_feature_view from feast.metrics_client import ( NoOpStatsdClient, get_metrics_client, @@ -120,6 +120,7 @@ def test_tags(self): assert "service:ranking-fs" in tags assert "env:dw" in tags assert "feature:hotel_fv__price" in tags + assert "feature_view:hotel_fv" in tags class TestMetricsClientRegistry: @@ -135,6 +136,27 @@ def test_set_and_get(self): set_metrics_client(NoOpStatsdClient()) +class TestFeatureViewExtraction: + def test_standard_format(self): + assert _extract_feature_view("hotel_fv__price") == "hotel_fv" + assert _extract_feature_view("user_fv__age") == "user_fv" + assert ( + _extract_feature_view("ranking_signals_fv__score") == "ranking_signals_fv" + ) + + def test_multiple_underscores_in_feature_name(self): + # Only split on first __ occurrence + assert _extract_feature_view("hotel_fv__review_score_avg") == "hotel_fv" + + def test_no_double_underscore(self): + # Fallback to "unknown" for non-standard format + assert _extract_feature_view("age") == "unknown" + assert _extract_feature_view("hotel_fv:price") == "unknown" + + def test_empty_string(self): + assert _extract_feature_view("") == "unknown" + + class TestIsMissingKeyMetricsEnabled: def test_disabled_by_default(self): os.environ.pop("ENABLE_MISSING_KEY_METRICS", None) From 8f4424ce49e5485b2c8c7cb19457f7048eaff602 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Mon, 4 May 2026 22:51:27 -0700 Subject: [PATCH 05/10] Addressed review comments --- go/internal/feast/metrics/config.go | 17 ++++ go/internal/feast/metrics/config_test.go | 70 +++++++++++++++ go/internal/feast/metrics/lookup_metrics.go | 32 ++++++- .../feast/metrics/lookup_metrics_test.go | 89 +++++++++++++++++++ go/internal/feast/server/grpc_server.go | 4 +- go/internal/feast/server/http_server.go | 4 +- go/main.go | 8 +- sdk/python/feast/_missing_key_metrics.py | 33 ++++++- .../feast/infra/online_stores/online_store.py | 5 +- .../tests/unit/test_missing_key_metrics.py | 75 ++++++++++++++++ 10 files changed, 322 insertions(+), 15 deletions(-) create mode 100644 go/internal/feast/metrics/config_test.go diff --git a/go/internal/feast/metrics/config.go b/go/internal/feast/metrics/config.go index 34948871259..bc9886cce57 100644 --- a/go/internal/feast/metrics/config.go +++ b/go/internal/feast/metrics/config.go @@ -38,3 +38,20 @@ func GetEnvironment() string { } return "unknown_env" } + +// GetStatsDAddress returns the DogStatsD address from environment variables. +// Returns empty string if DD_AGENT_HOST is not set. +// Port can be configured via DD_DOGSTATSD_PORT (defaults to 8125). +func GetStatsDAddress() string { + host := os.Getenv("DD_AGENT_HOST") + if host == "" { + return "" + } + + port := os.Getenv("DD_DOGSTATSD_PORT") + if port == "" { + port = "8125" + } + + return fmt.Sprintf("%s:%s", host, port) +} diff --git a/go/internal/feast/metrics/config_test.go b/go/internal/feast/metrics/config_test.go new file mode 100644 index 00000000000..0336ab4deb1 --- /dev/null +++ b/go/internal/feast/metrics/config_test.go @@ -0,0 +1,70 @@ +package metrics + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetStatsDAddress(t *testing.T) { + tests := []struct { + name string + host string + port string + expected string + }{ + { + name: "no env vars set", + host: "", + port: "", + expected: "", + }, + { + name: "only host set, default port", + host: "datadog-agent", + port: "", + expected: "datadog-agent:8125", + }, + { + name: "host and custom port", + host: "datadog-agent", + port: "9125", + expected: "datadog-agent:9125", + }, + { + name: "localhost with default port", + host: "localhost", + port: "", + expected: "localhost:8125", + }, + { + name: "IP address with custom port", + host: "10.0.0.5", + port: "8126", + expected: "10.0.0.5:8126", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Set env vars + if tt.host != "" { + os.Setenv("DD_AGENT_HOST", tt.host) + defer os.Unsetenv("DD_AGENT_HOST") + } else { + os.Unsetenv("DD_AGENT_HOST") + } + + if tt.port != "" { + os.Setenv("DD_DOGSTATSD_PORT", tt.port) + defer os.Unsetenv("DD_DOGSTATSD_PORT") + } else { + os.Unsetenv("DD_DOGSTATSD_PORT") + } + + result := GetStatsDAddress() + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/go/internal/feast/metrics/lookup_metrics.go b/go/internal/feast/metrics/lookup_metrics.go index a7694226849..537f3f7b1a2 100644 --- a/go/internal/feast/metrics/lookup_metrics.go +++ b/go/internal/feast/metrics/lookup_metrics.go @@ -1,6 +1,9 @@ package metrics import ( + "math/rand" + "os" + "strconv" "strings" "github.com/feast-dev/feast/go/internal/feast/onlineserving" @@ -26,6 +29,7 @@ type LookupMetricsAggregator struct { service string env string client StatsdClient + sampleRate float64 } func NewLookupMetricsAggregator( @@ -35,6 +39,17 @@ func NewLookupMetricsAggregator( if client == nil { return nil } + + // Read sampling rate from environment (default: 1.0 = no sampling) + sampleRate := 1.0 + if rateStr := os.Getenv("FEAST_METRICS_SAMPLE_RATE"); rateStr != "" { + if rate, err := strconv.ParseFloat(rateStr, 64); err == nil { + if rate > 0 && rate <= 1.0 { + sampleRate = rate + } + } + } + return &LookupMetricsAggregator{ notFound: make(map[string]int64), nullOrExpired: make(map[string]int64), @@ -43,6 +58,7 @@ func NewLookupMetricsAggregator( service: service, env: env, client: client, + sampleRate: sampleRate, } } @@ -87,6 +103,15 @@ func (m *LookupMetricsAggregator) Emit() { return } + // Probabilistic sampling: skip this request's metrics based on sample_rate + if m.sampleRate < 1.0 && rand.Float64() > m.sampleRate { + return + } + + // Calculate multiplier to preserve statistical accuracy + // If sampleRate=0.1, we only emit 10% of the time, so multiply counts by 10 + multiplier := 1.0 / m.sampleRate + baseTags := []string{ "project:" + m.project, "online_store_type:" + m.onlineStore, @@ -98,21 +123,24 @@ func (m *LookupMetricsAggregator) Emit() { if count == 0 { continue } + // Adjust count to preserve accuracy when sampling + adjustedCount := int64(float64(count) * multiplier) tags := make([]string, len(baseTags)+2) copy(tags, baseTags) tags[len(baseTags)] = "feature:" + featureID tags[len(baseTags)+1] = "feature_view:" + extractFeatureView(featureID) - m.client.Count("feast.feature_server.feature_lookup_not_found", count, tags, 1.0) + m.client.Count("feast.feature_server.feature_lookup_not_found", adjustedCount, tags, 1.0) } for featureID, count := range m.nullOrExpired { if count == 0 { continue } + adjustedCount := int64(float64(count) * multiplier) tags := make([]string, len(baseTags)+2) copy(tags, baseTags) tags[len(baseTags)] = "feature:" + featureID tags[len(baseTags)+1] = "feature_view:" + extractFeatureView(featureID) - m.client.Count("feast.feature_server.feature_lookup_null_or_expired", count, tags, 1.0) + m.client.Count("feast.feature_server.feature_lookup_null_or_expired", adjustedCount, tags, 1.0) } } diff --git a/go/internal/feast/metrics/lookup_metrics_test.go b/go/internal/feast/metrics/lookup_metrics_test.go index fdb7256239b..de3efd10e37 100644 --- a/go/internal/feast/metrics/lookup_metrics_test.go +++ b/go/internal/feast/metrics/lookup_metrics_test.go @@ -228,3 +228,92 @@ func findTag(tags []string, prefix string) string { } return "" } + +func TestSampling_DefaultNoSampling(t *testing.T) { + os.Unsetenv("FEAST_METRICS_SAMPLE_RATE") + fake := &fakeStatsdClient{} + agg := newTestAggregator(fake) + + assert.Equal(t, 1.0, agg.sampleRate, "Default sample rate should be 1.0") +} + +func TestSampling_ReadFromEnv(t *testing.T) { + os.Setenv("FEAST_METRICS_SAMPLE_RATE", "0.5") + defer os.Unsetenv("FEAST_METRICS_SAMPLE_RATE") + + fake := &fakeStatsdClient{} + agg := newTestAggregator(fake) + + assert.Equal(t, 0.5, agg.sampleRate, "Should read sample rate from environment") +} + +func TestSampling_InvalidValues(t *testing.T) { + testCases := []struct { + value string + expected float64 + }{ + {"-0.5", 1.0}, // Negative + {"1.5", 1.0}, // > 1.0 + {"0", 1.0}, // Zero + {"abc", 1.0}, // Non-numeric + {"", 1.0}, // Empty (unset uses default) + } + + for _, tc := range testCases { + t.Run(tc.value, func(t *testing.T) { + if tc.value == "" { + os.Unsetenv("FEAST_METRICS_SAMPLE_RATE") + } else { + os.Setenv("FEAST_METRICS_SAMPLE_RATE", tc.value) + defer os.Unsetenv("FEAST_METRICS_SAMPLE_RATE") + } + + fake := &fakeStatsdClient{} + agg := newTestAggregator(fake) + + assert.Equal(t, tc.expected, agg.sampleRate) + }) + } +} + +func TestSampling_AdjustsCountsCorrectly(t *testing.T) { + os.Setenv("FEAST_METRICS_SAMPLE_RATE", "0.5") + defer os.Unsetenv("FEAST_METRICS_SAMPLE_RATE") + + fake := &fakeStatsdClient{} + agg := newTestAggregator(fake) + + // Record 2 missing keys + agg.Record("fv__f1", serving.FieldStatus_NOT_FOUND) + agg.Record("fv__f1", serving.FieldStatus_NOT_FOUND) + + // Try multiple times to ensure at least one emit happens + emitted := false + for i := 0; i < 50; i++ { + fake.calls = nil + agg.Emit() + if len(fake.calls) > 0 { + emitted = true + // With sample_rate=0.5, count of 2 should become 4 (2 / 0.5) + assert.Equal(t, int64(4), fake.calls[0].value, "Count should be adjusted by 1/sample_rate") + break + } + } + + assert.True(t, emitted, "Should have emitted at least once in 50 tries") +} + +func TestSampling_NoAdjustmentWhenNotSampling(t *testing.T) { + os.Setenv("FEAST_METRICS_SAMPLE_RATE", "1.0") + defer os.Unsetenv("FEAST_METRICS_SAMPLE_RATE") + + fake := &fakeStatsdClient{} + agg := newTestAggregator(fake) + + agg.Record("fv__f1", serving.FieldStatus_NOT_FOUND) + agg.Record("fv__f1", serving.FieldStatus_NOT_FOUND) + agg.Emit() + + assert.Len(t, fake.calls, 1) + assert.Equal(t, int64(2), fake.calls[0].value, "Count should not be adjusted with sample_rate=1.0") +} diff --git a/go/internal/feast/server/grpc_server.go b/go/internal/feast/server/grpc_server.go index 6eb8f85bd3d..322e830980a 100644 --- a/go/internal/feast/server/grpc_server.go +++ b/go/internal/feast/server/grpc_server.go @@ -90,7 +90,7 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques return nil, errors.GrpcFromError(err) } - if s.metricsClient != nil { + if s.metricsClient != nil && s.config != nil { agg := metrics.NewLookupMetricsAggregator( s.config.Project, metrics.GetOnlineStoreType(s.config), @@ -184,7 +184,7 @@ func (s *grpcServingServiceServer) GetOnlineFeaturesRange(ctx context.Context, r return nil, errors.GrpcFromError(err) } - if s.metricsClient != nil { + if s.metricsClient != nil && s.config != nil { agg := metrics.NewLookupMetricsAggregator( s.config.Project, metrics.GetOnlineStoreType(s.config), diff --git a/go/internal/feast/server/http_server.go b/go/internal/feast/server/http_server.go index dd3719059a9..41ce9bb57c2 100644 --- a/go/internal/feast/server/http_server.go +++ b/go/internal/feast/server/http_server.go @@ -428,7 +428,7 @@ func (s *HttpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { return } - if s.metricsClient != nil { + if s.metricsClient != nil && s.config != nil { agg := metrics.NewLookupMetricsAggregator( s.config.Project, metrics.GetOnlineStoreType(s.config), @@ -644,7 +644,7 @@ func (s *HttpServer) getOnlineFeaturesRange(w http.ResponseWriter, r *http.Reque return } - if s.metricsClient != nil { + if s.metricsClient != nil && s.config != nil { agg := metrics.NewLookupMetricsAggregator( s.config.Project, metrics.GetOnlineStoreType(s.config), diff --git a/go/main.go b/go/main.go index af22293b781..91a4af5d9cd 100644 --- a/go/main.go +++ b/go/main.go @@ -107,8 +107,8 @@ func main() { var metricsClient metrics.StatsdClient if metrics.IsMissingKeyMetricsEnabled() { - if statsdHost, ok := os.LookupEnv("DD_AGENT_HOST"); ok { - client, clientErr := statsd.New(fmt.Sprintf("%s:8125", statsdHost)) + if addr := metrics.GetStatsDAddress(); addr != "" { + client, clientErr := statsd.New(addr) if clientErr != nil { log.Error().Err(clientErr).Msg("Failed to create statsd client for missing key metrics") } else { @@ -144,8 +144,8 @@ func datadogTracingEnabled() bool { func publishVersionInfoToDatadog(info *version.Info) { if datadogTracingEnabled() { - if statsdHost, ok := os.LookupEnv("DD_AGENT_HOST"); ok { - var client, err = statsd.New(fmt.Sprintf("%s:8125", statsdHost)) + if addr := metrics.GetStatsDAddress(); addr != "" { + var client, err = statsd.New(addr) if err != nil { log.Error().Err(err).Msg("Failed to connect to statsd") return diff --git a/sdk/python/feast/_missing_key_metrics.py b/sdk/python/feast/_missing_key_metrics.py index 0ec36344c87..4cceed526d7 100644 --- a/sdk/python/feast/_missing_key_metrics.py +++ b/sdk/python/feast/_missing_key_metrics.py @@ -1,4 +1,6 @@ import logging +import os +import random from collections import Counter from typing import List @@ -41,6 +43,22 @@ def __init__( self.not_found: Counter = Counter() self.null_or_expired: Counter = Counter() + # Read sampling rate from environment (default: 1.0 = no sampling) + sample_rate_str = os.getenv("FEAST_METRICS_SAMPLE_RATE", "1.0") + try: + self.sample_rate = float(sample_rate_str) + # Validate: must be between 0 and 1 + if not 0 < self.sample_rate <= 1.0: + logger.warning( + f"Invalid FEAST_METRICS_SAMPLE_RATE={sample_rate_str}, using 1.0" + ) + self.sample_rate = 1.0 + except ValueError: + logger.warning( + f"Invalid FEAST_METRICS_SAMPLE_RATE={sample_rate_str}, using 1.0" + ) + self.sample_rate = 1.0 + def record(self, feature_id: str, status: int) -> None: if status == FieldStatus.NOT_FOUND: self.not_found[feature_id] += 1 @@ -51,6 +69,14 @@ def emit(self) -> None: if self.metrics_client is None: return + # Probabilistic sampling: skip this request's metrics based on sample_rate + if self.sample_rate < 1.0 and random.random() > self.sample_rate: + return + + # Calculate multiplier to preserve statistical accuracy + # If sample_rate=0.1, we only emit 10% of the time, so multiply counts by 10 + multiplier = 1.0 / self.sample_rate + base_tags: List[str] = [ f"project:{self.project}", f"online_store_type:{self.online_store_type}", @@ -60,9 +86,11 @@ def emit(self) -> None: for feat, cnt in self.not_found.items(): if cnt: + # Adjust count to preserve accuracy when sampling + adjusted_count = int(cnt * multiplier) self.metrics_client.increment( "feast.feature_server.feature_lookup_not_found", - cnt, + adjusted_count, tags=base_tags + [ f"feature:{feat}", @@ -72,9 +100,10 @@ def emit(self) -> None: for feat, cnt in self.null_or_expired.items(): if cnt: + adjusted_count = int(cnt * multiplier) self.metrics_client.increment( "feast.feature_server.feature_lookup_null_or_expired", - cnt, + adjusted_count, tags=base_tags + [ f"feature:{feat}", diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index cefdbaccea2..ded118ab748 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -19,12 +19,14 @@ from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Tuple, Union from feast import Entity, utils +from feast._missing_key_metrics import LookupMetricsAggregator from feast.batch_feature_view import BatchFeatureView from feast.feature_service import FeatureService from feast.feature_view import FeatureView from feast.infra.infra_object import InfraObject from feast.infra.registry.base_registry import BaseRegistry from feast.infra.supported_async_methods import SupportedAsyncMethods +from feast.metrics_client import get_metrics_client from feast.online_response import OnlineResponse from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -491,9 +493,6 @@ def _is_missing_key_metrics_enabled() -> bool: def _emit_missing_key_metrics(config, project, response_proto): - from feast._missing_key_metrics import LookupMetricsAggregator - from feast.metrics_client import get_metrics_client - online_store_type = ( config.online_store.type if hasattr(config.online_store, "type") else "unknown" ) diff --git a/sdk/python/tests/unit/test_missing_key_metrics.py b/sdk/python/tests/unit/test_missing_key_metrics.py index b7f04a312b7..7e738e15e0f 100644 --- a/sdk/python/tests/unit/test_missing_key_metrics.py +++ b/sdk/python/tests/unit/test_missing_key_metrics.py @@ -209,3 +209,78 @@ def test_with_proto_response(self): assert calls_by_feature["feature:fv_a__f2"]["value"] == 2 set_metrics_client(NoOpStatsdClient()) + + +class TestSamplingFeature: + def test_default_no_sampling(self): + """Default sample_rate should be 1.0 (no sampling)""" + os.environ.pop("FEAST_METRICS_SAMPLE_RATE", None) + fake = FakeMetricsClient() + agg = LookupMetricsAggregator("proj", "redis", "svc", "test", fake) + + assert agg.sample_rate == 1.0 + + def test_sample_rate_from_env(self): + """Should read sample_rate from environment""" + os.environ["FEAST_METRICS_SAMPLE_RATE"] = "0.5" + fake = FakeMetricsClient() + agg = LookupMetricsAggregator("proj", "redis", "svc", "test", fake) + + assert agg.sample_rate == 0.5 + os.environ.pop("FEAST_METRICS_SAMPLE_RATE") + + def test_invalid_sample_rate_uses_default(self): + """Invalid sample rates should default to 1.0""" + test_cases = ["-0.5", "1.5", "abc", ""] + + for invalid_value in test_cases: + os.environ["FEAST_METRICS_SAMPLE_RATE"] = invalid_value + fake = FakeMetricsClient() + agg = LookupMetricsAggregator("proj", "redis", "svc", "test", fake) + assert agg.sample_rate == 1.0, f"Failed for value: {invalid_value}" + + os.environ.pop("FEAST_METRICS_SAMPLE_RATE") + + def test_sampling_adjusts_counts(self): + """When sampling, counts should be multiplied by 1/sample_rate""" + os.environ["FEAST_METRICS_SAMPLE_RATE"] = "0.5" + fake = FakeMetricsClient() + + # Force emit by seeding random to always emit + import random + + random.seed(0) # This makes random.random() predictable + + agg = LookupMetricsAggregator("proj", "redis", "svc", "test", fake) + agg.record("fv__f1", FieldStatus.NOT_FOUND) + agg.record("fv__f1", FieldStatus.NOT_FOUND) # 2 times + + # Try multiple times to ensure at least one emit happens + emitted = False + for _ in range(20): + fake.calls.clear() + agg.emit() + if fake.calls: + emitted = True + # With sample_rate=0.5, count of 2 should become 4 (2 / 0.5) + assert fake.calls[0]["value"] == 4 + break + + assert emitted, "Should have emitted at least once in 20 tries" + + os.environ.pop("FEAST_METRICS_SAMPLE_RATE") + + def test_no_sampling_keeps_original_counts(self): + """With sample_rate=1.0, counts should not be adjusted""" + os.environ["FEAST_METRICS_SAMPLE_RATE"] = "1.0" + fake = FakeMetricsClient() + agg = LookupMetricsAggregator("proj", "redis", "svc", "test", fake) + + agg.record("fv__f1", FieldStatus.NOT_FOUND) + agg.record("fv__f1", FieldStatus.NOT_FOUND) + agg.emit() + + assert len(fake.calls) == 1 + assert fake.calls[0]["value"] == 2 # Original count, no adjustment + + os.environ.pop("FEAST_METRICS_SAMPLE_RATE") From 91b7530ab3d071a250b7e6cdaeea2bd10d07233c Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Tue, 5 May 2026 13:27:21 -0700 Subject: [PATCH 06/10] Updated the metric name --- go/internal/feast/metrics/lookup_metrics.go | 4 ++-- go/internal/feast/metrics/lookup_metrics_test.go | 10 +++++----- sdk/python/feast/_missing_key_metrics.py | 4 ++-- sdk/python/tests/unit/test_missing_key_metrics.py | 8 ++++---- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/go/internal/feast/metrics/lookup_metrics.go b/go/internal/feast/metrics/lookup_metrics.go index 537f3f7b1a2..aeed75301b5 100644 --- a/go/internal/feast/metrics/lookup_metrics.go +++ b/go/internal/feast/metrics/lookup_metrics.go @@ -129,7 +129,7 @@ func (m *LookupMetricsAggregator) Emit() { copy(tags, baseTags) tags[len(baseTags)] = "feature:" + featureID tags[len(baseTags)+1] = "feature_view:" + extractFeatureView(featureID) - m.client.Count("feast.feature_server.feature_lookup_not_found", adjustedCount, tags, 1.0) + m.client.Count("featureserver.feature_lookup_not_found", adjustedCount, tags, 1.0) } for featureID, count := range m.nullOrExpired { @@ -141,6 +141,6 @@ func (m *LookupMetricsAggregator) Emit() { copy(tags, baseTags) tags[len(baseTags)] = "feature:" + featureID tags[len(baseTags)+1] = "feature_view:" + extractFeatureView(featureID) - m.client.Count("feast.feature_server.feature_lookup_null_or_expired", adjustedCount, tags, 1.0) + m.client.Count("featureserver.feature_lookup_null_or_expired", adjustedCount, tags, 1.0) } } diff --git a/go/internal/feast/metrics/lookup_metrics_test.go b/go/internal/feast/metrics/lookup_metrics_test.go index de3efd10e37..71b3f0b43da 100644 --- a/go/internal/feast/metrics/lookup_metrics_test.go +++ b/go/internal/feast/metrics/lookup_metrics_test.go @@ -38,7 +38,7 @@ func TestAggregator_AllNotFound(t *testing.T) { agg.Emit() assert.Len(t, fake.calls, 1) - assert.Equal(t, "feast.feature_server.feature_lookup_not_found", fake.calls[0].name) + assert.Equal(t, "featureserver.feature_lookup_not_found", fake.calls[0].name) assert.Equal(t, int64(3), fake.calls[0].value) assert.Contains(t, fake.calls[0].tags, "feature:user_fv__age") } @@ -53,7 +53,7 @@ func TestAggregator_AllNullOrExpired(t *testing.T) { agg.Emit() assert.Len(t, fake.calls, 1) - assert.Equal(t, "feast.feature_server.feature_lookup_null_or_expired", fake.calls[0].name) + assert.Equal(t, "featureserver.feature_lookup_null_or_expired", fake.calls[0].name) assert.Equal(t, int64(3), fake.calls[0].value) assert.Contains(t, fake.calls[0].tags, "feature:order_fv__amt") } @@ -76,10 +76,10 @@ func TestAggregator_MixedStatuses(t *testing.T) { callsByName[c.name+":"+findTag(c.tags, "feature:")] = c } - nf := callsByName["feast.feature_server.feature_lookup_not_found:fv_a__f1"] + nf := callsByName["featureserver.feature_lookup_not_found:fv_a__f1"] assert.Equal(t, int64(1), nf.value) - ne := callsByName["feast.feature_server.feature_lookup_null_or_expired:fv_b__f2"] + ne := callsByName["featureserver.feature_lookup_null_or_expired:fv_b__f2"] assert.Equal(t, int64(2), ne.value) } @@ -171,7 +171,7 @@ func TestRecordFromRangeFeatureVectors(t *testing.T) { assert.Len(t, fake.calls, 1) assert.Equal(t, int64(2), fake.calls[0].value) - assert.Equal(t, "feast.feature_server.feature_lookup_not_found", fake.calls[0].name) + assert.Equal(t, "featureserver.feature_lookup_not_found", fake.calls[0].name) } func TestIsMissingKeyMetricsEnabled(t *testing.T) { diff --git a/sdk/python/feast/_missing_key_metrics.py b/sdk/python/feast/_missing_key_metrics.py index 4cceed526d7..e4a16529645 100644 --- a/sdk/python/feast/_missing_key_metrics.py +++ b/sdk/python/feast/_missing_key_metrics.py @@ -89,7 +89,7 @@ def emit(self) -> None: # Adjust count to preserve accuracy when sampling adjusted_count = int(cnt * multiplier) self.metrics_client.increment( - "feast.feature_server.feature_lookup_not_found", + "featureserver.feature_lookup_not_found", adjusted_count, tags=base_tags + [ @@ -102,7 +102,7 @@ def emit(self) -> None: if cnt: adjusted_count = int(cnt * multiplier) self.metrics_client.increment( - "feast.feature_server.feature_lookup_null_or_expired", + "featureserver.feature_lookup_null_or_expired", adjusted_count, tags=base_tags + [ diff --git a/sdk/python/tests/unit/test_missing_key_metrics.py b/sdk/python/tests/unit/test_missing_key_metrics.py index 7e738e15e0f..7f9a6799a8e 100644 --- a/sdk/python/tests/unit/test_missing_key_metrics.py +++ b/sdk/python/tests/unit/test_missing_key_metrics.py @@ -35,7 +35,7 @@ def test_not_found_only(self): assert len(fake.calls) == 1 assert ( - fake.calls[0]["metric"] == "feast.feature_server.feature_lookup_not_found" + fake.calls[0]["metric"] == "featureserver.feature_lookup_not_found" ) assert fake.calls[0]["value"] == 3 assert "feature:user_fv__age" in fake.calls[0]["tags"] @@ -51,7 +51,7 @@ def test_null_or_expired(self): assert len(fake.calls) == 1 assert ( fake.calls[0]["metric"] - == "feast.feature_server.feature_lookup_null_or_expired" + == "featureserver.feature_lookup_null_or_expired" ) assert fake.calls[0]["value"] == 2 @@ -76,7 +76,7 @@ def test_mixed_statuses(self): assert ( metrics_by_feature[ ( - "feast.feature_server.feature_lookup_not_found", + "featureserver.feature_lookup_not_found", "feature:fv_a__f1", ) ] @@ -85,7 +85,7 @@ def test_mixed_statuses(self): assert ( metrics_by_feature[ ( - "feast.feature_server.feature_lookup_null_or_expired", + "featureserver.feature_lookup_null_or_expired", "feature:fv_b__f2", ) ] From ff4f5c0741f89984d20d9a03b86644f5bc64af24 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Tue, 5 May 2026 13:45:31 -0700 Subject: [PATCH 07/10] style: Apply ruff formatting to test_missing_key_metrics.py Co-Authored-By: Claude Sonnet 4.5 --- sdk/python/tests/unit/test_missing_key_metrics.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/sdk/python/tests/unit/test_missing_key_metrics.py b/sdk/python/tests/unit/test_missing_key_metrics.py index 7f9a6799a8e..0b0cfb3dc83 100644 --- a/sdk/python/tests/unit/test_missing_key_metrics.py +++ b/sdk/python/tests/unit/test_missing_key_metrics.py @@ -34,9 +34,7 @@ def test_not_found_only(self): agg.emit() assert len(fake.calls) == 1 - assert ( - fake.calls[0]["metric"] == "featureserver.feature_lookup_not_found" - ) + assert fake.calls[0]["metric"] == "featureserver.feature_lookup_not_found" assert fake.calls[0]["value"] == 3 assert "feature:user_fv__age" in fake.calls[0]["tags"] @@ -49,10 +47,7 @@ def test_null_or_expired(self): agg.emit() assert len(fake.calls) == 1 - assert ( - fake.calls[0]["metric"] - == "featureserver.feature_lookup_null_or_expired" - ) + assert fake.calls[0]["metric"] == "featureserver.feature_lookup_null_or_expired" assert fake.calls[0]["value"] == 2 def test_mixed_statuses(self): From 23dc884974ee2f15743686949f57607b9ab00f9e Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Wed, 6 May 2026 04:23:42 -0700 Subject: [PATCH 08/10] fix: Remove explicit env and service tags from missing-key metrics Let the DD agent's global tags (DD_ENV, DD_SERVICE) handle env and service dimensions, consistent with the existing heartbeat metric. Co-Authored-By: Claude Opus 4.6 --- go/internal/feast/metrics/config.go | 20 -------------- go/internal/feast/metrics/lookup_metrics.go | 8 +----- .../feast/metrics/lookup_metrics_test.go | 11 +++----- go/internal/feast/server/grpc_server.go | 4 --- go/internal/feast/server/http_server.go | 4 --- sdk/python/feast/_missing_key_metrics.py | 6 ----- .../feast/infra/online_stores/online_store.py | 7 ----- .../tests/unit/test_missing_key_metrics.py | 26 +++++++++---------- 8 files changed, 17 insertions(+), 69 deletions(-) diff --git a/go/internal/feast/metrics/config.go b/go/internal/feast/metrics/config.go index bc9886cce57..316715e619f 100644 --- a/go/internal/feast/metrics/config.go +++ b/go/internal/feast/metrics/config.go @@ -19,26 +19,6 @@ func GetOnlineStoreType(config *registry.RepoConfig) string { return "unknown" } -func GetServiceName() string { - if svc := os.Getenv("SERVICE_NAME"); svc != "" { - return svc - } - if app := os.Getenv("APPLICATION"); app != "" { - return app - } - return "unknown_service" -} - -func GetEnvironment() string { - if env := os.Getenv("EXPEDIA_ENVIRONMENT_CATEGORY"); env != "" { - return env - } - if env := os.Getenv("EXPEDIA_ENVIRONMENT"); env != "" { - return env - } - return "unknown_env" -} - // GetStatsDAddress returns the DogStatsD address from environment variables. // Returns empty string if DD_AGENT_HOST is not set. // Port can be configured via DD_DOGSTATSD_PORT (defaults to 8125). diff --git a/go/internal/feast/metrics/lookup_metrics.go b/go/internal/feast/metrics/lookup_metrics.go index aeed75301b5..2281577a8f3 100644 --- a/go/internal/feast/metrics/lookup_metrics.go +++ b/go/internal/feast/metrics/lookup_metrics.go @@ -26,14 +26,12 @@ type LookupMetricsAggregator struct { nullOrExpired map[string]int64 project string onlineStore string - service string - env string client StatsdClient sampleRate float64 } func NewLookupMetricsAggregator( - project, onlineStore, service, env string, + project, onlineStore string, client StatsdClient, ) *LookupMetricsAggregator { if client == nil { @@ -55,8 +53,6 @@ func NewLookupMetricsAggregator( nullOrExpired: make(map[string]int64), project: project, onlineStore: onlineStore, - service: service, - env: env, client: client, sampleRate: sampleRate, } @@ -115,8 +111,6 @@ func (m *LookupMetricsAggregator) Emit() { baseTags := []string{ "project:" + m.project, "online_store_type:" + m.onlineStore, - "service:" + m.service, - "env:" + m.env, } for featureID, count := range m.notFound { diff --git a/go/internal/feast/metrics/lookup_metrics_test.go b/go/internal/feast/metrics/lookup_metrics_test.go index 71b3f0b43da..e80d903425f 100644 --- a/go/internal/feast/metrics/lookup_metrics_test.go +++ b/go/internal/feast/metrics/lookup_metrics_test.go @@ -25,7 +25,7 @@ func (f *fakeStatsdClient) Count(name string, value int64, tags []string, rate f } func newTestAggregator(client StatsdClient) *LookupMetricsAggregator { - return NewLookupMetricsAggregator("test_project", "redis", "test_service", "test", client) + return NewLookupMetricsAggregator("test_project", "redis", client) } func TestAggregator_AllNotFound(t *testing.T) { @@ -103,13 +103,13 @@ func TestAggregator_NilSafe(t *testing.T) { } func TestAggregator_NilClient(t *testing.T) { - agg := NewLookupMetricsAggregator("p", "r", "s", "e", nil) + agg := NewLookupMetricsAggregator("p", "r", nil) assert.Nil(t, agg) } func TestAggregator_Tags(t *testing.T) { fake := &fakeStatsdClient{} - agg := NewLookupMetricsAggregator("mlpfs", "eg-valkey", "ranking-fs", "dw", fake) + agg := NewLookupMetricsAggregator("mlpfs", "eg-valkey", fake) agg.Record("hotel_fv__price", serving.FieldStatus_NOT_FOUND) agg.Emit() @@ -118,8 +118,6 @@ func TestAggregator_Tags(t *testing.T) { tags := fake.calls[0].tags assert.Contains(t, tags, "project:mlpfs") assert.Contains(t, tags, "online_store_type:eg-valkey") - assert.Contains(t, tags, "service:ranking-fs") - assert.Contains(t, tags, "env:dw") assert.Contains(t, tags, "feature:hotel_fv__price") assert.Contains(t, tags, "feature_view:hotel_fv") } @@ -191,9 +189,6 @@ func TestIsMissingKeyMetricsEnabled(t *testing.T) { } func TestGetOnlineStoreType(t *testing.T) { - // Test with mock config that has "type" key - assert.Equal(t, "unknown_service", GetServiceName()) - assert.Equal(t, "unknown_env", GetEnvironment()) } func TestExtractFeatureView(t *testing.T) { diff --git a/go/internal/feast/server/grpc_server.go b/go/internal/feast/server/grpc_server.go index 322e830980a..d9a928c7edb 100644 --- a/go/internal/feast/server/grpc_server.go +++ b/go/internal/feast/server/grpc_server.go @@ -94,8 +94,6 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques agg := metrics.NewLookupMetricsAggregator( s.config.Project, metrics.GetOnlineStoreType(s.config), - metrics.GetServiceName(), - metrics.GetEnvironment(), s.metricsClient, ) agg.RecordFromFeatureVectors(featureVectors) @@ -188,8 +186,6 @@ func (s *grpcServingServiceServer) GetOnlineFeaturesRange(ctx context.Context, r agg := metrics.NewLookupMetricsAggregator( s.config.Project, metrics.GetOnlineStoreType(s.config), - metrics.GetServiceName(), - metrics.GetEnvironment(), s.metricsClient, ) agg.RecordFromRangeFeatureVectors(rangeFeatureVectors) diff --git a/go/internal/feast/server/http_server.go b/go/internal/feast/server/http_server.go index 41ce9bb57c2..73538f43dd8 100644 --- a/go/internal/feast/server/http_server.go +++ b/go/internal/feast/server/http_server.go @@ -432,8 +432,6 @@ func (s *HttpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { agg := metrics.NewLookupMetricsAggregator( s.config.Project, metrics.GetOnlineStoreType(s.config), - metrics.GetServiceName(), - metrics.GetEnvironment(), s.metricsClient, ) agg.RecordFromFeatureVectors(featureVectors) @@ -648,8 +646,6 @@ func (s *HttpServer) getOnlineFeaturesRange(w http.ResponseWriter, r *http.Reque agg := metrics.NewLookupMetricsAggregator( s.config.Project, metrics.GetOnlineStoreType(s.config), - metrics.GetServiceName(), - metrics.GetEnvironment(), s.metricsClient, ) agg.RecordFromRangeFeatureVectors(rangeFeatureVectors) diff --git a/sdk/python/feast/_missing_key_metrics.py b/sdk/python/feast/_missing_key_metrics.py index e4a16529645..ff9919c2db1 100644 --- a/sdk/python/feast/_missing_key_metrics.py +++ b/sdk/python/feast/_missing_key_metrics.py @@ -31,14 +31,10 @@ def __init__( self, project: str, online_store_type: str, - service: str, - env: str, metrics_client, ): self.project = project self.online_store_type = online_store_type - self.service = service or "unknown_service" - self.env = env or "unknown_env" self.metrics_client = metrics_client self.not_found: Counter = Counter() self.null_or_expired: Counter = Counter() @@ -80,8 +76,6 @@ def emit(self) -> None: base_tags: List[str] = [ f"project:{self.project}", f"online_store_type:{self.online_store_type}", - f"service:{self.service}", - f"env:{self.env}", ] for feat, cnt in self.not_found.items(): diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index ded118ab748..01b272b7bbc 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -496,17 +496,10 @@ def _emit_missing_key_metrics(config, project, response_proto): online_store_type = ( config.online_store.type if hasattr(config.online_store, "type") else "unknown" ) - service = os.getenv("SERVICE_NAME", os.getenv("APPLICATION", "unknown_service")) - env = os.getenv( - "EXPEDIA_ENVIRONMENT_CATEGORY", - os.getenv("EXPEDIA_ENVIRONMENT", "unknown_env"), - ) agg = LookupMetricsAggregator( project=project, online_store_type=online_store_type, - service=service, - env=env, metrics_client=get_metrics_client(), ) diff --git a/sdk/python/tests/unit/test_missing_key_metrics.py b/sdk/python/tests/unit/test_missing_key_metrics.py index 0b0cfb3dc83..9b865be9a6f 100644 --- a/sdk/python/tests/unit/test_missing_key_metrics.py +++ b/sdk/python/tests/unit/test_missing_key_metrics.py @@ -26,7 +26,7 @@ def increment(self, metric, value=1, tags=None): class TestLookupMetricsAggregator: def test_not_found_only(self): fake = FakeMetricsClient() - agg = LookupMetricsAggregator("proj", "redis", "svc", "test", fake) + agg = LookupMetricsAggregator("proj", "redis", fake) agg.record("user_fv__age", FieldStatus.NOT_FOUND) agg.record("user_fv__age", FieldStatus.NOT_FOUND) @@ -40,7 +40,7 @@ def test_not_found_only(self): def test_null_or_expired(self): fake = FakeMetricsClient() - agg = LookupMetricsAggregator("proj", "redis", "svc", "test", fake) + agg = LookupMetricsAggregator("proj", "redis", fake) agg.record("order_fv__amt", FieldStatus.NULL_VALUE) agg.record("order_fv__amt", FieldStatus.OUTSIDE_MAX_AGE) @@ -52,7 +52,7 @@ def test_null_or_expired(self): def test_mixed_statuses(self): fake = FakeMetricsClient() - agg = LookupMetricsAggregator("proj", "redis", "svc", "test", fake) + agg = LookupMetricsAggregator("proj", "redis", fake) agg.record("fv_a__f1", FieldStatus.PRESENT) agg.record("fv_a__f1", FieldStatus.NOT_FOUND) @@ -89,7 +89,7 @@ def test_mixed_statuses(self): def test_all_present(self): fake = FakeMetricsClient() - agg = LookupMetricsAggregator("proj", "redis", "svc", "test", fake) + agg = LookupMetricsAggregator("proj", "redis", fake) agg.record("fv__f1", FieldStatus.PRESENT) agg.record("fv__f1", FieldStatus.PRESENT) @@ -98,13 +98,13 @@ def test_all_present(self): assert len(fake.calls) == 0 def test_none_client(self): - agg = LookupMetricsAggregator("proj", "redis", "svc", "test", None) + agg = LookupMetricsAggregator("proj", "redis", None) agg.record("fv__f1", FieldStatus.NOT_FOUND) agg.emit() def test_tags(self): fake = FakeMetricsClient() - agg = LookupMetricsAggregator("mlpfs", "eg-valkey", "ranking-fs", "dw", fake) + agg = LookupMetricsAggregator("mlpfs", "eg-valkey", fake) agg.record("hotel_fv__price", FieldStatus.NOT_FOUND) agg.emit() @@ -112,8 +112,8 @@ def test_tags(self): tags = fake.calls[0]["tags"] assert "project:mlpfs" in tags assert "online_store_type:eg-valkey" in tags - assert "service:ranking-fs" in tags - assert "env:dw" in tags + assert not any(t.startswith("service:") for t in tags) + assert not any(t.startswith("env:") for t in tags) assert "feature:hotel_fv__price" in tags assert "feature_view:hotel_fv" in tags @@ -211,7 +211,7 @@ def test_default_no_sampling(self): """Default sample_rate should be 1.0 (no sampling)""" os.environ.pop("FEAST_METRICS_SAMPLE_RATE", None) fake = FakeMetricsClient() - agg = LookupMetricsAggregator("proj", "redis", "svc", "test", fake) + agg = LookupMetricsAggregator("proj", "redis", fake) assert agg.sample_rate == 1.0 @@ -219,7 +219,7 @@ def test_sample_rate_from_env(self): """Should read sample_rate from environment""" os.environ["FEAST_METRICS_SAMPLE_RATE"] = "0.5" fake = FakeMetricsClient() - agg = LookupMetricsAggregator("proj", "redis", "svc", "test", fake) + agg = LookupMetricsAggregator("proj", "redis", fake) assert agg.sample_rate == 0.5 os.environ.pop("FEAST_METRICS_SAMPLE_RATE") @@ -231,7 +231,7 @@ def test_invalid_sample_rate_uses_default(self): for invalid_value in test_cases: os.environ["FEAST_METRICS_SAMPLE_RATE"] = invalid_value fake = FakeMetricsClient() - agg = LookupMetricsAggregator("proj", "redis", "svc", "test", fake) + agg = LookupMetricsAggregator("proj", "redis", fake) assert agg.sample_rate == 1.0, f"Failed for value: {invalid_value}" os.environ.pop("FEAST_METRICS_SAMPLE_RATE") @@ -246,7 +246,7 @@ def test_sampling_adjusts_counts(self): random.seed(0) # This makes random.random() predictable - agg = LookupMetricsAggregator("proj", "redis", "svc", "test", fake) + agg = LookupMetricsAggregator("proj", "redis", fake) agg.record("fv__f1", FieldStatus.NOT_FOUND) agg.record("fv__f1", FieldStatus.NOT_FOUND) # 2 times @@ -269,7 +269,7 @@ def test_no_sampling_keeps_original_counts(self): """With sample_rate=1.0, counts should not be adjusted""" os.environ["FEAST_METRICS_SAMPLE_RATE"] = "1.0" fake = FakeMetricsClient() - agg = LookupMetricsAggregator("proj", "redis", "svc", "test", fake) + agg = LookupMetricsAggregator("proj", "redis", fake) agg.record("fv__f1", FieldStatus.NOT_FOUND) agg.record("fv__f1", FieldStatus.NOT_FOUND) From 4a71d3b40e6eefb8839f28d55acecf8ad46c3956 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Wed, 13 May 2026 13:31:39 -0700 Subject: [PATCH 09/10] fix: Prefix missing-key metrics with mlpfs namespace MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Update metric names to use mlpfs.featureserver prefix for better namespace organization within ML Platform monitoring: - featureserver.feature_lookup_not_found → mlpfs.featureserver.feature_lookup_not_found - featureserver.feature_lookup_null_or_expired → mlpfs.featureserver.feature_lookup_null_or_expired Changes applied to both Go and Python implementations and their corresponding unit tests. Co-Authored-By: Claude Sonnet 4.5 --- go/internal/feast/metrics/lookup_metrics.go | 4 ++-- go/internal/feast/metrics/lookup_metrics_test.go | 10 +++++----- sdk/python/feast/_missing_key_metrics.py | 4 ++-- sdk/python/tests/unit/test_missing_key_metrics.py | 8 ++++---- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/go/internal/feast/metrics/lookup_metrics.go b/go/internal/feast/metrics/lookup_metrics.go index 2281577a8f3..334b1555934 100644 --- a/go/internal/feast/metrics/lookup_metrics.go +++ b/go/internal/feast/metrics/lookup_metrics.go @@ -123,7 +123,7 @@ func (m *LookupMetricsAggregator) Emit() { copy(tags, baseTags) tags[len(baseTags)] = "feature:" + featureID tags[len(baseTags)+1] = "feature_view:" + extractFeatureView(featureID) - m.client.Count("featureserver.feature_lookup_not_found", adjustedCount, tags, 1.0) + m.client.Count("mlpfs.featureserver.feature_lookup_not_found", adjustedCount, tags, 1.0) } for featureID, count := range m.nullOrExpired { @@ -135,6 +135,6 @@ func (m *LookupMetricsAggregator) Emit() { copy(tags, baseTags) tags[len(baseTags)] = "feature:" + featureID tags[len(baseTags)+1] = "feature_view:" + extractFeatureView(featureID) - m.client.Count("featureserver.feature_lookup_null_or_expired", adjustedCount, tags, 1.0) + m.client.Count("mlpfs.featureserver.feature_lookup_null_or_expired", adjustedCount, tags, 1.0) } } diff --git a/go/internal/feast/metrics/lookup_metrics_test.go b/go/internal/feast/metrics/lookup_metrics_test.go index e80d903425f..62e26f9647e 100644 --- a/go/internal/feast/metrics/lookup_metrics_test.go +++ b/go/internal/feast/metrics/lookup_metrics_test.go @@ -38,7 +38,7 @@ func TestAggregator_AllNotFound(t *testing.T) { agg.Emit() assert.Len(t, fake.calls, 1) - assert.Equal(t, "featureserver.feature_lookup_not_found", fake.calls[0].name) + assert.Equal(t, "mlpfs.featureserver.feature_lookup_not_found", fake.calls[0].name) assert.Equal(t, int64(3), fake.calls[0].value) assert.Contains(t, fake.calls[0].tags, "feature:user_fv__age") } @@ -53,7 +53,7 @@ func TestAggregator_AllNullOrExpired(t *testing.T) { agg.Emit() assert.Len(t, fake.calls, 1) - assert.Equal(t, "featureserver.feature_lookup_null_or_expired", fake.calls[0].name) + assert.Equal(t, "mlpfs.featureserver.feature_lookup_null_or_expired", fake.calls[0].name) assert.Equal(t, int64(3), fake.calls[0].value) assert.Contains(t, fake.calls[0].tags, "feature:order_fv__amt") } @@ -76,10 +76,10 @@ func TestAggregator_MixedStatuses(t *testing.T) { callsByName[c.name+":"+findTag(c.tags, "feature:")] = c } - nf := callsByName["featureserver.feature_lookup_not_found:fv_a__f1"] + nf := callsByName["mlpfs.featureserver.feature_lookup_not_found:fv_a__f1"] assert.Equal(t, int64(1), nf.value) - ne := callsByName["featureserver.feature_lookup_null_or_expired:fv_b__f2"] + ne := callsByName["mlpfs.featureserver.feature_lookup_null_or_expired:fv_b__f2"] assert.Equal(t, int64(2), ne.value) } @@ -169,7 +169,7 @@ func TestRecordFromRangeFeatureVectors(t *testing.T) { assert.Len(t, fake.calls, 1) assert.Equal(t, int64(2), fake.calls[0].value) - assert.Equal(t, "featureserver.feature_lookup_not_found", fake.calls[0].name) + assert.Equal(t, "mlpfs.featureserver.feature_lookup_not_found", fake.calls[0].name) } func TestIsMissingKeyMetricsEnabled(t *testing.T) { diff --git a/sdk/python/feast/_missing_key_metrics.py b/sdk/python/feast/_missing_key_metrics.py index ff9919c2db1..4a9d1b756f7 100644 --- a/sdk/python/feast/_missing_key_metrics.py +++ b/sdk/python/feast/_missing_key_metrics.py @@ -83,7 +83,7 @@ def emit(self) -> None: # Adjust count to preserve accuracy when sampling adjusted_count = int(cnt * multiplier) self.metrics_client.increment( - "featureserver.feature_lookup_not_found", + "mlpfs.featureserver.feature_lookup_not_found", adjusted_count, tags=base_tags + [ @@ -96,7 +96,7 @@ def emit(self) -> None: if cnt: adjusted_count = int(cnt * multiplier) self.metrics_client.increment( - "featureserver.feature_lookup_null_or_expired", + "mlpfs.featureserver.feature_lookup_null_or_expired", adjusted_count, tags=base_tags + [ diff --git a/sdk/python/tests/unit/test_missing_key_metrics.py b/sdk/python/tests/unit/test_missing_key_metrics.py index 9b865be9a6f..2672460c418 100644 --- a/sdk/python/tests/unit/test_missing_key_metrics.py +++ b/sdk/python/tests/unit/test_missing_key_metrics.py @@ -34,7 +34,7 @@ def test_not_found_only(self): agg.emit() assert len(fake.calls) == 1 - assert fake.calls[0]["metric"] == "featureserver.feature_lookup_not_found" + assert fake.calls[0]["metric"] == "mlpfs.featureserver.feature_lookup_not_found" assert fake.calls[0]["value"] == 3 assert "feature:user_fv__age" in fake.calls[0]["tags"] @@ -47,7 +47,7 @@ def test_null_or_expired(self): agg.emit() assert len(fake.calls) == 1 - assert fake.calls[0]["metric"] == "featureserver.feature_lookup_null_or_expired" + assert fake.calls[0]["metric"] == "mlpfs.featureserver.feature_lookup_null_or_expired" assert fake.calls[0]["value"] == 2 def test_mixed_statuses(self): @@ -71,7 +71,7 @@ def test_mixed_statuses(self): assert ( metrics_by_feature[ ( - "featureserver.feature_lookup_not_found", + "mlpfs.featureserver.feature_lookup_not_found", "feature:fv_a__f1", ) ] @@ -80,7 +80,7 @@ def test_mixed_statuses(self): assert ( metrics_by_feature[ ( - "featureserver.feature_lookup_null_or_expired", + "mlpfs.featureserver.feature_lookup_null_or_expired", "feature:fv_b__f2", ) ] From ab30535a4efe3b1b9c93cc660253929e98b8078f Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Wed, 13 May 2026 16:47:58 -0700 Subject: [PATCH 10/10] fix formatting --- go/internal/feast/metrics/lookup_metrics_test.go | 10 +++++----- sdk/python/tests/unit/test_missing_key_metrics.py | 5 ++++- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/go/internal/feast/metrics/lookup_metrics_test.go b/go/internal/feast/metrics/lookup_metrics_test.go index 62e26f9647e..b4f1651bf52 100644 --- a/go/internal/feast/metrics/lookup_metrics_test.go +++ b/go/internal/feast/metrics/lookup_metrics_test.go @@ -247,11 +247,11 @@ func TestSampling_InvalidValues(t *testing.T) { value string expected float64 }{ - {"-0.5", 1.0}, // Negative - {"1.5", 1.0}, // > 1.0 - {"0", 1.0}, // Zero - {"abc", 1.0}, // Non-numeric - {"", 1.0}, // Empty (unset uses default) + {"-0.5", 1.0}, // Negative + {"1.5", 1.0}, // > 1.0 + {"0", 1.0}, // Zero + {"abc", 1.0}, // Non-numeric + {"", 1.0}, // Empty (unset uses default) } for _, tc := range testCases { diff --git a/sdk/python/tests/unit/test_missing_key_metrics.py b/sdk/python/tests/unit/test_missing_key_metrics.py index 2672460c418..fee6b166fd6 100644 --- a/sdk/python/tests/unit/test_missing_key_metrics.py +++ b/sdk/python/tests/unit/test_missing_key_metrics.py @@ -47,7 +47,10 @@ def test_null_or_expired(self): agg.emit() assert len(fake.calls) == 1 - assert fake.calls[0]["metric"] == "mlpfs.featureserver.feature_lookup_null_or_expired" + assert ( + fake.calls[0]["metric"] + == "mlpfs.featureserver.feature_lookup_null_or_expired" + ) assert fake.calls[0]["value"] == 2 def test_mixed_statuses(self):