diff --git a/charts/device-definitions-api/values-prod.yaml b/charts/device-definitions-api/values-prod.yaml
index f4317e56..b8736372 100644
--- a/charts/device-definitions-api/values-prod.yaml
+++ b/charts/device-definitions-api/values-prod.yaml
@@ -58,3 +58,10 @@ tolerations: []
 affinity: {}
 podDisruptionBudget:
   minAvailable: 1
+jobs:
+  - name: device-definitions-search-sync
+    schedule: 0 13 * * *
+    args:
+      - '-c'
+      - /device-definitions-api sync-device-definitions-search; CODE=$?; echo "daily device definitions typesense search sync completed"; wget -q --post-data "hello=shutdown" http://localhost:4191/shutdown &> /dev/null; exit $CODE;
+
diff --git a/cmd/device-definitions-api/main.go b/cmd/device-definitions-api/main.go
index d5d20e4f..4ccfbaec 100644
--- a/cmd/device-definitions-api/main.go
+++ b/cmd/device-definitions-api/main.go
@@ -55,7 +55,6 @@ func main() {
 	subcommands.Register(&decodeVINCmd{logger: &logger, settings: &settings}, "")
 	subcommands.Register(&syncDeviceDefinitionSearchCmd{logger: logger, settings: settings, sender: sigSender}, "")
 	subcommands.Register(&deleteDefinition{logger: logger, settings: settings}, "")
-	subcommands.Register(&syncR1CompatibiltyCmd{logger: logger, settings: settings}, "")
 	subcommands.Register(&bulkUpdatePowertrain{logger: logger, settings: settings, sender: sigSender}, "")
 
 	if len(os.Args) == 1 {
diff --git a/cmd/device-definitions-api/search_indexer.go b/cmd/device-definitions-api/search_indexer.go
new file mode 100644
index 00000000..8ac1b3e3
--- /dev/null
+++ b/cmd/device-definitions-api/search_indexer.go
@@ -0,0 +1,85 @@
+package main
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/pkg/errors"
+	"github.com/typesense/typesense-go/typesense"
+	"github.com/typesense/typesense-go/typesense/api"
+)
+
+// SearchIndexer is the subset of search-backend operations used by the
+// definition sync command. It is an interface so the sync loop can be
+// exercised against a generated mock.
+//
+//go:generate mockgen -source search_indexer.go -destination search_indexer_mock_test.go -package main
+type SearchIndexer interface {
+	// RecreateIndex deletes the collection named schema.Name (best effort)
+	// and then creates it from schema.
+	RecreateIndex(ctx context.Context, schema *api.CollectionSchema) error
+	// UpsertDocuments bulk-imports docs into collectionName, creating or
+	// updating one search document per entry.
+	UpsertDocuments(ctx context.Context, collectionName string, docs []SearchEntryItem) error
+}
+
+// typesenseSearchIndexer implements SearchIndexer on a typesense client.
+type typesenseSearchIndexer struct {
+	client *typesense.Client
+}
+
+// NewTypesenseSearchIndexer wraps client in a SearchIndexer.
+func NewTypesenseSearchIndexer(client *typesense.Client) SearchIndexer {
+	return &typesenseSearchIndexer{client: client}
+}
+
+// RecreateIndex deletes the collection named schema.Name and creates it
+// again from schema. Only a failed create is returned as an error.
+func (t *typesenseSearchIndexer) RecreateIndex(ctx context.Context, schema *api.CollectionSchema) error {
+	if _, err := t.client.Collection(schema.Name).Delete(ctx); err != nil {
+		// Deletion failure is non-fatal (the index may not exist yet), so it
+		// is only reported on stdout and creation is still attempted.
+		fmt.Printf("RecreateIndex: delete returned %v (continuing)\n", err)
+	}
+	if _, err := t.client.Collections().Create(ctx, schema); err != nil {
+		return errors.Wrap(err, "failed to create collection")
+	}
+	return nil
+}
+
+// UpsertDocuments imports docs into collectionName in a single bulk request
+// using the "upsert" action; an empty docs slice is a no-op. It returns an
+// error when the request itself fails or when any individual document is
+// reported as not imported.
+func (t *typesenseSearchIndexer) UpsertDocuments(ctx context.Context, collectionName string, docs []SearchEntryItem) error {
+	if len(docs) == 0 {
+		return nil
+	}
+	payload := make([]interface{}, 0, len(docs))
+	for _, d := range docs {
+		payload = append(payload, d)
+	}
+	action := "upsert"
+	results, err := t.client.Collection(collectionName).Documents().Import(ctx, payload, &api.ImportDocumentsParams{
+		Action: &action,
+	})
+	if err != nil {
+		return errors.Wrap(err, "failed to import documents")
+	}
+	// A bulk import reports per-document failures in its result list rather
+	// than as a request error, so every result has to be inspected.
+	failed, firstErr := 0, ""
+	for _, r := range results {
+		if r == nil || r.Success {
+			continue
+		}
+		if failed == 0 {
+			firstErr = r.Error
+		}
+		failed++
+	}
+	if failed > 0 {
+		return errors.Errorf("failed to import %d of %d documents into %q: %s", failed, len(payload), collectionName, firstErr)
+	}
+	return nil
+}
diff --git a/cmd/device-definitions-api/search_indexer_mock_test.go b/cmd/device-definitions-api/search_indexer_mock_test.go
new file mode 100644
index 00000000..26d30244
--- /dev/null
+++ b/cmd/device-definitions-api/search_indexer_mock_test.go
@@ -0,0 +1,70 @@
+// Code generated by MockGen. DO NOT EDIT.
+// Source: search_indexer.go
+//
+// Generated by this command:
+//
+//	mockgen -source search_indexer.go -destination search_indexer_mock_test.go -package main
+//
+
+// Package main is a generated GoMock package.
+package main + +import ( + context "context" + reflect "reflect" + + api "github.com/typesense/typesense-go/typesense/api" + gomock "go.uber.org/mock/gomock" +) + +// MockSearchIndexer is a mock of SearchIndexer interface. +type MockSearchIndexer struct { + ctrl *gomock.Controller + recorder *MockSearchIndexerMockRecorder + isgomock struct{} +} + +// MockSearchIndexerMockRecorder is the mock recorder for MockSearchIndexer. +type MockSearchIndexerMockRecorder struct { + mock *MockSearchIndexer +} + +// NewMockSearchIndexer creates a new mock instance. +func NewMockSearchIndexer(ctrl *gomock.Controller) *MockSearchIndexer { + mock := &MockSearchIndexer{ctrl: ctrl} + mock.recorder = &MockSearchIndexerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSearchIndexer) EXPECT() *MockSearchIndexerMockRecorder { + return m.recorder +} + +// RecreateIndex mocks base method. +func (m *MockSearchIndexer) RecreateIndex(ctx context.Context, schema *api.CollectionSchema) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecreateIndex", ctx, schema) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecreateIndex indicates an expected call of RecreateIndex. +func (mr *MockSearchIndexerMockRecorder) RecreateIndex(ctx, schema any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecreateIndex", reflect.TypeOf((*MockSearchIndexer)(nil).RecreateIndex), ctx, schema) +} + +// UpsertDocuments mocks base method. +func (m *MockSearchIndexer) UpsertDocuments(ctx context.Context, collectionName string, docs []SearchEntryItem) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpsertDocuments", ctx, collectionName, docs) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpsertDocuments indicates an expected call of UpsertDocuments. 
+func (mr *MockSearchIndexerMockRecorder) UpsertDocuments(ctx, collectionName, docs any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpsertDocuments", reflect.TypeOf((*MockSearchIndexer)(nil).UpsertDocuments), ctx, collectionName, docs) +} diff --git a/cmd/device-definitions-api/sync_device_definition_search.go b/cmd/device-definitions-api/sync_device_definition_search.go index 56653fad..24a4284d 100644 --- a/cmd/device-definitions-api/sync_device_definition_search.go +++ b/cmd/device-definitions-api/sync_device_definition_search.go @@ -4,9 +4,8 @@ import ( "context" "flag" "fmt" - "time" - models2 "github.com/DIMO-Network/device-definitions-api/internal/core/models" + coremodels "github.com/DIMO-Network/device-definitions-api/internal/core/models" "github.com/DIMO-Network/device-definitions-api/internal/infrastructure/gateways" "github.com/DIMO-Network/device-definitions-api/internal/infrastructure/sender" stringutils "github.com/DIMO-Network/shared/pkg/strings" @@ -23,7 +22,12 @@ import ( "github.com/typesense/typesense-go/typesense/api/pointer" ) -// syncDeviceDefinitionSearchCmd cli command to sync to typesense +const ( + minSearchYear = 2007 + tablelandPageSize = 500 + searchDefaultScore = 1 +) + type syncDeviceDefinitionSearchCmd struct { logger zerolog.Logger settings config.Settings @@ -44,7 +48,6 @@ func (p *syncDeviceDefinitionSearchCmd) SetFlags(f *flag.FlagSet) { f.BoolVar(&p.createIndex, "create-index", false, "create or recreate index") } -// nolint func (p *syncDeviceDefinitionSearchCmd) Execute(ctx context.Context, _ *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus { pdb := db.NewDbConnectionFromSettings(ctx, &p.settings.DB, true) pdb.WaitForDB(p.logger) @@ -53,214 +56,136 @@ func (p *syncDeviceDefinitionSearchCmd) Execute(ctx context.Context, _ *flag.Fla if err != nil { p.logger.Fatal().Err(err).Msg("Failed to create Ethereum client.") } - chainID, err := ethClient.ChainID(ctx) if err != 
nil { p.logger.Fatal().Err(err).Msg("Couldn't retrieve chain id.") } - //queryInstance, err := contracts.NewRegistry(p.settings.EthereumRegistryAddress, ethClient) - //if err != nil { - // p.logger.Fatal().Err(err).Msg("Failed to create registry query instance.") - //} onChainSvc := gateways.NewDeviceDefinitionOnChainService(&p.settings, &p.logger, ethClient, chainID, p.sender, pdb.DBS) + identity := gateways.NewIdentityAPIService(&p.logger, &p.settings) client := typesense.NewClient( typesense.WithServer(p.settings.SearchServiceAPIURL.String()), typesense.WithAPIKey(p.settings.SearchServiceAPIKey)) + indexer := NewTypesenseSearchIndexer(client) collectionName := p.settings.SearchServiceIndexName - identity := gateways.NewIdentityAPIService(&p.logger, &p.settings) - if p.createIndex { - - _, err := client.Collection(collectionName).Delete(context.Background()) - if err != nil { - p.logger.Error().Err(err).Send() - } - fmt.Println("Successfully deleted index: " + collectionName) - - hasFacet := true - nestedFields := false - sort := true - schema := &api.CollectionSchema{ - Name: collectionName, - EnableNestedFields: &nestedFields, - DefaultSortingField: pointer.String("score"), - Fields: []api.Field{ - { - Name: "device_definition_id", - Type: "string", - }, - { - Name: "name", - Type: "string", - Sort: &sort, - }, - { - Name: "make", - Type: "string", - Facet: &hasFacet, - }, - { - Name: "make_slug", - Type: "string", - Facet: &hasFacet, - }, - { - Name: "make_token_id", - Type: "int32", - Facet: &hasFacet, - }, - { - Name: "model", - Type: "string", - Facet: &hasFacet, - }, - { - Name: "model_slug", - Type: "string", - Facet: &hasFacet, - }, - { - Name: "year", - Type: "int32", - Facet: &hasFacet, - }, - { - Name: "image_url", - Type: "string", - }, - { - Name: "score", - Type: "int32", - }, - { - Name: "definition_id", - Type: "string", - }, - }, - } - _, err = client.Collections().Create(context.Background(), schema) - if err != nil { + if err := 
indexer.RecreateIndex(ctx, deviceDefinitionSearchSchema(collectionName)); err != nil { p.logger.Error().Err(err).Send() return subcommands.ExitFailure } - fmt.Printf("Index %s created\n", collectionName) } - fmt.Printf("Starting processing definitions\n") + if err := runSearchSync(ctx, identity, onChainSvc, indexer, collectionName); err != nil { + p.logger.Error().Err(err).Msg("sync failed") + return subcommands.ExitFailure + } + fmt.Print("Index Updated") + return subcommands.ExitSuccess +} +// runSearchSync fetches every manufacturer, builds its definition documents, and +// upserts them to the search index one manufacturer at a time. This caps the +// steady-state memory footprint to a single manufacturer's definitions rather +// than the full catalog. +func runSearchSync( + ctx context.Context, + identity gateways.IdentityAPI, + onChainSvc gateways.DeviceDefinitionOnChainService, + indexer SearchIndexer, + collectionName string, +) error { makes, err := identity.GetManufacturers() if err != nil { - p.logger.Fatal().Err(err).Send() + return fmt.Errorf("get manufacturers: %w", err) } + fmt.Printf("Found %d manufacturers\n", len(makes)) - fmt.Printf("Found %d manufacturers in db\n", len(makes)) - - var documents []SearchEntryItem - // iterate over all makes, then query tableland for _, dm := range makes { - manufacturer, err := onChainSvc.GetManufacturer(stringutils.SlugString(dm.Name)) + docs, err := buildManufacturerDocuments(ctx, onChainSvc, dm) if err != nil { - p.logger.Fatal().Err(err).Send() + return fmt.Errorf("build documents for %s: %w", dm.Name, err) } - - definitions, err := fetchAllDefinitions(ctx, onChainSvc, manufacturer.TokenID, "") - if err != nil { - p.logger.Fatal().Err(err).Send() + if len(docs) == 0 { + fmt.Printf("%s: no definitions to sync\n", dm.Name) + continue } - - for _, dd := range definitions { - id := dd.ID - deviceDefinitionID := dd.ID - name := common.BuildDeviceDefinitionName(int16(dd.Year), dm.Name, dd.Model) - imageUrl := 
dd.ImageURI - modelName := dd.Model - modelSlug := stringutils.SlugString(dd.Model) - - year := dd.Year - if year < 2007 { - continue // do not sync old cars into search - } - - makeName := dm.Name - makeSlug := stringutils.SlugString(dm.Name) - manufacturerTokenID := manufacturer.TokenID - - newDocument := SearchEntryItem{ - ID: id, - DeviceDefinitionID: deviceDefinitionID, - DefinitionID: dd.ID, - Name: name, - Make: makeName, - MakeSlug: makeSlug, - ManufacturerTokenID: manufacturerTokenID, - Model: modelName, - ModelSlug: modelSlug, - Year: year, - ImageURL: imageUrl, - Score: 1, - } - - documents = append(documents, newDocument) + if err := indexer.UpsertDocuments(ctx, collectionName, docs); err != nil { + return fmt.Errorf("upsert %s: %w", dm.Name, err) } - definitions = nil // will this help rambo the gc? - fmt.Printf("loaded %d definitions from %s \n", len(documents), manufacturer.Name) + fmt.Printf("%s: upserted %d definitions\n", dm.Name, len(docs)) } - - err = uploadWithAPI(client, documents, p.settings.SearchServiceIndexName) - - fmt.Print("Index Updated") - return subcommands.ExitSuccess + return nil } -func fetchAllDefinitions(ctx context.Context, onChainSvc gateways.DeviceDefinitionOnChainService, manufacturerID int, whereClause string) ([]models2.DeviceDefinitionTablelandModel, error) { - pageIndex := 0 - var allDefinitions []models2.DeviceDefinitionTablelandModel +// buildManufacturerDocuments pulls every tableland definition for a manufacturer +// and converts the ones from model year >= minSearchYear into SearchEntryItems. 
+func buildManufacturerDocuments( + ctx context.Context, + onChainSvc gateways.DeviceDefinitionOnChainService, + dm coremodels.Manufacturer, +) ([]SearchEntryItem, error) { + makeSlug := stringutils.SlugString(dm.Name) + var docs []SearchEntryItem + pageIndex := 0 for { - definitions, err := onChainSvc.QueryDefinitionsCustom(ctx, manufacturerID, whereClause, pageIndex) + page, err := onChainSvc.QueryDefinitionsCustom(ctx, dm.TokenID, "", pageIndex) if err != nil { return nil, err } - - // Append the current page of definitions to allDefinitions. - allDefinitions = append(allDefinitions, definitions...) - - // If you receive less than 50 results then you've likely reached the end. - if len(definitions) < 50 { + for _, dd := range page { + if dd.Year < minSearchYear { + continue + } + docs = append(docs, SearchEntryItem{ + ID: dd.ID, + DeviceDefinitionID: dd.ID, + DefinitionID: dd.ID, + Name: common.BuildDeviceDefinitionName(int16(dd.Year), dm.Name, dd.Model), + Make: dm.Name, + MakeSlug: makeSlug, + ManufacturerTokenID: dm.TokenID, + Model: dd.Model, + ModelSlug: stringutils.SlugString(dd.Model), + Year: dd.Year, + ImageURL: dd.ImageURI, + Score: searchDefaultScore, + }) + } + if len(page) < tablelandPageSize { break } - - // Move to the next page. 
pageIndex++ } - - return allDefinitions, nil + return docs, nil } -func uploadWithAPI(client *typesense.Client, entries []SearchEntryItem, collectionName string) error { - processedCount := 0 - for _, entry := range entries { - processedCount++ - _, err := client.Collection(collectionName).Documents().Upsert(context.Background(), entry) - if err != nil { - fmt.Printf("Error uploading entry: %v\n Retrying...", err) - time.Sleep(1000) - _, err = client.Collection(collectionName).Documents().Upsert(context.Background(), entry) - // todo fancier retry - if err != nil { - return err - } - } - if processedCount%100 == 0 { - fmt.Printf("Uploaded %d definitionIds to Typesense search.\n", processedCount) - } +func deviceDefinitionSearchSchema(collectionName string) *api.CollectionSchema { + hasFacet := true + nestedFields := false + sort := true + return &api.CollectionSchema{ + Name: collectionName, + EnableNestedFields: &nestedFields, + DefaultSortingField: pointer.String("score"), + Fields: []api.Field{ + {Name: "device_definition_id", Type: "string"}, + {Name: "name", Type: "string", Sort: &sort}, + {Name: "make", Type: "string", Facet: &hasFacet}, + {Name: "make_slug", Type: "string", Facet: &hasFacet}, + {Name: "make_token_id", Type: "int32", Facet: &hasFacet}, + {Name: "model", Type: "string", Facet: &hasFacet}, + {Name: "model_slug", Type: "string", Facet: &hasFacet}, + {Name: "year", Type: "int32", Facet: &hasFacet}, + {Name: "image_url", Type: "string"}, + {Name: "score", Type: "int32"}, + {Name: "definition_id", Type: "string"}, + }, } - return nil } type SearchEntryItem struct { diff --git a/cmd/device-definitions-api/sync_device_definition_search_test.go b/cmd/device-definitions-api/sync_device_definition_search_test.go new file mode 100644 index 00000000..1f6dd0eb --- /dev/null +++ b/cmd/device-definitions-api/sync_device_definition_search_test.go @@ -0,0 +1,214 @@ +package main + +import ( + "context" + "errors" + "testing" + + coremodels 
"github.com/DIMO-Network/device-definitions-api/internal/core/models" + mock_gateways "github.com/DIMO-Network/device-definitions-api/internal/infrastructure/gateways/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" +) + +func defToyota(year int, model, id string) coremodels.DeviceDefinitionTablelandModel { + return coremodels.DeviceDefinitionTablelandModel{ + ID: id, + Model: model, + Year: year, + ImageURI: "https://img/" + id, + } +} + +func TestBuildManufacturerDocuments_FiltersPre2007(t *testing.T) { + ctrl := gomock.NewController(t) + onChain := mock_gateways.NewMockDeviceDefinitionOnChainService(ctrl) + dm := coremodels.Manufacturer{TokenID: 42, Name: "Toyota"} + + onChain.EXPECT(). + QueryDefinitionsCustom(gomock.Any(), 42, "", 0). + Return([]coremodels.DeviceDefinitionTablelandModel{ + defToyota(2006, "Camry", "id-old"), + defToyota(2007, "Camry", "id-keep"), + defToyota(2020, "Prius", "id-new"), + }, nil) + + docs, err := buildManufacturerDocuments(context.Background(), onChain, dm) + require.NoError(t, err) + require.Len(t, docs, 2) + assert.Equal(t, "id-keep", docs[0].ID) + assert.Equal(t, "id-new", docs[1].ID) +} + +func TestBuildManufacturerDocuments_PopulatesFields(t *testing.T) { + ctrl := gomock.NewController(t) + onChain := mock_gateways.NewMockDeviceDefinitionOnChainService(ctrl) + dm := coremodels.Manufacturer{TokenID: 7, Name: "Land Rover"} + + onChain.EXPECT(). + QueryDefinitionsCustom(gomock.Any(), 7, "", 0). 
+ Return([]coremodels.DeviceDefinitionTablelandModel{ + {ID: "ddid-1", Model: "Range Rover", Year: 2021, ImageURI: "https://img/rr"}, + }, nil) + + docs, err := buildManufacturerDocuments(context.Background(), onChain, dm) + require.NoError(t, err) + require.Len(t, docs, 1) + + d := docs[0] + assert.Equal(t, "ddid-1", d.ID) + assert.Equal(t, "ddid-1", d.DeviceDefinitionID) + assert.Equal(t, "ddid-1", d.DefinitionID) + assert.Equal(t, "Land Rover", d.Make) + assert.Equal(t, "land-rover", d.MakeSlug) + assert.Equal(t, 7, d.ManufacturerTokenID) + assert.Equal(t, "Range Rover", d.Model) + assert.Equal(t, "range-rover", d.ModelSlug) + assert.Equal(t, 2021, d.Year) + assert.Equal(t, "2021 Land Rover Range Rover", d.Name) + assert.Equal(t, "https://img/rr", d.ImageURL) + assert.Equal(t, searchDefaultScore, d.Score) +} + +func TestBuildManufacturerDocuments_TerminatesOnShortPage(t *testing.T) { + ctrl := gomock.NewController(t) + onChain := mock_gateways.NewMockDeviceDefinitionOnChainService(ctrl) + dm := coremodels.Manufacturer{TokenID: 1, Name: "Honda"} + + // 10 rows on page 0 is < 500 → loop should break without requesting page 1. + page := make([]coremodels.DeviceDefinitionTablelandModel, 10) + for i := range page { + page[i] = defToyota(2020, "Civic", "id") + } + onChain.EXPECT(). + QueryDefinitionsCustom(gomock.Any(), 1, "", 0). + Return(page, nil). 
+ Times(1) + + docs, err := buildManufacturerDocuments(context.Background(), onChain, dm) + require.NoError(t, err) + assert.Len(t, docs, 10) +} + +func TestBuildManufacturerDocuments_PagesUntilShortPage(t *testing.T) { + ctrl := gomock.NewController(t) + onChain := mock_gateways.NewMockDeviceDefinitionOnChainService(ctrl) + dm := coremodels.Manufacturer{TokenID: 1, Name: "Honda"} + + full := make([]coremodels.DeviceDefinitionTablelandModel, tablelandPageSize) + for i := range full { + full[i] = defToyota(2020, "Civic", "id") + } + short := []coremodels.DeviceDefinitionTablelandModel{defToyota(2020, "Accord", "id-last")} + + gomock.InOrder( + onChain.EXPECT().QueryDefinitionsCustom(gomock.Any(), 1, "", 0).Return(full, nil), + onChain.EXPECT().QueryDefinitionsCustom(gomock.Any(), 1, "", 1).Return(full, nil), + onChain.EXPECT().QueryDefinitionsCustom(gomock.Any(), 1, "", 2).Return(short, nil), + ) + + docs, err := buildManufacturerDocuments(context.Background(), onChain, dm) + require.NoError(t, err) + assert.Len(t, docs, 2*tablelandPageSize+1) +} + +func TestBuildManufacturerDocuments_PropagatesError(t *testing.T) { + ctrl := gomock.NewController(t) + onChain := mock_gateways.NewMockDeviceDefinitionOnChainService(ctrl) + dm := coremodels.Manufacturer{TokenID: 1, Name: "Honda"} + + boom := errors.New("tableland down") + onChain.EXPECT(). + QueryDefinitionsCustom(gomock.Any(), 1, "", 0). 
+ Return(nil, boom) + + _, err := buildManufacturerDocuments(context.Background(), onChain, dm) + require.ErrorIs(t, err, boom) +} + +func TestRunSearchSync_FlushesPerManufacturer(t *testing.T) { + ctrl := gomock.NewController(t) + identity := mock_gateways.NewMockIdentityAPI(ctrl) + onChain := mock_gateways.NewMockDeviceDefinitionOnChainService(ctrl) + indexer := NewMockSearchIndexer(ctrl) + + identity.EXPECT().GetManufacturers().Return([]coremodels.Manufacturer{ + {TokenID: 1, Name: "Honda"}, + {TokenID: 2, Name: "Toyota"}, + }, nil) + + onChain.EXPECT().QueryDefinitionsCustom(gomock.Any(), 1, "", 0). + Return([]coremodels.DeviceDefinitionTablelandModel{defToyota(2020, "Civic", "h1")}, nil) + onChain.EXPECT().QueryDefinitionsCustom(gomock.Any(), 2, "", 0). + Return([]coremodels.DeviceDefinitionTablelandModel{defToyota(2020, "Camry", "t1")}, nil) + + // One upsert per manufacturer, each with exactly that make's docs. + indexer.EXPECT().UpsertDocuments(gomock.Any(), "dd-search", gomock.Any()). + DoAndReturn(func(_ context.Context, _ string, docs []SearchEntryItem) error { + require.Len(t, docs, 1) + assert.Equal(t, "Honda", docs[0].Make) + return nil + }) + indexer.EXPECT().UpsertDocuments(gomock.Any(), "dd-search", gomock.Any()). + DoAndReturn(func(_ context.Context, _ string, docs []SearchEntryItem) error { + require.Len(t, docs, 1) + assert.Equal(t, "Toyota", docs[0].Make) + return nil + }) + + err := runSearchSync(context.Background(), identity, onChain, indexer, "dd-search") + require.NoError(t, err) +} + +func TestRunSearchSync_SkipsMakeWithNoEligibleDefs(t *testing.T) { + ctrl := gomock.NewController(t) + identity := mock_gateways.NewMockIdentityAPI(ctrl) + onChain := mock_gateways.NewMockDeviceDefinitionOnChainService(ctrl) + indexer := NewMockSearchIndexer(ctrl) + + identity.EXPECT().GetManufacturers().Return([]coremodels.Manufacturer{ + {TokenID: 9, Name: "Studebaker"}, + }, nil) + + // All pre-2007 → filtered out → builder returns zero docs. 
+ onChain.EXPECT().QueryDefinitionsCustom(gomock.Any(), 9, "", 0). + Return([]coremodels.DeviceDefinitionTablelandModel{defToyota(1950, "Champion", "s1")}, nil) + + // No UpsertDocuments expectation → gomock will fail the test if it's called. + + err := runSearchSync(context.Background(), identity, onChain, indexer, "dd-search") + require.NoError(t, err) +} + +func TestRunSearchSync_PropagatesManufacturersError(t *testing.T) { + ctrl := gomock.NewController(t) + identity := mock_gateways.NewMockIdentityAPI(ctrl) + onChain := mock_gateways.NewMockDeviceDefinitionOnChainService(ctrl) + indexer := NewMockSearchIndexer(ctrl) + + boom := errors.New("identity down") + identity.EXPECT().GetManufacturers().Return(nil, boom) + + err := runSearchSync(context.Background(), identity, onChain, indexer, "dd-search") + require.ErrorIs(t, err, boom) +} + +func TestRunSearchSync_PropagatesUpsertError(t *testing.T) { + ctrl := gomock.NewController(t) + identity := mock_gateways.NewMockIdentityAPI(ctrl) + onChain := mock_gateways.NewMockDeviceDefinitionOnChainService(ctrl) + indexer := NewMockSearchIndexer(ctrl) + + identity.EXPECT().GetManufacturers().Return([]coremodels.Manufacturer{ + {TokenID: 1, Name: "Honda"}, + }, nil) + onChain.EXPECT().QueryDefinitionsCustom(gomock.Any(), 1, "", 0). 
+ Return([]coremodels.DeviceDefinitionTablelandModel{defToyota(2020, "Civic", "h1")}, nil) + + boom := errors.New("typesense down") + indexer.EXPECT().UpsertDocuments(gomock.Any(), "dd-search", gomock.Any()).Return(boom) + + err := runSearchSync(context.Background(), identity, onChain, indexer, "dd-search") + require.ErrorIs(t, err, boom) +} diff --git a/cmd/device-definitions-api/sync_r1_compatibility.go b/cmd/device-definitions-api/sync_r1_compatibility.go deleted file mode 100644 index b0060247..00000000 --- a/cmd/device-definitions-api/sync_r1_compatibility.go +++ /dev/null @@ -1,197 +0,0 @@ -//nolint:tagliatelle -package main - -import ( - "context" - "flag" - "fmt" - - "github.com/DIMO-Network/device-definitions-api/internal/config" - "github.com/DIMO-Network/device-definitions-api/internal/core/queries" - "github.com/google/subcommands" - "github.com/pkg/errors" - "github.com/rs/zerolog" - "github.com/typesense/typesense-go/typesense" - "github.com/typesense/typesense-go/typesense/api" - "github.com/typesense/typesense-go/typesense/api/pointer" -) - -const typeSenseR1Index = "r1_compatibility" - -// syncR1CompatibiltyCmd cli command to sync from google spreadsheet to Typesense search index for R1 compatibilty -type syncR1CompatibiltyCmd struct { - logger zerolog.Logger - settings config.Settings - - createIndex bool - oemFilter string -} - -func (*syncR1CompatibiltyCmd) Name() string { return "sync-r1-compatibilty" } -func (*syncR1CompatibiltyCmd) Synopsis() string { - return "sync r1 google spreadsheet to typesense search" -} -func (*syncR1CompatibiltyCmd) Usage() string { - return `sync-r1-compatibilty` -} - -func (p *syncR1CompatibiltyCmd) SetFlags(f *flag.FlagSet) { - f.BoolVar(&p.createIndex, "create-index", false, "create or recreate index") - f.StringVar(&p.oemFilter, "oem-filter", "", "oem filter") -} - -func (p *syncR1CompatibiltyCmd) Execute(ctx context.Context, _ *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus { - // get data from sheet 
using google sheets - qh := queries.NewCompatibilityR1SheetQueryHandler(&p.settings) - compatibilityRows, err := qh.Handle(ctx, nil) - if err != nil { - p.logger.Fatal().Err(err).Msg("error fetching compatibility sheet data") - return subcommands.ExitFailure - } - sheetData := compatibilityRows.([]queries.CompatibilitySheetRow) - - fmt.Printf("Fetched %d records\n", len(sheetData)) - - client := typesense.NewClient( - typesense.WithServer(p.settings.SearchServiceAPIURL.String()), - typesense.WithAPIKey(p.settings.SearchServiceAPIKey)) - - // get optional Make filter from cmd line - if len(p.oemFilter) > 1 { - fmt.Println("Make filter used: " + p.oemFilter) - } - searchEntries := make([]queries.GetR1SearchEntryItem, 0) - - // Step 2: Check each definitionId via GraphQL - processedCount := 0 - for _, item := range sheetData { - if p.oemFilter != "" { - if p.oemFilter != item.Make { - continue - } - } - processedCount++ - - entry := queries.GetR1SearchEntryItem{ - DefinitionID: item.DefinitionID, - Make: item.Make, - Model: item.Model, - Year: item.Year, - Compatible: item.Compatible, - Name: fmt.Sprintf("%s %s %d", item.Make, item.Model, item.Year), - } - - searchEntries = append(searchEntries, entry) - } - fmt.Printf("Processed %d definitionIds. 
Uploading items...\n", processedCount) - if p.createIndex { - err := createR1CompatibilityIndex(p.logger, client, typeSenseR1Index) - if err != nil { - p.logger.Fatal().Msgf("error creating index: %v", err) - } - p.logger.Info().Msg("index created: " + typeSenseR1Index) - } - err = uploadR1EntriesWithAPI(ctx, client, searchEntries) - if err != nil { - p.logger.Fatal().Msgf("error uploading to Typesense: %v", err) - } - - p.logger.Info().Msg("completed syncing ruptela compatibility search") - return subcommands.ExitSuccess -} - -func uploadR1EntriesWithAPI(ctx context.Context, client *typesense.Client, entries []queries.GetR1SearchEntryItem) error { - //processedCount := 0 - action := "upsert" - var interfaceSlice []interface{} - for _, entry := range entries { - // some validation - if entry.DefinitionID != "" && entry.Make != "" && entry.Model != "" && entry.Name != "" && entry.Compatible != "" { - interfaceSlice = append(interfaceSlice, entry) - } - } - - responses, err := client.Collection(typeSenseR1Index).Documents().Import(ctx, interfaceSlice, &api.ImportDocumentsParams{ - Action: &action, - BatchSize: nil, - DirtyValues: nil, - RemoteEmbeddingBatchSize: nil, - }) - if err != nil { - return errors.Wrap(err, "failed to import documents") - } - - fmt.Printf("Uploaded %d definitions to Typesense search.\n", len(responses)) - - //for _, entry := range entries { - // processedCount++ - // - // _, err := client.Collection(typeSenseR1Index).Documents().Upsert(ctx, entry) - // if err != nil { - // fmt.Printf("Error uploading entry: %v\n Retrying...", err) - // time.Sleep(1000) - // _, err = client.Collection(typeSenseR1Index).Documents().Upsert(ctx, entry) - // // todo fancier retry - // if err != nil { - // return err - // } - // } - // if processedCount%100 == 0 { - // fmt.Printf("Uploaded %d definitionIds to Typesense search.\n", processedCount) - // } - //} - return nil -} - -func createR1CompatibilityIndex(logger zerolog.Logger, client *typesense.Client, 
collectionName string) error { - _, err := client.Collection(collectionName).Delete(context.Background()) - if err != nil { - logger.Error().Err(err).Send() - } - fmt.Println("Successfully deleted index: " + collectionName) - - hasFacet := true - nestedFields := false - sort := true - schema := &api.CollectionSchema{ - Name: collectionName, - EnableNestedFields: &nestedFields, - DefaultSortingField: pointer.String("year"), - Fields: []api.Field{ - { - // this will hold the device_definition_id - must be called id for typesense upsert to work - Name: "id", - Type: "string", - }, - { - Name: "name", - Type: "string", - Sort: &sort, - }, - { - Name: "make", - Type: "string", - Facet: &hasFacet, - }, - { - Name: "model", - Type: "string", - Facet: &hasFacet, - }, - { - Name: "year", - Type: "int32", - Facet: &hasFacet, - }, - { - Name: "compatible", - Type: "string", - }, - }, - } - _, err = client.Collections().Create(context.Background(), schema) - if err != nil { - return errors.Wrap(err, "failed to create collection") - } - return nil -} diff --git a/internal/infrastructure/gateways/device_definition_on_chain_service.go b/internal/infrastructure/gateways/device_definition_on_chain_service.go index ab11bf68..e7c0443f 100644 --- a/internal/infrastructure/gateways/device_definition_on_chain_service.go +++ b/internal/infrastructure/gateways/device_definition_on_chain_service.go @@ -233,7 +233,8 @@ func (e *deviceDefinitionOnChainService) QueryDefinitionsCustom(ctx context.Cont return nil, fmt.Errorf("tableName cannot be empty for manufacturer token id %d", manufacturerID) } - statement := fmt.Sprintf("SELECT * FROM %s %s LIMIT %d OFFSET %d", tableName, whereClause, 500, pageIndex) + const pageSize = 500 + statement := fmt.Sprintf("SELECT * FROM %s %s LIMIT %d OFFSET %d", tableName, whereClause, pageSize, pageIndex*pageSize) queryParams := map[string]string{ "statement": statement,