Skip to content

Commit 1fa0360

Browse files
authored
fix: Ignore arrow conversion when transformation is not used (#313)
* fix: Ignore Arrow conversion when transformation is not used
* fix embedded Go references to feature vector values
* add tests comparing transpose with and without Arrow conversion
* add full-loop conversion benchmark test
* fix: convert protos to Go types for the HTTP response and include get-features in the HTTP integration test
* fix HTTP empty lists within a list of values being returned as null instead of as empty lists
1 parent 0a2ad13 commit 1fa0360

13 files changed

Lines changed: 918 additions & 105 deletions

File tree

go/embedded/online_features.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -219,7 +219,7 @@ func (s *OnlineFeatureService) GetOnlineFeatures(
219219
outputFields = append(outputFields,
220220
arrow.Field{
221221
Name: featureVector.Name,
222-
Type: featureVector.Values.DataType()})
222+
Type: featureVector.Values.(arrow.Array).DataType()})
223223
outputFields = append(outputFields,
224224
arrow.Field{
225225
Name: fmt.Sprintf("%s__status", featureVector.Name),
@@ -229,7 +229,7 @@ func (s *OnlineFeatureService) GetOnlineFeatures(
229229
Name: fmt.Sprintf("%s__timestamp", featureVector.Name),
230230
Type: arrow.PrimitiveTypes.Int64})
231231

232-
outputColumns = append(outputColumns, featureVector.Values)
232+
outputColumns = append(outputColumns, featureVector.Values.(arrow.Array))
233233

234234
statusColumnBuilder := array.NewInt32Builder(pool)
235235
for _, status := range featureVector.Statuses {
@@ -250,7 +250,7 @@ func (s *OnlineFeatureService) GetOnlineFeatures(
250250
s.tsColumnBuildersToRelease = append(s.tsColumnBuildersToRelease, tsColumnBuilder)
251251
s.arraysToRelease = append(s.arraysToRelease, statusColumn)
252252
s.arraysToRelease = append(s.arraysToRelease, tsColumn)
253-
s.arraysToRelease = append(s.arraysToRelease, featureVector.Values)
253+
s.arraysToRelease = append(s.arraysToRelease, featureVector.Values.(arrow.Array))
254254
}
255255

256256
result := array.NewRecord(arrow.NewSchema(outputFields, nil), outputColumns, int64(numRows))

go/internal/feast/featurestore.go

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -265,6 +265,8 @@ func (fs *FeatureStore) GetOnlineFeatures(
265265
// Flatten channel into 1D
266266
var result []*onlineserving.FeatureVector
267267

268+
transformationRequired := len(requestedOnDemandFeatureViews) > 0
269+
268270
v2Batching := os.Getenv("ENABLE_V2_BATCHING")
269271
if v2Batching != "false" {
270272
groupedRefsV2, err := onlineserving.GroupFeatureRefsV2(requestedFeatureViews, joinKeyToEntityValues, entityNameToJoinKeyMap, fullFeatureNames, fs.onlineStore.GetDataModelType())
@@ -291,6 +293,7 @@ func (fs *FeatureStore) GetOnlineFeatures(
291293
requestedFeatureViews,
292294
arrowMemory,
293295
numRows,
296+
transformationRequired,
294297
)
295298
if err != nil {
296299
return err
@@ -332,6 +335,7 @@ func (fs *FeatureStore) GetOnlineFeatures(
332335
requestedFeatureViews,
333336
arrowMemory,
334337
numRows,
338+
transformationRequired,
335339
)
336340
if err != nil {
337341
return err
@@ -353,7 +357,7 @@ func (fs *FeatureStore) GetOnlineFeatures(
353357
}
354358
}
355359

356-
if fs.transformationCallback != nil || fs.transformationService != nil {
360+
if transformationRequired {
357361
onDemandFeatures, err := transformation.AugmentResponseWithOnDemandTransforms(
358362
ctx,
359363
requestedOnDemandFeatureViews,
@@ -377,7 +381,7 @@ func (fs *FeatureStore) GetOnlineFeatures(
377381
return nil, err
378382
}
379383

380-
entityColumns, err := onlineserving.EntitiesToFeatureVectors(joinKeyToEntityValues, arrowMemory, numRows)
384+
entityColumns, err := onlineserving.EntitiesToFeatureVectors(joinKeyToEntityValues, arrowMemory, numRows, transformationRequired)
381385
result = append(entityColumns, result...)
382386
return result, nil
383387
}
@@ -504,6 +508,7 @@ func (fs *FeatureStore) GetOnlineFeaturesRange(
504508
requestedSortedFeatureViews,
505509
arrowMemory,
506510
numRows,
511+
false,
507512
)
508513
if err != nil {
509514
return nil, err

go/internal/feast/featurestore_test.go

Lines changed: 14 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,6 @@ import (
2525
"github.com/feast-dev/feast/go/protos/feast/core"
2626
"github.com/feast-dev/feast/go/protos/feast/serving"
2727
"github.com/feast-dev/feast/go/protos/feast/types"
28-
types2 "github.com/feast-dev/feast/go/types"
2928
"google.golang.org/protobuf/types/known/durationpb"
3029
"google.golang.org/protobuf/types/known/timestamppb"
3130
"time"
@@ -501,14 +500,15 @@ func TestGetOnlineFeaturesRange(t *testing.T) {
501500
assert.NotNil(t, accRateVector)
502501
assert.NotNil(t, convRateVector)
503502

504-
accRateValues, err := types2.ArrowValuesToProtoValues(accRateVector.RangeValues)
503+
accRateValues, err := accRateVector.GetProtoValues()
505504
assert.NoError(t, err)
506-
convRateValues, err := types2.ArrowValuesToProtoValues(convRateVector.RangeValues)
505+
convRateValues, err := convRateVector.GetProtoValues()
507506
assert.NoError(t, err)
508-
assert.Equal(t, []float64{0.91, 0.92, 0.94}, accRateValues[0].GetDoubleListVal().Val)
509-
assert.Equal(t, []float64{0.85, 0.87, 0.89}, convRateValues[0].GetDoubleListVal().Val)
510-
assert.Equal(t, []float64{0.85, 0.88}, accRateValues[1].GetDoubleListVal().Val)
511-
assert.Equal(t, []float64{0.78, 0.80}, convRateValues[1].GetDoubleListVal().Val)
507+
508+
validateRepeatedDoubleValue(t, []float64{0.91, 0.92, 0.94}, accRateValues[0])
509+
validateRepeatedDoubleValue(t, []float64{0.85, 0.87, 0.89}, convRateValues[0])
510+
validateRepeatedDoubleValue(t, []float64{0.85, 0.88}, accRateValues[1])
511+
validateRepeatedDoubleValue(t, []float64{0.78, 0.80}, convRateValues[1])
512512

513513
assert.Equal(t, 3, len(accRateVector.RangeStatuses[0]))
514514
assert.Equal(t, 3, len(convRateVector.RangeStatuses[0]))
@@ -518,6 +518,12 @@ func TestGetOnlineFeaturesRange(t *testing.T) {
518518
mockStore.AssertExpectations(t)
519519
}
520520

521+
func validateRepeatedDoubleValue(t *testing.T, expectedVals []float64, actual *types.RepeatedValue) {
522+
for i, val := range actual.GetVal() {
523+
assert.Equal(t, expectedVals[i], val.GetDoubleVal())
524+
}
525+
}
526+
521527
// This is a test helper function that mimics the core logic of FeatureStore.GetOnlineFeaturesRange
522528
// but accepts test data directly instead of using a registry
523529
// TODO: Refactor to use the real online store when the OnlineReadRange method is implemented.
@@ -601,6 +607,7 @@ func testGetOnlineFeaturesRange(
601607
sortedFeatureViews,
602608
arrowAllocator,
603609
numRows,
610+
false,
604611
)
605612
if err != nil {
606613
return nil, err

go/internal/feast/integration_tests/scylladb/http/http_integration_test.go

Lines changed: 56 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@ import (
1818
)
1919

2020
var httpServer *server.HttpServer
21+
var getOnlineFeaturesHandler http.HandlerFunc
2122
var getOnlineFeaturesRangeHandler http.HandlerFunc
2223

2324
func TestMain(m *testing.M) {
@@ -30,9 +31,10 @@ func TestMain(m *testing.M) {
3031

3132
// Look up the GetOnlineFeatures and GetOnlineFeaturesRange handlers by path among those returned by DefaultHttpHandlers
3233
for _, handler := range server.DefaultHttpHandlers(httpServer) {
33-
if handler.Path == "/get-online-features-range" {
34+
if handler.Path == "/get-online-features" {
35+
getOnlineFeaturesHandler = handler.HandlerFunc.(http.HandlerFunc)
36+
} else if handler.Path == "/get-online-features-range" {
3437
getOnlineFeaturesRangeHandler = handler.HandlerFunc.(http.HandlerFunc)
35-
break
3638
}
3739
}
3840

@@ -57,6 +59,58 @@ func loadResponse(fileName string) ([]byte, error) {
5759
return os.ReadFile(filePath)
5860
}
5961

62+
func TestGetOnlineFeatures_Http(t *testing.T) {
63+
requestJson := []byte(`{
64+
"features": [
65+
"all_dtypes:int_val",
66+
"all_dtypes:long_val",
67+
"all_dtypes:float_val",
68+
"all_dtypes:double_val",
69+
"all_dtypes:byte_val",
70+
"all_dtypes:string_val",
71+
"all_dtypes:timestamp_val",
72+
"all_dtypes:boolean_val",
73+
"all_dtypes:array_int_val",
74+
"all_dtypes:array_long_val",
75+
"all_dtypes:array_float_val",
76+
"all_dtypes:array_double_val",
77+
"all_dtypes:array_byte_val",
78+
"all_dtypes:array_string_val",
79+
"all_dtypes:array_timestamp_val",
80+
"all_dtypes:array_boolean_val",
81+
"all_dtypes:null_int_val",
82+
"all_dtypes:null_long_val",
83+
"all_dtypes:null_float_val",
84+
"all_dtypes:null_double_val",
85+
"all_dtypes:null_byte_val",
86+
"all_dtypes:null_string_val",
87+
"all_dtypes:null_timestamp_val",
88+
"all_dtypes:null_boolean_val",
89+
"all_dtypes:null_array_int_val",
90+
"all_dtypes:null_array_long_val",
91+
"all_dtypes:null_array_float_val",
92+
"all_dtypes:null_array_double_val",
93+
"all_dtypes:null_array_byte_val",
94+
"all_dtypes:null_array_string_val",
95+
"all_dtypes:null_array_timestamp_val",
96+
"all_dtypes:null_array_boolean_val"
97+
],
98+
"entities": {
99+
"index_id": [1, 2, 3]
100+
}
101+
}`)
102+
103+
request := httptest.NewRequest(http.MethodPost, "/get-online-features?include-metadata=true", bytes.NewBuffer(requestJson))
104+
responseRecorder := httptest.NewRecorder()
105+
106+
getOnlineFeaturesHandler.ServeHTTP(responseRecorder, request)
107+
assert.Equal(t, responseRecorder.Code, http.StatusOK, "Expected HTTP status code 200 OK response body is: %s", responseRecorder.Body.String())
108+
actual := responseRecorder.Body.String()
109+
expectedResponse, err := loadResponse("valid_get_features_response.json")
110+
require.NoError(t, err, "Failed to load expected response from file")
111+
assert.JSONEq(t, string(expectedResponse), actual, "Response body does not match expected JSON")
112+
}
113+
60114
func TestGetOnlineFeaturesRange_Http(t *testing.T) {
61115
requestJson := []byte(`{
62116
"features": [

0 commit comments

Comments
 (0)