Skip to content

Commit 56ae808

Browse files
authored
test: add missing tests for Entity+Edge model migration (#513)
* test: add missing tests for Entity+Edge model migration Restore and strengthen test coverage that was lost during the Asset → Entity+Edge migration. Adds new test files for previously untested components and extends existing test files with edge-case coverage. New test files: - processors/enrich: Init validation, Process with nil/existing props, edge passthrough - processors/labels: Init validation, Process with nil/existing labels, merge, edge passthrough - sinks/console: Init, Sink single/multiple/empty/with-edges, Close - sinks/kafka: Init config validation (missing brokers, topic, etc.) Strengthened tests: - models/record: multiple edges, nil entity - models/util: nil/empty props, nested props sanitization, JSON with/without edges - processors/script: modify entity name, nil properties, edge preservation - sinks/compass: empty batch handling - sinks/file: empty batch, records with edges (ndjson + yaml) * docs: update context_graph.md to reflect Entity+Edge model Replace outdated Asset terminology with the current Record/Entity/Edge model that was introduced in the migration. * fix: update Makefile proto generation to use meteor/v1beta1 Update PROTON_COMMIT to the commit that added the new Entity+Edge protos and removed the legacy assets. Update the --path flag from the deleted raystack/assets/v1beta2 to raystack/meteor/v1beta1.
1 parent d71b46a commit 56ae808

File tree

13 files changed

+712
-6
lines changed

13 files changed

+712
-6
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
NAME="github.com/raystack/meteor"
22
VERSION=$(shell git describe --always --tags 2>/dev/null)
33
COVERFILE="/tmp/app.coverprofile"
4-
PROTON_COMMIT := "ae895e033f71df187c62d7cf9431a2e259ddd423"
4+
PROTON_COMMIT := "f5514e23005e7480319a18ba905dfecaa17379f8"
55
.PHONY: all build clean test
66

77
all: build
@@ -36,7 +36,7 @@ test-coverage:
3636
generate-proto:
3737
@echo " > cloning protobuf from raystack/proton"
3838
@echo " > generating protobuf"
39-
@buf generate --template buf.gen.yaml https://github.com/raystack/proton/archive/${PROTON_COMMIT}.zip#strip_components=1 --path raystack/assets/v1beta2
39+
@buf generate --template buf.gen.yaml https://github.com/raystack/proton/archive/${PROTON_COMMIT}.zip#strip_components=1 --path raystack/meteor/v1beta1
4040
@echo " > protobuf compilation finished"
4141

4242
lint:

docs/docs/concepts/context_graph.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ Meteor operates as a metadata supply chain with three stages:
1919

2020
### Extract
2121

22-
Meteor's extractors connect to 30+ data sources — databases (BigQuery, Postgres, Snowflake), BI tools (Tableau, Metabase, Superset), streaming platforms (Kafka), cloud storage (GCS), orchestrators (Optimus), and more. Each extractor produces standardized **Asset** records containing:
22+
Meteor's extractors connect to 30+ data sources — databases (BigQuery, Postgres, Snowflake), BI tools (Tableau, Metabase, Superset), streaming platforms (Kafka), cloud storage (GCS), orchestrators (Optimus), and more. Each extractor produces **Record**s — an **Entity** with flat properties plus **Edge**s representing relationships:
2323

24-
- **Schema metadata** — column names, types, descriptions, constraints
25-
- **Lineage** — upstream and downstream asset references
26-
- **Ownership** — who owns and maintains the asset
24+
- **Schema metadata** — column names, types, descriptions, constraints (stored as entity properties)
25+
- **Lineage** — upstream and downstream relationships (represented as edges)
26+
- **Ownership** — who owns and maintains the entity (represented as edges)
2727
- **Service context** — source system, URLs, timestamps
2828

2929
### Process

models/record_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,36 @@ func TestNewRecordWithEdges(t *testing.T) {
3434
assert.Len(t, record.Edges(), 1)
3535
assert.Equal(t, edge, record.Edges()[0])
3636
}
37+
38+
func TestNewRecordWithMultipleEdges(t *testing.T) {
39+
entity := &meteorv1beta1.Entity{
40+
Urn: "urn:test:scope:table:t1",
41+
Name: "t1",
42+
Type: "table",
43+
}
44+
lineageEdge := &meteorv1beta1.Edge{
45+
SourceUrn: "urn:test:scope:table:t1",
46+
TargetUrn: "urn:test:scope:table:t2",
47+
Type: "lineage",
48+
Source: "test",
49+
}
50+
ownerEdge := &meteorv1beta1.Edge{
51+
SourceUrn: "urn:test:scope:table:t1",
52+
TargetUrn: "urn:user:alice@co.com",
53+
Type: "owned_by",
54+
Source: "test",
55+
}
56+
record := models.NewRecord(entity, lineageEdge, ownerEdge)
57+
assert.Equal(t, entity, record.Entity())
58+
assert.Len(t, record.Edges(), 2)
59+
assert.Equal(t, lineageEdge, record.Edges()[0])
60+
assert.Equal(t, ownerEdge, record.Edges()[1])
61+
assert.Equal(t, "lineage", record.Edges()[0].GetType())
62+
assert.Equal(t, "owned_by", record.Edges()[1].GetType())
63+
}
64+
65+
func TestNewRecordWithNilEntity(t *testing.T) {
66+
record := models.NewRecord(nil)
67+
assert.Nil(t, record.Entity())
68+
assert.Empty(t, record.Edges())
69+
}

models/util_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,66 @@ func TestRecordToJSON(t *testing.T) {
8888
assert.Contains(t, string(b), `"entity"`)
8989
assert.Contains(t, string(b), `"edges"`)
9090
}
91+
92+
func TestNewEntityWithNilProps(t *testing.T) {
93+
entity := models.NewEntity("urn:test:s:table:t1", "table", "t1", "test", nil)
94+
assert.Equal(t, "urn:test:s:table:t1", entity.GetUrn())
95+
assert.Equal(t, "table", entity.GetType())
96+
assert.Nil(t, entity.GetProperties())
97+
}
98+
99+
func TestNewEntityWithEmptyProps(t *testing.T) {
100+
entity := models.NewEntity("urn:test:s:table:t1", "table", "t1", "test", map[string]any{})
101+
assert.Equal(t, "urn:test:s:table:t1", entity.GetUrn())
102+
assert.Nil(t, entity.GetProperties())
103+
}
104+
105+
func TestNewEntityWithNestedProps(t *testing.T) {
106+
entity := models.NewEntity("urn:test:s:table:t1", "table", "t1", "test", map[string]any{
107+
"labels": map[string]string{"env": "production", "team": "data"},
108+
"tags": []string{"important", "verified"},
109+
})
110+
assert.Equal(t, "urn:test:s:table:t1", entity.GetUrn())
111+
props := entity.GetProperties()
112+
require.NotNil(t, props)
113+
labelsVal := props.GetFields()["labels"].GetStructValue()
114+
require.NotNil(t, labelsVal)
115+
assert.Equal(t, "production", labelsVal.GetFields()["env"].GetStringValue())
116+
assert.Equal(t, "data", labelsVal.GetFields()["team"].GetStringValue())
117+
tagsVal := props.GetFields()["tags"].GetListValue()
118+
require.NotNil(t, tagsVal)
119+
assert.Len(t, tagsVal.GetValues(), 2)
120+
assert.Equal(t, "important", tagsVal.GetValues()[0].GetStringValue())
121+
}
122+
123+
func TestRecordToJSONWithoutEdges(t *testing.T) {
124+
entity := &meteorv1beta1.Entity{
125+
Urn: "urn:test:s:table:t1",
126+
Name: "t1",
127+
}
128+
record := models.NewRecord(entity)
129+
130+
b, err := models.RecordToJSON(record)
131+
require.NoError(t, err)
132+
assert.Contains(t, string(b), `"entity"`)
133+
assert.NotContains(t, string(b), `"edges"`)
134+
}
135+
136+
func TestRecordToJSONWithMultipleEdges(t *testing.T) {
137+
entity := &meteorv1beta1.Entity{
138+
Urn: "urn:test:s:table:t1",
139+
Name: "t1",
140+
}
141+
lineage := models.LineageEdge("urn:a", "urn:b", "test")
142+
owner := models.OwnerEdge("urn:test:s:table:t1", "urn:user:bob@co.com", "test")
143+
record := models.NewRecord(entity, lineage, owner)
144+
145+
b, err := models.RecordToJSON(record)
146+
require.NoError(t, err)
147+
s := string(b)
148+
assert.Contains(t, s, `"entity"`)
149+
assert.Contains(t, s, `"edges"`)
150+
assert.Contains(t, s, `"lineage"`)
151+
assert.Contains(t, s, `"owned_by"`)
152+
assert.Contains(t, s, `"urn:user:bob@co.com"`)
153+
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
//go:build plugins
2+
3+
package enrich_test
4+
5+
import (
6+
"context"
7+
"testing"
8+
9+
"github.com/raystack/meteor/models"
10+
"github.com/raystack/meteor/plugins"
11+
"github.com/raystack/meteor/plugins/processors/enrich"
12+
testutils "github.com/raystack/meteor/test/utils"
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
func TestInit(t *testing.T) {
18+
t.Run("should return error for invalid config", func(t *testing.T) {
19+
proc := enrich.New(testutils.Logger)
20+
err := proc.Init(context.Background(), plugins.Config{
21+
RawConfig: map[string]any{},
22+
})
23+
assert.Error(t, err)
24+
})
25+
26+
t.Run("should return no error for valid config", func(t *testing.T) {
27+
proc := enrich.New(testutils.Logger)
28+
err := proc.Init(context.Background(), plugins.Config{
29+
RawConfig: map[string]any{
30+
"attributes": map[string]any{
31+
"team": "data-engineering",
32+
},
33+
},
34+
})
35+
assert.NoError(t, err)
36+
})
37+
}
38+
39+
func TestProcess(t *testing.T) {
40+
t.Run("should enrich entity with nil properties", func(t *testing.T) {
41+
proc := enrich.New(testutils.Logger)
42+
err := proc.Init(context.Background(), plugins.Config{
43+
RawConfig: map[string]any{
44+
"attributes": map[string]any{
45+
"team": "data-engineering",
46+
},
47+
},
48+
})
49+
require.NoError(t, err)
50+
51+
entity := models.NewEntity("urn:table:1", "table", "my-table", "bigquery", nil)
52+
rec := models.NewRecord(entity)
53+
54+
result, err := proc.Process(context.Background(), rec)
55+
require.NoError(t, err)
56+
57+
props := result.Entity().GetProperties().AsMap()
58+
assert.Equal(t, "data-engineering", props["team"])
59+
})
60+
61+
t.Run("should merge with existing properties", func(t *testing.T) {
62+
proc := enrich.New(testutils.Logger)
63+
err := proc.Init(context.Background(), plugins.Config{
64+
RawConfig: map[string]any{
65+
"attributes": map[string]any{
66+
"team": "data-engineering",
67+
},
68+
},
69+
})
70+
require.NoError(t, err)
71+
72+
entity := models.NewEntity("urn:table:1", "table", "my-table", "bigquery", map[string]any{
73+
"existing_key": "existing_value",
74+
})
75+
rec := models.NewRecord(entity)
76+
77+
result, err := proc.Process(context.Background(), rec)
78+
require.NoError(t, err)
79+
80+
props := result.Entity().GetProperties().AsMap()
81+
assert.Equal(t, "existing_value", props["existing_key"])
82+
assert.Equal(t, "data-engineering", props["team"])
83+
})
84+
85+
t.Run("should preserve edges through processing", func(t *testing.T) {
86+
proc := enrich.New(testutils.Logger)
87+
err := proc.Init(context.Background(), plugins.Config{
88+
RawConfig: map[string]any{
89+
"attributes": map[string]any{
90+
"team": "data-engineering",
91+
},
92+
},
93+
})
94+
require.NoError(t, err)
95+
96+
entity := models.NewEntity("urn:table:1", "table", "my-table", "bigquery", nil)
97+
lineage := models.LineageEdge("urn:table:1", "urn:table:2", "bigquery")
98+
owner := models.OwnerEdge("urn:table:1", "urn:user:alice", "bigquery")
99+
rec := models.NewRecord(entity, lineage, owner)
100+
101+
result, err := proc.Process(context.Background(), rec)
102+
require.NoError(t, err)
103+
104+
assert.Len(t, result.Edges(), 2)
105+
assert.Equal(t, lineage, result.Edges()[0])
106+
assert.Equal(t, owner, result.Edges()[1])
107+
})
108+
109+
t.Run("should only add string values and skip non-string values", func(t *testing.T) {
110+
proc := enrich.New(testutils.Logger)
111+
err := proc.Init(context.Background(), plugins.Config{
112+
RawConfig: map[string]any{
113+
"attributes": map[string]any{
114+
"team": "data-engineering",
115+
"count": 42,
116+
"enabled": true,
117+
"fraction": 3.14,
118+
},
119+
},
120+
})
121+
require.NoError(t, err)
122+
123+
entity := models.NewEntity("urn:table:1", "table", "my-table", "bigquery", nil)
124+
rec := models.NewRecord(entity)
125+
126+
result, err := proc.Process(context.Background(), rec)
127+
require.NoError(t, err)
128+
129+
props := result.Entity().GetProperties().AsMap()
130+
assert.Equal(t, "data-engineering", props["team"])
131+
assert.Nil(t, props["count"])
132+
assert.Nil(t, props["enabled"])
133+
assert.Nil(t, props["fraction"])
134+
})
135+
}

0 commit comments

Comments
 (0)