Commit 84a63a2

feat: add dbt metadata extractor
Parse dbt manifest.json to extract models (with schema, materialization, columns, tags, meta) and sources (with database, schema, loader). Emits derived_from edges for model lineage and owned_by edges from meta.owner. Optionally enriches column types from catalog.json when provided. Closes #71
1 parent 00e4665 commit 84a63a2

8 files changed, +796 -2 lines changed

docs/reference/extractors.mdx

Lines changed: 6 additions & 2 deletions
@@ -28,6 +28,7 @@ source:
 | [`clickhouse`][clickhouse] | `table` | — | ClickHouse SQL |
 | [`couchdb`][couchdb] | `table` | — | CouchDB HTTP API |
 | [`csv`][csv] | `table` | — | Local filesystem |
+| [`dbt`][dbt] | `model`, `source` | `derived_from`, `owned_by` | dbt manifest.json |
 | [`elastic`][elastic] | `table` | — | Elasticsearch API |
 | [`mariadb`][mariadb] | `table` | — | MariaDB SQL |
 | [`mongodb`][mongodb] | `table` | — | MongoDB driver |
@@ -65,6 +66,8 @@ Each extractor emits one or more entity types. All entities share the same flat
 | `team` | Teams within an organisation | github |
 | `document` | Documentation files from repositories | github |
 | `bucket` | Cloud storage containers | gcs |
+| `model` | dbt transformation models | dbt |
+| `source` | dbt external source definitions | dbt |
 | `job` | Scheduled data transformation tasks | optimus |
 | `application` | Services and applications | application_yaml |
 
@@ -74,9 +77,9 @@ Edges represent relationships between entities. Not all extractors emit edges
 
 | Edge Type | Meaning | Extractors |
 | :--- | :--- | :--- |
-| `derived_from` | Entity depends on / reads from target (upstream dependency) | bigquery, metabase, tableau, optimus, application_yaml |
+| `derived_from` | Entity depends on / reads from target (upstream dependency) | bigquery, dbt, metabase, tableau, optimus, application_yaml |
 | `generates` | Entity produces / writes to target (downstream output) | optimus, application_yaml |
-| `owned_by` | Entity is owned by a user or team | tableau, optimus, application_yaml, github |
+| `owned_by` | Entity is owned by a user or team | dbt, tableau, optimus, application_yaml, github |
 | `member_of` | User belongs to an org or team | github |
 | `belongs_to` | Entity belongs to a parent entity | github |
 
@@ -86,6 +89,7 @@ Edges represent relationships between entities. Not all extractors emit edges
 [clickhouse]: https://github.com/raystack/meteor/tree/main/plugins/extractors/clickhouse/README.md
 [couchdb]: https://github.com/raystack/meteor/tree/main/plugins/extractors/couchdb/README.md
 [csv]: https://github.com/raystack/meteor/tree/main/plugins/extractors/csv/README.md
+[dbt]: https://github.com/raystack/meteor/tree/main/plugins/extractors/dbt/README.md
 [elastic]: https://github.com/raystack/meteor/tree/main/plugins/extractors/elastic/README.md
 [mariadb]: https://github.com/raystack/meteor/tree/main/plugins/extractors/mariadb/README.md
 [mongodb]: https://github.com/raystack/meteor/tree/main/plugins/extractors/mongodb/README.md

plugins/extractors/dbt/README.md

Lines changed: 77 additions & 0 deletions
@@ -0,0 +1,77 @@
+# dbt
+
+Extract metadata from dbt project artifacts including models, sources, lineage, and ownership.
+
+## Usage
+
+```yaml
+source:
+  name: dbt
+  scope: my-dbt-project
+  config:
+    manifest: target/manifest.json
+    catalog: target/catalog.json
+```
+
+## Configuration
+
+| Key | Type | Required | Description |
+| :-- | :--- | :------- | :---------- |
+| `manifest` | `string` | Yes | Path to dbt `manifest.json` file. |
+| `catalog` | `string` | No | Path to dbt `catalog.json` file. When provided, column types are enriched with actual warehouse types. |
+
+## Entities
+
+### Entity: `model`
+
+dbt models (tables/views produced by dbt transformations).
+
+| Field | Sample Value |
+| :---- | :----------- |
+| `urn` | `urn:dbt:my-project:model:model.jaffle_shop.customers` |
+| `name` | `customers` |
+| `description` | `This table has basic information about a customer...` |
+| `properties.database` | `analytics` |
+| `properties.schema` | `jaffle_shop` |
+| `properties.materialization` | `table` |
+| `properties.sql_path` | `models/customers.sql` |
+| `properties.language` | `sql` |
+| `properties.tags` | `["finance", "daily"]` |
+| `properties.meta` | `{"owner": "analytics-team"}` |
+| `properties.columns` | `[{"name": "id", "data_type": "INTEGER", "description": "Primary key"}]` |
+
+### Entity: `source`
+
+External tables that dbt reads from (declared in `sources.yml`).
+
+| Field | Sample Value |
+| :---- | :----------- |
+| `urn` | `urn:dbt:my-project:source:source.jaffle_shop.raw.orders` |
+| `name` | `orders` |
+| `description` | `Raw orders table from the payment system` |
+| `properties.database` | `raw` |
+| `properties.schema` | `jaffle_shop` |
+| `properties.source_name` | `raw` |
+| `properties.loader` | `stitch` |
+| `properties.columns` | `[{"name": "id", "data_type": "INTEGER"}]` |
+
+### Edges
+
+| Type | Source | Target | Description |
+| :--- | :----- | :----- | :---------- |
+| `derived_from` | `model` | `model` | Model depends on another model |
+| `derived_from` | `model` | `source` | Model depends on a source |
+| `owned_by` | `model` | `user` | Model ownership from `meta.owner` |
+| `owned_by` | `source` | `user` | Source ownership from `meta.owner` |
+
+## How It Works
+
+The extractor parses dbt's `manifest.json` artifact, which is generated by any dbt command that parses the project (`dbt run`, `dbt build`, `dbt compile`, etc.). This file contains the full project graph: models, sources, tests, and their relationships.
+
+When `catalog.json` is also provided (generated by `dbt docs generate`), column definitions are enriched with actual warehouse data types.
+
+This approach works with both **dbt Core** (local artifacts in `target/`) and **dbt Cloud** (download artifacts via the API or CI).
+
+## Contributing
+
+Refer to the [contribution guidelines](../../../docs/docs/contribute/guide.md#adding-a-new-extractor) for information on contributing to this module.
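
Note on the README's "How It Works" section: the parsing step can be illustrated with a minimal sketch. This is not the extractor's code. The `manifest` and `node` types below are hypothetical and declare only a handful of keys from dbt's `manifest.json` (`nodes`, `unique_id`, `resource_type`, `depends_on.nodes`), and the sample document is invented for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// manifest is a hypothetical, trimmed-down view of dbt's manifest.json.
// Only the keys this sketch reads are declared; all others are ignored.
type manifest struct {
	Nodes map[string]node `json:"nodes"`
}

type node struct {
	UniqueID     string `json:"unique_id"`
	ResourceType string `json:"resource_type"`
	DependsOn    struct {
		Nodes []string `json:"nodes"`
	} `json:"depends_on"`
}

// modelDeps returns, for every node whose resource_type is "model",
// the sorted list of unique IDs it depends on.
func modelDeps(data []byte) (map[string][]string, error) {
	var m manifest
	if err := json.Unmarshal(data, &m); err != nil {
		return nil, fmt.Errorf("parse manifest: %w", err)
	}
	deps := make(map[string][]string)
	for id, n := range m.Nodes {
		if n.ResourceType != "model" {
			continue // skip tests, seeds, snapshots, and so on
		}
		up := append([]string(nil), n.DependsOn.Nodes...)
		sort.Strings(up)
		deps[id] = up
	}
	return deps, nil
}

// sampleManifest is an invented document for illustration only.
const sampleManifest = `{
  "nodes": {
    "model.jaffle_shop.customers": {
      "unique_id": "model.jaffle_shop.customers",
      "resource_type": "model",
      "depends_on": {"nodes": ["source.jaffle_shop.raw.orders", "model.jaffle_shop.stg_customers"]}
    },
    "model.jaffle_shop.stg_customers": {
      "unique_id": "model.jaffle_shop.stg_customers",
      "resource_type": "model",
      "depends_on": {"nodes": []}
    },
    "test.jaffle_shop.not_null_customers_id": {
      "unique_id": "test.jaffle_shop.not_null_customers_id",
      "resource_type": "test",
      "depends_on": {"nodes": ["model.jaffle_shop.customers"]}
    }
  }
}`

func main() {
	deps, err := modelDeps([]byte(sampleManifest))
	if err != nil {
		panic(err)
	}
	// Print models in a stable order, since Go map iteration is unordered.
	ids := make([]string, 0, len(deps))
	for id := range deps {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	for _, id := range ids {
		fmt.Println(id, "<-", deps[id])
	}
}
```

Each entry in a model's dependency list corresponds to one `derived_from` edge in the extractor's output.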

plugins/extractors/dbt/dbt.go

Lines changed: 249 additions & 0 deletions
@@ -0,0 +1,249 @@
+package dbt
+
+import (
+	"context"
+	_ "embed"
+	"encoding/json"
+	"fmt"
+	"os"
+	"strings"
+
+	"github.com/raystack/meteor/models"
+	meteorv1beta1 "github.com/raystack/meteor/models/raystack/meteor/v1beta1"
+	"github.com/raystack/meteor/plugins"
+	"github.com/raystack/meteor/registry"
+	log "github.com/raystack/salt/observability/logger"
+)
+
+//go:embed README.md
+var summary string
+
+type Config struct {
+	Manifest string `json:"manifest" yaml:"manifest" mapstructure:"manifest" validate:"required"`
+	Catalog  string `json:"catalog" yaml:"catalog" mapstructure:"catalog"`
+}
+
+var sampleConfig = `
+# Path to dbt manifest.json (required)
+manifest: target/manifest.json
+# Path to dbt catalog.json for column types (optional)
+# catalog: target/catalog.json`
+
+var info = plugins.Info{
+	Description:  "Extract metadata from dbt manifest including models, sources, and lineage.",
+	SampleConfig: sampleConfig,
+	Summary:      summary,
+	Tags:         []string{"dbt", "extractor"},
+}
+
+type Extractor struct {
+	plugins.BaseExtractor
+	logger   log.Logger
+	config   Config
+	manifest Manifest
+	catalog  *Catalog
+}
+
+func New(logger log.Logger) *Extractor {
+	e := &Extractor{logger: logger}
+	e.BaseExtractor = plugins.NewBaseExtractor(info, &e.config)
+	return e
+}
+
+func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
+	if err := e.BaseExtractor.Init(ctx, config); err != nil {
+		return err
+	}
+
+	manifest, err := readJSON[Manifest](e.config.Manifest)
+	if err != nil {
+		return fmt.Errorf("read manifest: %w", err)
+	}
+	e.manifest = manifest
+
+	if e.config.Catalog != "" {
+		catalog, err := readJSON[Catalog](e.config.Catalog)
+		if err != nil {
+			return fmt.Errorf("read catalog: %w", err)
+		}
+		e.catalog = &catalog
+	}
+
+	return nil
+}
+
+func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
+	for _, node := range e.manifest.Nodes {
+		if node.ResourceType != "model" {
+			continue
+		}
+		emit(e.buildModelRecord(node))
+	}
+
+	for _, src := range e.manifest.Sources {
+		emit(e.buildSourceRecord(src))
+	}
+
+	return nil
+}
+
+func (e *Extractor) buildModelRecord(node ManifestNode) models.Record {
+	urn := models.NewURN("dbt", e.UrnScope, "model", node.UniqueID)
+
+	columns := e.buildColumns(node.UniqueID, node.Columns)
+	props := map[string]any{
+		"database":        node.Database,
+		"schema":          node.Schema,
+		"materialization": node.Config.Materialized,
+		"sql_path":        node.Path,
+	}
+	if node.Language != "" {
+		props["language"] = node.Language
+	}
+	if len(node.Tags) > 0 {
+		props["tags"] = node.Tags
+	}
+	if len(node.Meta) > 0 {
+		props["meta"] = node.Meta
+	}
+	if len(columns) > 0 {
+		props["columns"] = columns
+	}
+
+	entity := models.NewEntity(urn, "model", node.Name, "dbt", props)
+	if node.Description != "" {
+		entity.Description = node.Description
+	}
+
+	var edges []*meteorv1beta1.Edge
+	for _, dep := range node.DependsOn.Nodes {
+		depURN := e.resolveDepURN(dep)
+		edges = append(edges, models.DerivedFromEdge(urn, depURN, "dbt"))
+	}
+	if owner := getOwner(node.Meta); owner != "" {
+		ownerURN := models.NewURN("dbt", e.UrnScope, "user", owner)
+		edges = append(edges, models.OwnerEdge(urn, ownerURN, "dbt"))
+	}
+
+	return models.NewRecord(entity, edges...)
+}
+
+func (e *Extractor) buildSourceRecord(src ManifestSource) models.Record {
+	urn := models.NewURN("dbt", e.UrnScope, "source", src.UniqueID)
+
+	columns := e.buildSourceColumns(src.Columns)
+	props := map[string]any{
+		"database":    src.Database,
+		"schema":      src.Schema,
+		"source_name": src.SourceName,
+	}
+	if src.Loader != "" {
+		props["loader"] = src.Loader
+	}
+	if len(src.Tags) > 0 {
+		props["tags"] = src.Tags
+	}
+	if len(src.Meta) > 0 {
+		props["meta"] = src.Meta
+	}
+	if len(columns) > 0 {
+		props["columns"] = columns
+	}
+
+	entity := models.NewEntity(urn, "source", src.Name, "dbt", props)
+	if src.Description != "" {
+		entity.Description = src.Description
+	}
+
+	var edges []*meteorv1beta1.Edge
+	if owner := getOwner(src.Meta); owner != "" {
+		ownerURN := models.NewURN("dbt", e.UrnScope, "user", owner)
+		edges = append(edges, models.OwnerEdge(urn, ownerURN, "dbt"))
+	}
+
+	return models.NewRecord(entity, edges...)
+}
+
+func (e *Extractor) buildColumns(nodeID string, manifestCols map[string]ManifestColumn) []map[string]any {
+	var columns []map[string]any
+	for _, col := range manifestCols {
+		c := map[string]any{
+			"name": col.Name,
+		}
+		if col.Description != "" {
+			c["description"] = col.Description
+		}
+		if col.DataType != "" {
+			c["data_type"] = col.DataType
+		}
+
+		// Enrich with catalog data if available.
+		if e.catalog != nil {
+			if catNode, ok := e.catalog.Nodes[nodeID]; ok {
+				if catCol, ok := catNode.Columns[strings.ToLower(col.Name)]; ok {
+					if catCol.Type != "" {
+						c["data_type"] = catCol.Type
+					}
+				}
+			}
+		}
+
+		columns = append(columns, c)
+	}
+	return columns
+}
+
+func (e *Extractor) buildSourceColumns(manifestCols map[string]ManifestColumn) []map[string]any {
+	var columns []map[string]any
+	for _, col := range manifestCols {
+		c := map[string]any{
+			"name": col.Name,
+		}
+		if col.Description != "" {
+			c["description"] = col.Description
+		}
+		if col.DataType != "" {
+			c["data_type"] = col.DataType
+		}
+		columns = append(columns, c)
+	}
+	return columns
+}
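
Note on the two column builders above: both range over a Go map, so the order of the emitted `columns` slice can differ between runs, and `buildColumns` looks up catalog columns only under the lower-cased manifest name. The sketch below shows one possible variant that sorts the result and matches catalog names case-insensitively. It is an illustration under assumptions, not a proposed patch: `column` and `catalogColumn` are hypothetical stand-ins for the extractor's own types (not shown in this diff), and whether a given warehouse adapter reports upper-case names in `catalog.json` is an assumption to verify against real artifacts.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// column and catalogColumn are hypothetical stand-ins for the extractor's
// ManifestColumn and catalog column types.
type column struct {
	Name     string
	DataType string
}

type catalogColumn struct {
	Type string
}

// enrich returns the columns sorted by name. A column's DataType is replaced
// by the catalog type when a case-insensitive name match exists and the
// catalog type is non-empty.
func enrich(cols map[string]column, catalog map[string]catalogColumn) []column {
	// Index catalog columns by lower-cased name so lookups ignore case.
	byLower := make(map[string]catalogColumn, len(catalog))
	for name, c := range catalog {
		byLower[strings.ToLower(name)] = c
	}

	out := make([]column, 0, len(cols))
	for _, c := range cols {
		if cat, ok := byLower[strings.ToLower(c.Name)]; ok && cat.Type != "" {
			c.DataType = cat.Type
		}
		out = append(out, c)
	}
	// Sort so repeated runs over the same input produce the same slice.
	sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
	return out
}

func main() {
	cols := map[string]column{
		"id":   {Name: "id", DataType: "integer"},
		"name": {Name: "name"},
	}
	catalog := map[string]catalogColumn{
		"ID":   {Type: "NUMBER(38,0)"},
		"NAME": {Type: "VARCHAR"},
	}
	for _, c := range enrich(cols, catalog) {
		fmt.Printf("%s %s\n", c.Name, c.DataType)
	}
}
```

A stable order keeps downstream diffs of emitted records quiet when nothing in the dbt project has changed.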
+
+// resolveDepURN maps a manifest dependency ID to a URN.
+// Dependency IDs look like "model.project.name" or "source.project.src.table".
+func (e *Extractor) resolveDepURN(dep string) string {
+	parts := strings.SplitN(dep, ".", 2)
+	kind := parts[0] // "model", "source", "seed", etc.
+	return models.NewURN("dbt", e.UrnScope, kind, dep)
+}
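
Note on `resolveDepURN`: the mapping from dependency ID to URN can be tried in isolation with the sketch below. `newURN` is a hypothetical stand-in for `models.NewURN`; its `urn:<service>:<scope>:<kind>:<id>` layout is inferred from the sample URNs in the README and may not match the real helper in every detail.

```go
package main

import (
	"fmt"
	"strings"
)

// newURN is a hypothetical stand-in for models.NewURN. The layout is an
// assumption based on the README samples.
func newURN(service, scope, kind, id string) string {
	return fmt.Sprintf("urn:%s:%s:%s:%s", service, scope, kind, id)
}

// resolveDepURN mirrors the extractor's logic: the text before the first dot
// of the dependency ID becomes the URN kind, and the full ID is kept as-is.
func resolveDepURN(scope, dep string) string {
	kind, _, _ := strings.Cut(dep, ".")
	return newURN("dbt", scope, kind, dep)
}

func main() {
	deps := []string{
		"model.jaffle_shop.stg_customers",
		"source.jaffle_shop.raw.orders",
		"seed.jaffle_shop.raw_customers",
	}
	for _, dep := range deps {
		fmt.Println(resolveDepURN("my-project", dep))
	}
}
```

Because the kind is taken directly from the ID prefix, a dependency on a seed or snapshot yields a `derived_from` edge whose target has kind `seed` or `snapshot`, while `Extract` emits only `model` and `source` entities. Consumers may therefore see edges that point at URNs with no matching entity from this extractor.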
+
+func getOwner(meta map[string]any) string {
+	if meta == nil {
+		return ""
+	}
+	if owner, ok := meta["owner"].(string); ok {
+		return owner
+	}
+	return ""
+}
+
+func readJSON[T any](path string) (T, error) {
+	var v T
+	data, err := os.ReadFile(path)
+	if err != nil {
+		return v, err
+	}
+	if err := json.Unmarshal(data, &v); err != nil {
+		return v, err
+	}
+	return v, nil
+}
+
+func init() {
+	if err := registry.Extractors.Register("dbt", func() plugins.Extractor {
+		return New(plugins.GetLog())
+	}); err != nil {
+		panic(err)
+	}
+}
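
Note on `getOwner`: ownership is read only when `meta.owner` is a JSON string. The sketch below copies the function (minus the nil check, which is redundant because indexing a nil map in Go returns the zero value) and runs it against a few invented `meta` payloads to show which shapes produce an `owned_by` edge.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// getOwner returns meta["owner"] when it is a string, and "" otherwise.
// Indexing a nil map is safe in Go, so no explicit nil check is needed.
func getOwner(meta map[string]any) string {
	if owner, ok := meta["owner"].(string); ok {
		return owner
	}
	return ""
}

func main() {
	// Invented meta payloads for illustration.
	samples := []string{
		`{"owner": "analytics-team"}`,  // string: owner is used
		`{"owner": ["alice", "bob"]}`,  // list: decoded as []any, ignored
		`{"maintainer": "data-eng"}`,   // no owner key: ignored
	}
	for _, s := range samples {
		var meta map[string]any
		if err := json.Unmarshal([]byte(s), &meta); err != nil {
			panic(err)
		}
		fmt.Printf("%s -> %q\n", s, getOwner(meta))
	}
}
```

Projects that store owners as a list or a nested object in `meta` would get no `owned_by` edge from this logic and no warning either, which may be worth documenting or logging.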
