Skip to content

Commit 86065e4

Browse files
committed
feat(compass): add max_concurrency config and skip edge-only entity upserts
Add max_concurrency option to limit concurrent requests per batch, preventing Compass from being overwhelmed during large extractions. Skip the entity upsert POST for records that carry only edges (no name, description, or properties), reducing unnecessary API calls for relationship-only data like collaborator permissions.
1 parent f548b19 commit 86065e4

3 files changed

Lines changed: 85 additions & 12 deletions

File tree

plugins/sinks/compass/README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,16 @@ sinks:
2020
| :-- | :--- | :------ | :---------- | :- |
2121
| `host` | `string` | `https://compass.com` | Hostname of the Compass service | *required* |
2222
| `headers` | `map` | `Compass-User-UUID: meteor@raystack.io` | Additional HTTP headers to send with each request. Multiple values are comma-separated. | *optional* |
23+
| `max_concurrency` | `int` | `10` | Maximum number of concurrent requests per batch. Defaults to `0`, which means no limit (concurrency is then bounded only by the batch size). | *optional* |
2324

2425
## Behavior
2526

2627
For each Record the sink:
2728

28-
1. **Upserts the Entity** via `POST /raystack.compass.v1beta1.CompassService/UpsertEntity` -- sends `urn`, `type`, `name`, `description`, `source`, and `properties`.
29+
1. **Upserts the Entity** via `POST /raystack.compass.v1beta1.CompassService/UpsertEntity` -- sends `urn`, `type`, `name`, `description`, `source`, and `properties`. Skipped for edge-only records (no name, description, or properties).
2930
2. **Upserts all Edges** via `POST /raystack.compass.v1beta1.CompassService/UpsertEdge` -- one request per edge (e.g. `owned_by`, `derived_from`, `generates`), sending `source_urn`, `target_urn`, `type`, `source`, and `properties`.
3031

31-
Requests are made concurrently within a batch. Server errors (5xx) are returned as retryable errors.
32+
Requests are made concurrently within a batch (controlled by `max_concurrency`). Server errors (5xx) are returned as retryable errors.
3233

3334
## Contributing
3435

plugins/sinks/compass/sink.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ import (
2424
var summary string
2525

2626
type Config struct {
27-
Host string `json:"host" yaml:"host" mapstructure:"host" validate:"required"`
28-
Headers map[string]string `json:"headers" yaml:"headers" mapstructure:"headers"`
27+
Host string `json:"host" yaml:"host" mapstructure:"host" validate:"required"`
28+
Headers map[string]string `json:"headers" yaml:"headers" mapstructure:"headers"`
29+
MaxConcurrency int `json:"max_concurrency" yaml:"max_concurrency" mapstructure:"max_concurrency"`
2930
}
3031

3132
var info = plugins.Info{
@@ -39,6 +40,8 @@ var info = plugins.Info{
3940
headers:
4041
Compass-User-UUID: meteor@raystack.io
4142
X-Other-Header: value1, value2
43+
# Maximum number of concurrent requests to compass per batch. 0 means no limit.
44+
# max_concurrency: 10
4245
`),
4346
}
4447

@@ -84,7 +87,11 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) error {
8487
}
8588

8689
errGroup := errgroup.Group{}
87-
errGroup.SetLimit(len(batch))
90+
limit := len(batch)
91+
if s.config.MaxConcurrency > 0 && s.config.MaxConcurrency < limit {
92+
limit = s.config.MaxConcurrency
93+
}
94+
errGroup.SetLimit(limit)
8895

8996
for _, record := range batch {
9097
record := record
@@ -109,13 +116,17 @@ func (*Sink) Close() error { return nil }
109116
func (s *Sink) sinkRecord(ctx context.Context, record models.Record) error {
110117
entity := record.Entity()
111118

112-
// 1. Upsert the entity.
113-
entityReq := s.buildEntityRequest(record)
114-
if err := s.post(ctx, upsertEntityRoute, entityReq); err != nil {
115-
return fmt.Errorf("upsert entity: %w", err)
119+
// Skip entity upsert only for bare records that exist solely to carry edges
120+
// (no properties, no name, no description — just a URN and type for edge context).
121+
edgeOnly := entity.GetProperties() == nil && entity.GetName() == "" && entity.GetDescription() == ""
122+
if !edgeOnly {
123+
entityReq := s.buildEntityRequest(record)
124+
if err := s.post(ctx, upsertEntityRoute, entityReq); err != nil {
125+
return fmt.Errorf("upsert entity: %w", err)
126+
}
116127
}
117128

118-
// 2. Upsert all edges uniformly via UpsertEdge.
129+
// Upsert all edges uniformly via UpsertEdge.
119130
for _, edge := range record.Edges() {
120131
edgeReq := UpsertEdgeRequest{
121132
SourceURN: edge.GetSourceUrn(),

plugins/sinks/compass/sink_test.go

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func TestSink(t *testing.T) {
7373
}})
7474
require.NoError(t, err)
7575

76-
entity := &meteorv1beta1.Entity{Type: "table"}
76+
entity := &meteorv1beta1.Entity{Type: "table", Name: "my-table"}
7777
err = compassSink.Sink(ctx, []models.Record{models.NewRecord(entity)})
7878
require.Error(t, err)
7979
assert.Contains(t, err.Error(), "compass returns 404")
@@ -92,7 +92,7 @@ func TestSink(t *testing.T) {
9292
}})
9393
require.NoError(t, err)
9494

95-
entity := &meteorv1beta1.Entity{Type: "table"}
95+
entity := &meteorv1beta1.Entity{Type: "table", Name: "my-table"}
9696
err = compassSink.Sink(ctx, []models.Record{models.NewRecord(entity)})
9797
require.Error(t, err)
9898
assert.ErrorAs(t, err, &plugins.RetryError{})
@@ -305,6 +305,67 @@ func TestSink(t *testing.T) {
305305
assert.Contains(t, req.Header.Get("Compass-User-UUID"), "meteor@raystack.io")
306306
})
307307

308+
t.Run("should skip entity upsert for edge-only records", func(t *testing.T) {
309+
client := &mockHTTPClient{}
310+
client.SetupResponse(200, `{}`)
311+
ctx := context.TODO()
312+
313+
compassSink := compass.New(client, testutils.Logger)
314+
err := compassSink.Init(ctx, plugins.Config{RawConfig: map[string]any{
315+
"host": host,
316+
}})
317+
require.NoError(t, err)
318+
319+
// Entity with only URN/type/source — no name, description, or properties.
320+
entity := &meteorv1beta1.Entity{
321+
Urn: "my-repo-urn",
322+
Source: "github",
323+
Type: "repository",
324+
}
325+
edges := []*meteorv1beta1.Edge{
326+
{SourceUrn: "user-urn-1", TargetUrn: "my-repo-urn", Type: "has_access_to", Source: "github"},
327+
{SourceUrn: "user-urn-2", TargetUrn: "my-repo-urn", Type: "has_access_to", Source: "github"},
328+
}
329+
err = compassSink.Sink(ctx, []models.Record{models.NewRecord(entity, edges...)})
330+
assert.NoError(t, err)
331+
332+
// Should have only 2 edge requests, no entity upsert.
333+
require.Len(t, client.requests, 2)
334+
assert.Equal(t, upsertEdgeURL, reqURL(client.requests[0]))
335+
assert.Equal(t, upsertEdgeURL, reqURL(client.requests[1]))
336+
})
337+
338+
t.Run("should respect max_concurrency config", func(t *testing.T) {
339+
client := &mockHTTPClient{}
340+
client.SetupResponse(200, `{}`)
341+
ctx := context.TODO()
342+
343+
compassSink := compass.New(client, testutils.Logger)
344+
err := compassSink.Init(ctx, plugins.Config{RawConfig: map[string]any{
345+
"host": host,
346+
"max_concurrency": 2,
347+
}})
348+
require.NoError(t, err)
349+
350+
// Create a batch of 5 records.
351+
var batch []models.Record
352+
for i := 0; i < 5; i++ {
353+
entity := &meteorv1beta1.Entity{
354+
Urn: fmt.Sprintf("urn-%d", i),
355+
Name: fmt.Sprintf("entity-%d", i),
356+
Source: "test",
357+
Type: "table",
358+
}
359+
batch = append(batch, models.NewRecord(entity))
360+
}
361+
362+
err = compassSink.Sink(ctx, batch)
363+
assert.NoError(t, err)
364+
365+
// All 5 entity requests should complete despite the concurrency limit of 2 (this checks completion only; it does not assert that at most 2 requests ran at once).
366+
require.Len(t, client.requests, 5)
367+
})
368+
308369
t.Run("should flatten data into properties without @type", func(t *testing.T) {
309370
client := &mockHTTPClient{}
310371
client.SetupResponse(200, `{}`)

0 commit comments

Comments
 (0)