Skip to content

Commit 8ecb7d9

Browse files
committed
feat: update proto with search flags, lineage options, update_only, is_deleted, and group assets
- Update PROTON_COMMIT to include new proto fields - Wire SearchFlags (disable_fuzzy, enable_highlight, is_column_search) from proto to handler - Wire with_attributes (from proto optional bool) and include_deleted in lineage handler - Add update_only support to UpsertAsset and UpsertPatchAsset handlers - Add is_deleted filter to GetAllAssets and is_deleted field to Asset proto - Add IsDeleted to asset.Filter with builder method and postgres filter query - Add GroupAssets RPC definition (handler implementation pending) - Remove remaining StatsD references from handler layer
1 parent 5d27a0d commit 8ecb7d9

16 files changed

Lines changed: 4345 additions & 2811 deletions

File tree

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ COMMIT := $(shell git rev-parse --short HEAD)
33
TAG := "$(shell git rev-list --tags --max-count=1)"
44
VERSION := "$(shell git describe --tags ${TAG})-next"
55
BUILD_DIR=dist
6-
PROTON_COMMIT := "ccbf219312db35a934361ebad895cb40145ca235"
6+
PROTON_COMMIT := "755609a0cc9e55bf1933ae661558a328e2eadd5f"
77

88
.PHONY: all build clean test tidy vet proto setup format generat
99

buf.gen.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ plugins:
1515
out: proto
1616
opt:
1717
- paths=source_relative
18-
- lang=go
1918
- plugin: buf.build/grpc-ecosystem/openapiv2:v2.16.0
2019
out: proto
2120
opt:

core/asset/filter.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ type Filter struct {
1616
QueryFields []string
1717
Query string
1818
Data map[string][]string
19+
IsDeleted *bool
1920
}
2021

2122
func (f *Filter) Validate() error {
@@ -32,6 +33,7 @@ type filterBuilder struct {
3233
offset int
3334
sortBy string
3435
sortDirection string
36+
isDeleted *bool
3537
}
3638

3739
func NewFilterBuilder() *filterBuilder {
@@ -83,13 +85,19 @@ func (fb *filterBuilder) SortDirection(sortDirection string) *filterBuilder {
8385
return fb
8486
}
8587

88+
func (fb *filterBuilder) IsDeleted(isDeleted bool) *filterBuilder {
89+
fb.isDeleted = &isDeleted
90+
return fb
91+
}
92+
8693
func (fb *filterBuilder) Build() (Filter, error) {
8794
flt := Filter{
8895
Size: fb.size,
8996
Offset: fb.offset,
9097
SortBy: fb.sortBy,
9198
SortDirection: fb.sortDirection,
9299
Query: fb.q,
100+
IsDeleted: fb.isDeleted,
93101
}
94102

95103
if len(fb.data) != 0 {

core/asset/lineage.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type LineageQuery struct {
2626
Level int
2727
Direction LineageDirection
2828
WithAttributes bool
29+
IncludeDeleted bool
2930
}
3031

3132
//go:generate mockery --name=LineageRepository -r --case underscore --with-expecter --structname=LineageRepository --filename=lineage_repository.go --output=./mocks

internal/server/v1beta1/asset.go

Lines changed: 19 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"github.com/raystack/compass/core/asset"
1313
"github.com/raystack/compass/core/star"
1414
"github.com/raystack/compass/core/user"
15-
"github.com/raystack/compass/pkg/statsd"
1615
compassv1beta1 "github.com/raystack/compass/proto/raystack/compass/v1beta1"
1716
"github.com/r3labs/diff/v2"
1817
"google.golang.org/grpc/codes"
@@ -21,11 +20,6 @@ import (
2120
"google.golang.org/protobuf/types/known/timestamppb"
2221
)
2322

24-
//go:generate mockery --name=StatsDClient -r --case underscore --with-expecter --structname StatsDClient --filename statsd_monitor.go --output=./mocks
25-
type StatsDClient interface {
26-
Incr(name string) *statsd.Metric
27-
}
28-
2923
type AssetService interface {
3024
GetAllAssets(ctx context.Context, flt asset.Filter, withTotal bool) ([]asset.Asset, uint32, error)
3125
GetAssetByID(ctx context.Context, id string) (asset.Asset, error)
@@ -54,7 +48,7 @@ func (server *APIServer) GetAllAssets(ctx context.Context, req *compassv1beta1.G
5448
return nil, err
5549
}
5650

57-
flt, err := asset.NewFilterBuilder().
51+
fb := asset.NewFilterBuilder().
5852
Types(req.GetTypes()).
5953
Services(req.GetServices()).
6054
Q(req.GetQ()).
@@ -63,8 +57,11 @@ func (server *APIServer) GetAllAssets(ctx context.Context, req *compassv1beta1.G
6357
Offset(int(req.GetOffset())).
6458
SortBy(req.GetSort()).
6559
SortDirection(req.GetDirection()).
66-
Data(req.GetData()).
67-
Build()
60+
Data(req.GetData())
61+
if req.GetIsDeleted() {
62+
fb = fb.IsDeleted(true)
63+
}
64+
flt, err := fb.Build()
6865
if err != nil {
6966
return nil, status.Error(codes.InvalidArgument, bodyParserErrorMsg(err))
7067
}
@@ -260,6 +257,9 @@ func (server *APIServer) UpsertAsset(ctx context.Context, req *compassv1beta1.Up
260257
req.GetDownstreams(),
261258
)
262259
if err != nil {
260+
if req.GetUpdateOnly() && errors.As(err, new(asset.NotFoundError)) {
261+
return &compassv1beta1.UpsertAssetResponse{Id: ""}, nil
262+
}
263263
return nil, err
264264
}
265265

@@ -290,8 +290,14 @@ func (server *APIServer) UpsertPatchAsset(ctx context.Context, req *compassv1bet
290290
}
291291

292292
ast, err := server.assetService.GetAssetByID(ctx, urn)
293-
if err != nil && !errors.As(err, &asset.NotFoundError{}) {
294-
return nil, internalServerError(server.logger, err.Error())
293+
if err != nil {
294+
if errors.As(err, &asset.NotFoundError{}) {
295+
if req.GetUpdateOnly() {
296+
return &compassv1beta1.UpsertPatchAssetResponse{Id: ""}, nil
297+
}
298+
} else {
299+
return nil, internalServerError(server.logger, err.Error())
300+
}
295301
}
296302

297303
patchAssetMap := decodePatchAssetToMap(baseAsset)
@@ -332,12 +338,6 @@ func (server *APIServer) DeleteAsset(ctx context.Context, req *compassv1beta1.De
332338
if errors.As(err, new(asset.NotFoundError)) {
333339
return nil, status.Error(codes.NotFound, err.Error())
334340
}
335-
if errors.As(err, new(asset.DiscoveryError)) {
336-
server.sendStatsDCounterMetric("discovery_error",
337-
map[string]string{
338-
"method": "delete",
339-
})
340-
}
341341
return nil, internalServerError(server.logger, err.Error())
342342
}
343343

@@ -404,21 +404,9 @@ func (server *APIServer) upsertAsset(
404404
if errors.As(err, new(asset.InvalidError)) {
405405
return "", status.Error(codes.InvalidArgument, err.Error())
406406
} else if err != nil {
407-
if errors.As(err, new(asset.DiscoveryError)) {
408-
server.sendStatsDCounterMetric("discovery_error",
409-
map[string]string{
410-
"method": mode,
411-
})
412-
}
413407
return "", internalServerError(server.logger, err.Error())
414408
}
415409

416-
server.sendStatsDCounterMetric(mode,
417-
map[string]string{
418-
"type": ast.Type.String(),
419-
"service": ast.Service,
420-
})
421-
422410
return
423411
}
424412

@@ -431,24 +419,12 @@ func (server *APIServer) upsertAssetWithoutLineage(ctx context.Context, ns *name
431419

432420
assetID, err := server.assetService.UpsertAssetWithoutLineage(ctx, ns, &ast)
433421
if err != nil {
434-
switch {
435-
case errors.As(err, new(asset.InvalidError)):
422+
if errors.As(err, new(asset.InvalidError)) {
436423
return "", status.Error(codes.InvalidArgument, err.Error())
437-
438-
case errors.As(err, new(asset.DiscoveryError)):
439-
server.sendStatsDCounterMetric("discovery_error",
440-
map[string]string{
441-
"method": mode,
442-
})
443424
}
444-
445425
return "", internalServerError(server.logger, err.Error())
446426
}
447427

448-
server.sendStatsDCounterMetric(mode, map[string]string{
449-
"type": ast.Type.String(),
450-
"service": ast.Service,
451-
})
452428
return assetID, nil
453429
}
454430

@@ -629,6 +605,7 @@ func assetToProto(a asset.Asset, withChangelog bool) (assetPB *compassv1beta1.As
629605
CreatedAt: createdAtPB,
630606
UpdatedAt: updatedAtPB,
631607
Probes: probes,
608+
IsDeleted: a.IsDeleted,
632609
}
633610
return
634611
}

internal/server/v1beta1/lineage.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,15 @@ func (server *APIServer) GetGraph(ctx context.Context, req *compassv1beta1.GetGr
2828

2929
// Default to true for backward compatibility
3030
withAttributes := true
31+
if req.WithAttributes != nil {
32+
withAttributes = *req.WithAttributes
33+
}
3134

3235
lineage, err := server.assetService.GetLineage(ctx, req.GetUrn(), asset.LineageQuery{
3336
Level: int(req.GetLevel()),
3437
Direction: direction,
3538
WithAttributes: withAttributes,
39+
IncludeDeleted: req.GetIncludeDeleted(),
3640
})
3741
if err != nil {
3842
return nil, internalServerError(server.logger, err.Error())

internal/server/v1beta1/mocks/statsd_monitor.go

Lines changed: 0 additions & 80 deletions
This file was deleted.

internal/server/v1beta1/option.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,3 @@
11
package handlersv1beta1
22

33
type Option func(*APIServer)
4-
5-
func WithStatsD(st StatsDClient) Option {
6-
return func(s *APIServer) {
7-
s.statsDReporter = st
8-
}
9-
}

internal/server/v1beta1/search.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ func (server *APIServer) SearchAssets(ctx context.Context, req *compassv1beta1.S
2929
RankBy: req.GetRankby(),
3030
Queries: req.GetQuery(),
3131
IncludeFields: req.GetIncludeFields(),
32+
Flags: getSearchFlagsFromProto(req.GetFlags()),
3233
Namespace: ns,
3334
}
3435

@@ -79,6 +80,17 @@ func (server *APIServer) SuggestAssets(ctx context.Context, req *compassv1beta1.
7980
}, nil
8081
}
8182

83+
func getSearchFlagsFromProto(flags *compassv1beta1.SearchFlags) asset.SearchFlags {
84+
if flags == nil {
85+
return asset.SearchFlags{}
86+
}
87+
return asset.SearchFlags{
88+
DisableFuzzy: flags.GetDisableFuzzy(),
89+
EnableHighlight: flags.GetEnableHighlight(),
90+
IsColumnSearch: flags.GetIsColumnSearch(),
91+
}
92+
}
93+
8294
func filterConfigFromValues(fltMap map[string]string) map[string][]string {
8395
var filter = make(map[string][]string)
8496
for key, value := range fltMap {

internal/server/v1beta1/server.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ type APIServer struct {
2525
tagTemplateService TagTemplateService
2626
userService UserService
2727
logger log.Logger
28-
statsDReporter StatsDClient
2928
}
3029

3130
var (
@@ -72,16 +71,6 @@ func (server *APIServer) validateUserInCtx(ctx context.Context, ns *namespace.Na
7271
return userID, nil
7372
}
7473

75-
func (server *APIServer) sendStatsDCounterMetric(metricName string, kvTags map[string]string) {
76-
if server.statsDReporter != nil {
77-
metric := server.statsDReporter.Incr(metricName)
78-
for k, v := range kvTags {
79-
metric.Tag(k, v)
80-
}
81-
metric.Publish()
82-
}
83-
}
84-
8574
func internalServerError(logger log.Logger, msg string) error {
8675
ref := time.Now().Unix()
8776

0 commit comments

Comments
 (0)