Skip to content

Commit 67677e0

Browse files
committed
feat: add markdown output format to console sink
Add a `format` config option to the console sink supporting `json` (default, backwards-compatible) and `markdown`. The markdown format renders metadata as scannable tables and bullet lists, making it more token-efficient for local AI tools like Claude Code.
1 parent fe8432e commit 67677e0

File tree

5 files changed

+428
-14
lines changed

5 files changed

+428
-14
lines changed

models/util.go

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package models
33
import (
44
"encoding/json"
55
"fmt"
6+
"sort"
7+
"strings"
68

79
meteorv1beta1 "github.com/raystack/meteor/models/raystack/meteor/v1beta1"
810
"google.golang.org/protobuf/encoding/protojson"
@@ -118,6 +120,206 @@ func RecordToJSON(r Record) ([]byte, error) {
118120
return json.Marshal(result)
119121
}
120122

123+
// RecordToMarkdown serializes a record (entity + edges) to Markdown.
124+
func RecordToMarkdown(r Record) ([]byte, error) {
125+
var b strings.Builder
126+
127+
e := r.Entity()
128+
b.WriteString("## ")
129+
b.WriteString(e.GetName())
130+
b.WriteString("\n\n")
131+
132+
// Metadata table.
133+
b.WriteString("| Field | Value |\n|---|---|\n")
134+
b.WriteString("| URN | `")
135+
b.WriteString(e.GetUrn())
136+
b.WriteString("` |\n")
137+
b.WriteString("| Type | ")
138+
b.WriteString(e.GetType())
139+
b.WriteString(" |\n")
140+
b.WriteString("| Source | ")
141+
b.WriteString(e.GetSource())
142+
b.WriteString(" |\n")
143+
if desc := e.GetDescription(); desc != "" {
144+
b.WriteString("| Description | ")
145+
b.WriteString(desc)
146+
b.WriteString(" |\n")
147+
}
148+
if ct := e.GetCreateTime(); ct != nil && ct.IsValid() {
149+
b.WriteString("| Created | ")
150+
b.WriteString(ct.AsTime().Format("2006-01-02T15:04:05Z"))
151+
b.WriteString(" |\n")
152+
}
153+
if ut := e.GetUpdateTime(); ut != nil && ut.IsValid() {
154+
b.WriteString("| Updated | ")
155+
b.WriteString(ut.AsTime().Format("2006-01-02T15:04:05Z"))
156+
b.WriteString(" |\n")
157+
}
158+
159+
// Properties.
160+
if props := e.GetProperties(); props != nil {
161+
m := props.AsMap()
162+
if len(m) > 0 {
163+
writePropertiesMarkdown(&b, m)
164+
}
165+
}
166+
167+
// Edges.
168+
if edges := r.Edges(); len(edges) > 0 {
169+
b.WriteString("\n### Edges\n\n")
170+
b.WriteString("| Type | Source URN | Target URN |\n|---|---|---|\n")
171+
for _, edge := range edges {
172+
b.WriteString("| ")
173+
b.WriteString(edge.GetType())
174+
b.WriteString(" | `")
175+
b.WriteString(edge.GetSourceUrn())
176+
b.WriteString("` | `")
177+
b.WriteString(edge.GetTargetUrn())
178+
b.WriteString("` |\n")
179+
}
180+
}
181+
182+
return []byte(b.String()), nil
183+
}
184+
185+
func writePropertiesMarkdown(b *strings.Builder, m map[string]any) {
186+
keys := sortedKeys(m)
187+
188+
// Split into scalar and list-of-maps properties.
189+
var scalarKeys []string
190+
var tableKeys []string
191+
for _, k := range keys {
192+
if items, ok := asSliceOfMaps(m[k]); ok && len(items) > 0 {
193+
_ = items
194+
tableKeys = append(tableKeys, k)
195+
} else {
196+
scalarKeys = append(scalarKeys, k)
197+
}
198+
}
199+
200+
if len(scalarKeys) > 0 {
201+
b.WriteString("\n### Properties\n\n")
202+
for _, k := range scalarKeys {
203+
writePropertyValue(b, k, m[k])
204+
}
205+
}
206+
207+
for _, k := range tableKeys {
208+
items, _ := asSliceOfMaps(m[k])
209+
writeMapSliceTable(b, k, items)
210+
}
211+
}
212+
213+
func writePropertyValue(b *strings.Builder, key string, val any) {
214+
switch v := val.(type) {
215+
case []any:
216+
b.WriteString("- **")
217+
b.WriteString(key)
218+
b.WriteString("**: ")
219+
strs := make([]string, 0, len(v))
220+
for _, item := range v {
221+
strs = append(strs, fmt.Sprintf("%v", item))
222+
}
223+
b.WriteString(strings.Join(strs, ", "))
224+
b.WriteString("\n")
225+
case map[string]any:
226+
b.WriteString("- **")
227+
b.WriteString(key)
228+
b.WriteString("**:\n")
229+
for _, sk := range sortedKeys(v) {
230+
b.WriteString(" - **")
231+
b.WriteString(sk)
232+
b.WriteString("**: ")
233+
b.WriteString(fmt.Sprintf("%v", v[sk]))
234+
b.WriteString("\n")
235+
}
236+
default:
237+
b.WriteString("- **")
238+
b.WriteString(key)
239+
b.WriteString("**: ")
240+
b.WriteString(fmt.Sprintf("%v", val))
241+
b.WriteString("\n")
242+
}
243+
}
244+
245+
func writeMapSliceTable(b *strings.Builder, title string, items []map[string]any) {
246+
// Collect all headers from the union of keys.
247+
headerSet := make(map[string]struct{})
248+
for _, item := range items {
249+
for k := range item {
250+
headerSet[k] = struct{}{}
251+
}
252+
}
253+
headers := sortedKeys(headerSet)
254+
255+
// Title case the section name.
256+
b.WriteString("\n### ")
257+
b.WriteString(titleCase(title))
258+
b.WriteString("\n\n")
259+
260+
// Header row.
261+
b.WriteString("|")
262+
for _, h := range headers {
263+
b.WriteString(" ")
264+
b.WriteString(titleCase(strings.ReplaceAll(h, "_", " ")))
265+
b.WriteString(" |")
266+
}
267+
b.WriteString("\n|")
268+
for range headers {
269+
b.WriteString("---|")
270+
}
271+
b.WriteString("\n")
272+
273+
// Data rows.
274+
for _, item := range items {
275+
b.WriteString("|")
276+
for _, h := range headers {
277+
b.WriteString(" ")
278+
if v, ok := item[h]; ok {
279+
b.WriteString(fmt.Sprintf("%v", v))
280+
}
281+
b.WriteString(" |")
282+
}
283+
b.WriteString("\n")
284+
}
285+
}
286+
287+
// asSliceOfMaps converts val to a slice of maps when val is a non-empty
// []any whose elements are all map[string]any. In every other case,
// including an empty slice, it returns (nil, false).
func asSliceOfMaps(val any) ([]map[string]any, bool) {
	elems, isSlice := val.([]any)
	if !isSlice {
		return nil, false
	}
	if len(elems) == 0 {
		return nil, false
	}

	rows := make([]map[string]any, len(elems))
	for i := range elems {
		row, isMap := elems[i].(map[string]any)
		if !isMap {
			// A single non-map element disqualifies the whole slice.
			return nil, false
		}
		rows[i] = row
	}
	return rows, true
}
303+
304+
// titleCase upper-cases the first character of every whitespace-separated
// word in s and joins the words with single spaces. The rest of each word is
// left unchanged. Leading, trailing and repeated whitespace is dropped
// because the words are obtained with strings.Fields.
func titleCase(s string) string {
	words := strings.Fields(s)
	for i, w := range words {
		// Locate the byte offset at which the second rune starts. Slicing a
		// fixed single byte would cut a multi-byte UTF-8 character in half
		// and corrupt the word.
		end := len(w)
		for offset := range w {
			if offset > 0 {
				end = offset
				break
			}
		}
		words[i] = strings.ToUpper(w[:end]) + w[end:]
	}
	return strings.Join(words, " ")
}
313+
314+
// sortedKeys returns the keys of m in ascending lexical order. The result is
// a freshly allocated, non-nil slice even when m is empty or nil.
func sortedKeys[M ~map[string]V, V any](m M) []string {
	names := make([]string, len(m))
	next := 0
	for name := range m {
		names[next] = name
		next++
	}
	sort.Strings(names)
	return names
}
322+
121323
// sanitizeMap recursively converts typed maps (e.g., map[string]string) to
122324
// map[string]interface{} so they are compatible with structpb.NewStruct.
123325
func sanitizeMap(m map[string]any) map[string]any {

models/util_test.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,3 +159,109 @@ func TestRecordToJSONWithMultipleEdges(t *testing.T) {
159159
assert.Contains(t, s, `"owned_by"`)
160160
assert.Contains(t, s, `"urn:user:bob@co.com"`)
161161
}
162+
163+
func TestRecordToMarkdown(t *testing.T) {
164+
t.Run("minimal entity without properties or edges", func(t *testing.T) {
165+
entity := models.NewEntity("urn:test:s:table:t1", "table", "t1", "test", nil)
166+
record := models.NewRecord(entity)
167+
168+
md, err := models.RecordToMarkdown(record)
169+
require.NoError(t, err)
170+
s := string(md)
171+
assert.Contains(t, s, "## t1")
172+
assert.Contains(t, s, "| URN | `urn:test:s:table:t1` |")
173+
assert.Contains(t, s, "| Type | table |")
174+
assert.Contains(t, s, "| Source | test |")
175+
assert.NotContains(t, s, "### Properties")
176+
assert.NotContains(t, s, "### Edges")
177+
})
178+
179+
t.Run("entity with flat properties", func(t *testing.T) {
180+
entity := models.NewEntity("urn:test:s:table:t1", "table", "t1", "test", map[string]any{
181+
"database": "analytics",
182+
"schema": "public",
183+
})
184+
record := models.NewRecord(entity)
185+
186+
md, err := models.RecordToMarkdown(record)
187+
require.NoError(t, err)
188+
s := string(md)
189+
assert.Contains(t, s, "### Properties")
190+
assert.Contains(t, s, "- **database**: analytics")
191+
assert.Contains(t, s, "- **schema**: public")
192+
})
193+
194+
t.Run("entity with list-of-maps properties rendered as table", func(t *testing.T) {
195+
entity := models.NewEntity("urn:test:s:table:t1", "table", "t1", "test", map[string]any{
196+
"columns": []any{
197+
map[string]any{"name": "id", "data_type": "integer"},
198+
map[string]any{"name": "email", "data_type": "varchar"},
199+
},
200+
})
201+
record := models.NewRecord(entity)
202+
203+
md, err := models.RecordToMarkdown(record)
204+
require.NoError(t, err)
205+
s := string(md)
206+
assert.Contains(t, s, "### Columns")
207+
assert.Contains(t, s, "| Data Type | Name |")
208+
assert.Contains(t, s, "| integer | id |")
209+
assert.Contains(t, s, "| varchar | email |")
210+
})
211+
212+
t.Run("entity with edges", func(t *testing.T) {
213+
entity := models.NewEntity("urn:test:s:table:t1", "table", "t1", "test", nil)
214+
edges := []*meteorv1beta1.Edge{
215+
models.OwnerEdge("urn:test:s:table:t1", "urn:user:alice", "test"),
216+
models.DerivedFromEdge("urn:test:s:table:t1", "urn:test:s:table:upstream", "test"),
217+
}
218+
record := models.NewRecord(entity, edges...)
219+
220+
md, err := models.RecordToMarkdown(record)
221+
require.NoError(t, err)
222+
s := string(md)
223+
assert.Contains(t, s, "### Edges")
224+
assert.Contains(t, s, "| owned_by | `urn:test:s:table:t1` | `urn:user:alice` |")
225+
assert.Contains(t, s, "| derived_from | `urn:test:s:table:t1` | `urn:test:s:table:upstream` |")
226+
})
227+
228+
t.Run("entity with description", func(t *testing.T) {
229+
entity := &meteorv1beta1.Entity{
230+
Urn: "urn:test:s:table:t1",
231+
Name: "t1",
232+
Type: "table",
233+
Source: "test",
234+
Description: "Event tracking table",
235+
}
236+
record := models.NewRecord(entity)
237+
238+
md, err := models.RecordToMarkdown(record)
239+
require.NoError(t, err)
240+
assert.Contains(t, string(md), "| Description | Event tracking table |")
241+
})
242+
243+
t.Run("entity with nested map properties", func(t *testing.T) {
244+
entity := models.NewEntity("urn:test:s:table:t1", "table", "t1", "test", map[string]any{
245+
"labels": map[string]string{"env": "production", "team": "data"},
246+
})
247+
record := models.NewRecord(entity)
248+
249+
md, err := models.RecordToMarkdown(record)
250+
require.NoError(t, err)
251+
s := string(md)
252+
assert.Contains(t, s, "- **labels**:")
253+
assert.Contains(t, s, " - **env**: production")
254+
assert.Contains(t, s, " - **team**: data")
255+
})
256+
257+
t.Run("entity with scalar list properties", func(t *testing.T) {
258+
entity := models.NewEntity("urn:test:s:table:t1", "table", "t1", "test", map[string]any{
259+
"tags": []string{"important", "verified"},
260+
})
261+
record := models.NewRecord(entity)
262+
263+
md, err := models.RecordToMarkdown(record)
264+
require.NoError(t, err)
265+
assert.Contains(t, string(md), "- **tags**: important, verified")
266+
})
267+
}

plugins/sinks/console/README.md

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,62 @@
11
# Console
22

3-
Print metadata records to standard output as JSON.
3+
Print metadata records to standard output.
44

55
## Usage
66

77
```yaml
88
sinks:
99
- name: console
10+
config:
11+
format: json
1012
```
1113
1214
## Configuration
1315
14-
No configuration is required.
16+
| Key | Description | Default | Required |
17+
|---|---|---|---|
18+
| `format` | Output format: `json` or `markdown` | `json` | No |
1519

16-
## Behavior
20+
## Formats
1721

18-
Each Record (Entity + Edges) is serialized as JSON and printed to stdout, one JSON object per line. Useful for debugging recipes and inspecting extractor output.
22+
### JSON (default)
23+
24+
Each Record is serialized as a single JSON object per line:
25+
26+
```json
27+
{"entity":{"urn":"urn:postgres:prod:table:public.users","type":"table","name":"users","source":"postgres"},"edges":[...]}
28+
```
29+
30+
### Markdown
31+
32+
Each Record is rendered as a readable Markdown document: tables for entity metadata, list-of-maps properties (such as columns), and edges, plus a bullet list for the remaining properties. Useful for piping into local AI tools like Claude Code:
33+
34+
```markdown
35+
## users
36+
37+
| Field | Value |
38+
|---|---|
39+
| URN | `urn:postgres:prod:table:public.users` |
40+
| Type | table |
41+
| Source | postgres |
42+
43+
### Properties
44+
45+
- **database**: mydb
46+
47+
### Columns
48+
49+
| Data Type | Name |
50+
|---|---|
51+
| integer | id |
52+
| varchar | email |
53+
54+
### Edges
55+
56+
| Type | Source URN | Target URN |
57+
|---|---|---|
58+
| owned_by | `urn:postgres:prod:table:public.users` | `urn:org:team:data-eng` |
59+
```
1960

2061
## Contributing
2162

0 commit comments

Comments
 (0)