Skip to content

Commit c31cfb8

Browse files
feat: Add missing-key metrics for online feature lookups (#359)
* feat: Add missing-key metrics for online feature lookups (Go + Python). Emit DogStatsD counters (feature_lookup_not_found, feature_lookup_null_or_expired) when online store reads return NOT_FOUND, NULL_VALUE, or OUTSIDE_MAX_AGE statuses. Gated behind the ENABLE_MISSING_KEY_METRICS env var.
1 parent 6df7530 commit c31cfb8

16 files changed

Lines changed: 1152 additions & 54 deletions

go/embedded/online_features.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ func (s *OnlineFeatureService) StartGrpcServerWithLogging(host string, port int,
311311
if err != nil {
312312
return err
313313
}
314-
ser := server.NewGrpcServingServiceServer(s.fs, loggingService)
314+
ser := server.NewGrpcServingServiceServer(s.fs, loggingService, nil, nil)
315315
log.Printf("Starting a gRPC server on host %s port %d\n", host, port)
316316
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
317317
if err != nil {
@@ -366,7 +366,7 @@ func (s *OnlineFeatureService) StartHttpServerWithLogging(host string, port int,
366366
if err != nil {
367367
return err
368368
}
369-
ser := server.NewHttpServer(s.fs, loggingService)
369+
ser := server.NewHttpServer(s.fs, loggingService, nil, nil)
370370
log.Printf("Starting a HTTP server on host %s port %d\n", host, port)
371371

372372
go func() {
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package metrics
2+
3+
// StatsdClient is the minimal slice of the DogStatsD client API that this
// package depends on. Declaring it locally lets tests inject a fake.
// The real github.com/DataDog/datadog-go/v5/statsd.Client satisfies this interface.
type StatsdClient interface {
	// Count submits value for the counter metric name, tagged with tags.
	// rate is the sample rate handed to the underlying client.
	Count(name string, value int64, tags []string, rate float64) error
}

// NoOpStatsdClient is a StatsdClient that discards every metric. It is meant
// for use when metrics are disabled.
type NoOpStatsdClient struct{}

// Compile-time check that the no-op client keeps satisfying StatsdClient if
// the interface grows.
var _ StatsdClient = (*NoOpStatsdClient)(nil)

// Count ignores all of its arguments and always returns nil.
func (*NoOpStatsdClient) Count(_ string, _ int64, _ []string, _ float64) error {
	return nil
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package metrics
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"strings"
7+
8+
"github.com/feast-dev/feast/go/internal/feast/registry"
9+
)
10+
11+
// IsMissingKeyMetricsEnabled reports whether the ENABLE_MISSING_KEY_METRICS
// environment variable is set to "true". The comparison is case-insensitive
// and ignores surrounding whitespace, so a value such as "True\n" coming from
// an env file still enables the feature. Any other value, or an unset
// variable, yields false.
func IsMissingKeyMetricsEnabled() bool {
	flag := strings.TrimSpace(os.Getenv("ENABLE_MISSING_KEY_METRICS"))
	return strings.EqualFold(flag, "true")
}
14+
15+
func GetOnlineStoreType(config *registry.RepoConfig) string {
16+
if storeType, ok := config.OnlineStore["type"]; ok {
17+
return fmt.Sprintf("%v", storeType)
18+
}
19+
return "unknown"
20+
}
21+
22+
// GetStatsDAddress returns the DogStatsD address in "host:port" form, built
// from environment variables.
//
// The host comes from DD_AGENT_HOST; if that is unset or empty the function
// returns the empty string. The port comes from DD_DOGSTATSD_PORT and
// defaults to 8125. A bare IPv6 literal host (for example "::1") is wrapped
// in brackets so the result is an unambiguous "[::1]:8125" rather than
// "::1:8125".
func GetStatsDAddress() string {
	host := os.Getenv("DD_AGENT_HOST")
	if host == "" {
		return ""
	}

	port := os.Getenv("DD_DOGSTATSD_PORT")
	if port == "" {
		port = "8125"
	}

	// An IPv6 literal always contains at least two colons, while hostnames and
	// IPv4 addresses contain none. Values that are already bracketed or that
	// look like a path/URL are left untouched.
	if strings.Count(host, ":") >= 2 && !strings.ContainsAny(host, "[]/") {
		host = "[" + host + "]"
	}

	return host + ":" + port
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package metrics
2+
3+
import (
4+
"os"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestGetStatsDAddress(t *testing.T) {
11+
tests := []struct {
12+
name string
13+
host string
14+
port string
15+
expected string
16+
}{
17+
{
18+
name: "no env vars set",
19+
host: "",
20+
port: "",
21+
expected: "",
22+
},
23+
{
24+
name: "only host set, default port",
25+
host: "datadog-agent",
26+
port: "",
27+
expected: "datadog-agent:8125",
28+
},
29+
{
30+
name: "host and custom port",
31+
host: "datadog-agent",
32+
port: "9125",
33+
expected: "datadog-agent:9125",
34+
},
35+
{
36+
name: "localhost with default port",
37+
host: "localhost",
38+
port: "",
39+
expected: "localhost:8125",
40+
},
41+
{
42+
name: "IP address with custom port",
43+
host: "10.0.0.5",
44+
port: "8126",
45+
expected: "10.0.0.5:8126",
46+
},
47+
}
48+
49+
for _, tt := range tests {
50+
t.Run(tt.name, func(t *testing.T) {
51+
// Set env vars
52+
if tt.host != "" {
53+
os.Setenv("DD_AGENT_HOST", tt.host)
54+
defer os.Unsetenv("DD_AGENT_HOST")
55+
} else {
56+
os.Unsetenv("DD_AGENT_HOST")
57+
}
58+
59+
if tt.port != "" {
60+
os.Setenv("DD_DOGSTATSD_PORT", tt.port)
61+
defer os.Unsetenv("DD_DOGSTATSD_PORT")
62+
} else {
63+
os.Unsetenv("DD_DOGSTATSD_PORT")
64+
}
65+
66+
result := GetStatsDAddress()
67+
assert.Equal(t, tt.expected, result)
68+
})
69+
}
70+
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package metrics
2+
3+
import (
4+
"math/rand"
5+
"os"
6+
"strconv"
7+
"strings"
8+
9+
"github.com/feast-dev/feast/go/internal/feast/onlineserving"
10+
"github.com/feast-dev/feast/go/protos/feast/serving"
11+
)
12+
13+
// extractFeatureView extracts the feature view name from a full feature name.
// Feature names follow the format: feature_view__feature_name
// Example: "hotel_fv__price" -> "hotel_fv"
//
// Everything before the first "__" is treated as the view name. When there is
// no "__" separator, or nothing precedes it, "unknown" is returned so that the
// feature_view tag is never empty.
func extractFeatureView(featureName string) string {
	sep := strings.Index(featureName, "__")
	if sep <= 0 {
		// sep == -1: no separator; sep == 0: empty view name.
		return "unknown"
	}
	return featureName[:sep]
}
23+
24+
type LookupMetricsAggregator struct {
25+
notFound map[string]int64
26+
nullOrExpired map[string]int64
27+
project string
28+
onlineStore string
29+
client StatsdClient
30+
sampleRate float64
31+
}
32+
33+
func NewLookupMetricsAggregator(
34+
project, onlineStore string,
35+
client StatsdClient,
36+
) *LookupMetricsAggregator {
37+
if client == nil {
38+
return nil
39+
}
40+
41+
// Read sampling rate from environment (default: 1.0 = no sampling)
42+
sampleRate := 1.0
43+
if rateStr := os.Getenv("FEAST_METRICS_SAMPLE_RATE"); rateStr != "" {
44+
if rate, err := strconv.ParseFloat(rateStr, 64); err == nil {
45+
if rate > 0 && rate <= 1.0 {
46+
sampleRate = rate
47+
}
48+
}
49+
}
50+
51+
return &LookupMetricsAggregator{
52+
notFound: make(map[string]int64),
53+
nullOrExpired: make(map[string]int64),
54+
project: project,
55+
onlineStore: onlineStore,
56+
client: client,
57+
sampleRate: sampleRate,
58+
}
59+
}
60+
61+
func (m *LookupMetricsAggregator) Record(featureID string, status serving.FieldStatus) {
62+
if m == nil {
63+
return
64+
}
65+
switch status {
66+
case serving.FieldStatus_NOT_FOUND:
67+
m.notFound[featureID]++
68+
case serving.FieldStatus_NULL_VALUE, serving.FieldStatus_OUTSIDE_MAX_AGE:
69+
m.nullOrExpired[featureID]++
70+
}
71+
}
72+
73+
func (m *LookupMetricsAggregator) RecordFromFeatureVectors(vectors []*onlineserving.FeatureVector) {
74+
if m == nil {
75+
return
76+
}
77+
for _, vector := range vectors {
78+
for _, status := range vector.Statuses {
79+
m.Record(vector.Name, status)
80+
}
81+
}
82+
}
83+
84+
func (m *LookupMetricsAggregator) RecordFromRangeFeatureVectors(vectors []*onlineserving.RangeFeatureVector) {
85+
if m == nil {
86+
return
87+
}
88+
for _, vector := range vectors {
89+
for _, entityStatuses := range vector.RangeStatuses {
90+
for _, status := range entityStatuses {
91+
m.Record(vector.Name, status)
92+
}
93+
}
94+
}
95+
}
96+
97+
func (m *LookupMetricsAggregator) Emit() {
98+
if m == nil || m.client == nil {
99+
return
100+
}
101+
102+
// Probabilistic sampling: skip this request's metrics based on sample_rate
103+
if m.sampleRate < 1.0 && rand.Float64() > m.sampleRate {
104+
return
105+
}
106+
107+
// Calculate multiplier to preserve statistical accuracy
108+
// If sampleRate=0.1, we only emit 10% of the time, so multiply counts by 10
109+
multiplier := 1.0 / m.sampleRate
110+
111+
baseTags := []string{
112+
"project:" + m.project,
113+
"online_store_type:" + m.onlineStore,
114+
}
115+
116+
for featureID, count := range m.notFound {
117+
if count == 0 {
118+
continue
119+
}
120+
// Adjust count to preserve accuracy when sampling
121+
adjustedCount := int64(float64(count) * multiplier)
122+
tags := make([]string, len(baseTags)+2)
123+
copy(tags, baseTags)
124+
tags[len(baseTags)] = "feature:" + featureID
125+
tags[len(baseTags)+1] = "feature_view:" + extractFeatureView(featureID)
126+
m.client.Count("mlpfs.featureserver.feature_lookup_not_found", adjustedCount, tags, 1.0)
127+
}
128+
129+
for featureID, count := range m.nullOrExpired {
130+
if count == 0 {
131+
continue
132+
}
133+
adjustedCount := int64(float64(count) * multiplier)
134+
tags := make([]string, len(baseTags)+2)
135+
copy(tags, baseTags)
136+
tags[len(baseTags)] = "feature:" + featureID
137+
tags[len(baseTags)+1] = "feature_view:" + extractFeatureView(featureID)
138+
m.client.Count("mlpfs.featureserver.feature_lookup_null_or_expired", adjustedCount, tags, 1.0)
139+
}
140+
}

0 commit comments

Comments
 (0)