Skip to content

Commit 885852a

Browse files
committed
feat: implement GroupAssets API with ES composite aggregation
- Add GroupConfig, GroupResult, GroupField types to core/asset/discovery.go - Add GroupAssets to DiscoveryRepository interface - Implement GroupAssets in ES using composite aggregation with top_hits - Add GroupAssets gRPC handler with filter and include_fields support - Add GroupAssets to AssetService interface and service delegate - Update mocks for new interface methods
1 parent 8ecb7d9 commit 885852a

7 files changed

Lines changed: 277 additions & 0 deletions

File tree

core/asset/discovery.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,28 @@ type DiscoveryRepository interface {
1414
SoftDeleteByURN(ctx context.Context, ns *namespace.Namespace, assetURN string) error
1515
Search(ctx context.Context, cfg SearchConfig) (results []SearchResult, err error)
1616
Suggest(ctx context.Context, cfg SearchConfig) (suggestions []string, err error)
17+
GroupAssets(ctx context.Context, cfg GroupConfig) ([]GroupResult, error)
18+
}
19+
20+
// GroupConfig represents configuration for grouping assets
21+
type GroupConfig struct {
22+
// GroupBy lists the asset fields whose value combinations identify a group.
// The Elasticsearch repository aggregates on "<field>.keyword" for each entry.
GroupBy []string
23+
// Filters narrows the set of assets considered before grouping.
Filters SearchFilter
24+
// IncludeFields selects which asset fields are returned for each grouped asset.
// When empty, the Elasticsearch repository falls back to its default field list.
IncludeFields []string
25+
// Size is used by the Elasticsearch repository both as the number of groups
// and as the number of assets returned per group; values <= 0 select a default.
Size int
26+
// Namespace is the tenant whose assets are grouped. The Elasticsearch
// repository rejects a nil Namespace.
Namespace *namespace.Namespace
27+
}
28+
29+
// GroupResult represents a single group of assets
30+
type GroupResult struct {
31+
// Fields holds one key/value pair per requested group-by field.
Fields []GroupField
32+
// Assets holds the assets returned for this group.
Assets []Asset
33+
}
34+
35+
// GroupField represents a key-value pair identifying a group
36+
type GroupField struct {
37+
// Key is the name of the group-by field.
Key string
38+
// Value is the value of that field shared by the assets in the group.
Value string
1739
}
1840

1941
// SearchFilter is a filter intended to be used as a search

core/asset/mocks/discovery_repository.go

Lines changed: 23 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/asset/service.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,10 @@ func (s *Service) GetTypes(ctx context.Context, flt Filter) (map[Type]int, error
195195
func (s *Service) SearchAssets(ctx context.Context, cfg SearchConfig) (results []SearchResult, err error) {
196196
return s.discoveryRepository.Search(ctx, cfg)
197197
}
198+
199+
// GroupAssets groups assets as described by cfg. It delegates directly to the
// discovery repository and returns its results and error unchanged.
func (s *Service) GroupAssets(ctx context.Context, cfg GroupConfig) ([]GroupResult, error) {
200+
return s.discoveryRepository.GroupAssets(ctx, cfg)
201+
}
198202
func (s *Service) SuggestAssets(ctx context.Context, cfg SearchConfig) (suggestions []string, err error) {
199203
return s.discoveryRepository.Suggest(ctx, cfg)
200204
}

internal/server/v1beta1/asset.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ type AssetService interface {
3636
SuggestAssets(ctx context.Context, cfg asset.SearchConfig) (suggestions []string, err error)
3737

3838
AddProbe(ctx context.Context, ns *namespace.Namespace, assetURN string, probe *asset.Probe) error
39+
40+
GroupAssets(ctx context.Context, cfg asset.GroupConfig) ([]asset.GroupResult, error)
3941
}
4042

4143
func (server *APIServer) GetAllAssets(ctx context.Context, req *compassv1beta1.GetAllAssetsRequest) (*compassv1beta1.GetAllAssetsResponse, error) {

internal/server/v1beta1/mocks/asset_service.go

Lines changed: 23 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/server/v1beta1/search.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,60 @@ func (server *APIServer) SuggestAssets(ctx context.Context, req *compassv1beta1.
8080
}, nil
8181
}
8282

83+
func (server *APIServer) GroupAssets(ctx context.Context, req *compassv1beta1.GroupAssetsRequest) (*compassv1beta1.GroupAssetsResponse, error) {
84+
if err := req.ValidateAll(); err != nil {
85+
return nil, status.Error(codes.InvalidArgument, bodyParserErrorMsg(err))
86+
}
87+
ns := grpc_interceptor.FetchNamespaceFromContext(ctx)
88+
if _, err := server.validateUserInCtx(ctx, ns); err != nil {
89+
return nil, err
90+
}
91+
92+
if len(req.GetGroupby()) == 0 {
93+
return nil, status.Error(codes.InvalidArgument, "groupby must be specified")
94+
}
95+
96+
cfg := asset.GroupConfig{
97+
GroupBy: req.GetGroupby(),
98+
Filters: filterConfigFromValues(req.GetFilter()),
99+
IncludeFields: req.GetIncludeFields(),
100+
Size: int(req.GetSize()),
101+
Namespace: ns,
102+
}
103+
104+
results, err := server.assetService.GroupAssets(ctx, cfg)
105+
if err != nil {
106+
return nil, internalServerError(server.logger, fmt.Sprintf("error grouping assets: %s", err.Error()))
107+
}
108+
109+
var groups []*compassv1beta1.AssetGroup
110+
for _, gr := range results {
111+
var fields []*compassv1beta1.GroupField
112+
for _, f := range gr.Fields {
113+
fields = append(fields, &compassv1beta1.GroupField{
114+
GroupKey: f.Key,
115+
GroupValue: f.Value,
116+
})
117+
}
118+
var assets []*compassv1beta1.Asset
119+
for _, a := range gr.Assets {
120+
ap, err := assetToProto(a, false)
121+
if err != nil {
122+
return nil, internalServerError(server.logger, fmt.Sprintf("error converting asset to proto: %s", err.Error()))
123+
}
124+
assets = append(assets, ap)
125+
}
126+
groups = append(groups, &compassv1beta1.AssetGroup{
127+
GroupFields: fields,
128+
Assets: assets,
129+
})
130+
}
131+
132+
return &compassv1beta1.GroupAssetsResponse{
133+
AssetGroups: groups,
134+
}, nil
135+
}
136+
83137
func getSearchFlagsFromProto(flags *compassv1beta1.SearchFlags) asset.SearchFlags {
84138
if flags == nil {
85139
return asset.SearchFlags{}

internal/store/elasticsearch/discovery_search_repository.go

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,3 +330,152 @@ func (repo *DiscoveryRepository) toSuggestions(response searchResponse) (results
330330

331331
return
332332
}
333+
334+
// defaultGroupsSize is the size GroupAssets falls back to when the requested
// size is zero or negative.
const defaultGroupsSize = 10
335+
336+
// GroupAssets groups assets by specified fields using ES composite aggregation
337+
func (repo *DiscoveryRepository) GroupAssets(ctx context.Context, cfg asset.GroupConfig) ([]asset.GroupResult, error) {
338+
if cfg.Namespace == nil {
339+
return nil, asset.DiscoveryError{Err: fmt.Errorf("namespace cannot be empty")}
340+
}
341+
342+
size := cfg.Size
343+
if size <= 0 {
344+
size = defaultGroupsSize
345+
}
346+
347+
// Build composite aggregation sources from group-by fields
348+
sources := make([]map[string]interface{}, 0, len(cfg.GroupBy))
349+
for _, field := range cfg.GroupBy {
350+
sources = append(sources, map[string]interface{}{
351+
field: map[string]interface{}{
352+
"terms": map[string]interface{}{
353+
"field": fmt.Sprintf("%s.keyword", field),
354+
},
355+
},
356+
})
357+
}
358+
359+
// Build filter query
360+
boolQuery := elastic.NewBoolQuery()
361+
// Ensure group-by fields exist
362+
for _, field := range cfg.GroupBy {
363+
boolQuery.Filter(elastic.NewExistsQuery(fmt.Sprintf("%s.keyword", field)))
364+
}
365+
repo.buildFilterTermQueries(boolQuery, cfg.Filters)
366+
367+
includedFields := defaultIncludedFields
368+
if len(cfg.IncludeFields) > 0 {
369+
includedFields = cfg.IncludeFields
370+
}
371+
372+
// Build the aggregation query manually as JSON
373+
payload := map[string]interface{}{
374+
"size": 0,
375+
"query": map[string]interface{}{
376+
"bool": map[string]interface{}{},
377+
},
378+
"aggs": map[string]interface{}{
379+
"group_result": map[string]interface{}{
380+
"composite": map[string]interface{}{
381+
"size": size,
382+
"sources": sources,
383+
},
384+
"aggs": map[string]interface{}{
385+
"top_assets": map[string]interface{}{
386+
"top_hits": map[string]interface{}{
387+
"size": size,
388+
"_source": includedFields,
389+
},
390+
},
391+
},
392+
},
393+
},
394+
}
395+
396+
// Use the bool query
397+
boolSrc, err := boolQuery.Source()
398+
if err != nil {
399+
return nil, asset.DiscoveryError{Err: fmt.Errorf("error building query: %w", err)}
400+
}
401+
payload["query"] = boolSrc
402+
403+
body, err := json.Marshal(payload)
404+
if err != nil {
405+
return nil, asset.DiscoveryError{Err: fmt.Errorf("error encoding query: %w", err)}
406+
}
407+
408+
res, err := repo.cli.client.Search(
409+
repo.cli.client.Search.WithBody(strings.NewReader(string(body))),
410+
repo.cli.client.Search.WithIndex(BuildAliasNameFromNamespace(cfg.Namespace)),
411+
repo.cli.client.Search.WithIgnoreUnavailable(true),
412+
repo.cli.client.Search.WithContext(ctx),
413+
)
414+
if err != nil {
415+
return nil, asset.DiscoveryError{Err: fmt.Errorf("error executing group query: %w", err)}
416+
}
417+
defer res.Body.Close()
418+
419+
if res.IsError() {
420+
return nil, asset.DiscoveryError{Err: fmt.Errorf("error response from elasticsearch: %s", errorReasonFromResponse(res))}
421+
}
422+
423+
var response groupResponse
424+
if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
425+
return nil, asset.DiscoveryError{Err: fmt.Errorf("error decoding group response: %w", err)}
426+
}
427+
428+
return repo.toGroupResults(response, cfg.GroupBy), nil
429+
}
430+
431+
// groupResponse is the subset of the Elasticsearch search response decoded by
// GroupAssets: the buckets of the "group_result" aggregation.
type groupResponse struct {
432+
Aggregations struct {
433+
GroupResult struct {
434+
Buckets []groupBucket `json:"buckets"`
435+
} `json:"group_result"`
436+
} `json:"aggregations"`
437+
}
438+
439+
// groupBucket is a single bucket of the "group_result" aggregation.
type groupBucket struct {
440+
// Key maps each aggregation source name (the requested group-by field) to
// the bucket's value for it.
Key map[string]string `json:"key"`
441+
// DocCount is the bucket's "doc_count" as reported by Elasticsearch.
DocCount int `json:"doc_count"`
442+
// TopAssets holds the hits of the nested "top_assets" aggregation.
TopAssets struct {
443+
Hits struct {
444+
Hits []searchHit `json:"hits"`
445+
} `json:"hits"`
446+
} `json:"top_assets"`
447+
}
448+
449+
func (repo *DiscoveryRepository) toGroupResults(response groupResponse, groupBy []string) []asset.GroupResult {
450+
var results []asset.GroupResult
451+
for _, bucket := range response.Aggregations.GroupResult.Buckets {
452+
var fields []asset.GroupField
453+
for _, key := range groupBy {
454+
fields = append(fields, asset.GroupField{
455+
Key: key,
456+
Value: bucket.Key[key],
457+
})
458+
}
459+
460+
var assets []asset.Asset
461+
for _, hit := range bucket.TopAssets.Hits.Hits {
462+
r := hit.Source
463+
assets = append(assets, asset.Asset{
464+
ID: r.ID,
465+
URN: r.URN,
466+
Type: r.Type,
467+
Name: r.Name,
468+
Service: r.Service,
469+
Description: r.Description,
470+
Data: r.Data,
471+
Labels: r.Labels,
472+
})
473+
}
474+
475+
results = append(results, asset.GroupResult{
476+
Fields: fields,
477+
Assets: assets,
478+
})
479+
}
480+
return results
481+
}

0 commit comments

Comments
 (0)