diff --git a/plugins/extractors/confluence/README.md b/plugins/extractors/confluence/README.md new file mode 100644 index 00000000..57219323 --- /dev/null +++ b/plugins/extractors/confluence/README.md @@ -0,0 +1,78 @@ +# Confluence + +Extract page metadata and relationships from Confluence spaces using the Confluence REST API v2. + +## Usage + +```yaml +source: + name: confluence + scope: my-confluence + config: + base_url: https://mycompany.atlassian.net/wiki + username: user@company.com + token: your-api-token + spaces: + - ENG + - DATA + exclude: + - ARCHIVE +``` + +## Configuration + +| Key | Type | Required | Description | +| :-- | :--- | :------- | :---------- | +| `base_url` | `string` | Yes | Confluence base URL (e.g. `https://mycompany.atlassian.net/wiki`). | +| `username` | `string` | Yes | Atlassian account email for API authentication. | +| `token` | `string` | Yes | Atlassian API token. | +| `spaces` | `[]string` | No | Space keys to extract. Defaults to all spaces. | +| `exclude` | `[]string` | No | Space keys to exclude from extraction. | + +## Entities + +The extractor emits two entity types and their relationships as edges. 
+ +### Entity: `space` + +| Field | Sample Value | +| :---- | :----------- | +| `urn` | `urn:confluence:my-confluence:space:ENG` | +| `name` | `Engineering` | +| `description` | `Engineering team documentation` | +| `properties.space_key` | `ENG` | +| `properties.space_type` | `global` | +| `properties.status` | `current` | +| `properties.web_url` | `https://mycompany.atlassian.net/wiki/spaces/ENG` | + +### Entity: `document` + +| Field | Sample Value | +| :---- | :----------- | +| `urn` | `urn:confluence:my-confluence:document:12345` | +| `name` | `Data Pipeline Architecture` | +| `properties.page_id` | `12345` | +| `properties.space_key` | `ENG` | +| `properties.status` | `current` | +| `properties.version` | `5` | +| `properties.labels` | `["architecture", "data"]` | +| `properties.created_at` | `2024-01-15T10:30:00Z` | +| `properties.updated_at` | `2024-03-20T14:15:00Z` | +| `properties.web_url` | `https://mycompany.atlassian.net/wiki/spaces/ENG/pages/12345` | + +### Edges + +| Type | Source | Target | Description | +| :--- | :----- | :----- | :---------- | +| `belongs_to` | `document` | `space` | Page belongs to a space | +| `child_of` | `document` | `document` | Page is a child of another page | +| `owned_by` | `document` | `user` | Page is owned by its author | +| `documented_by` | `document` | any | Page references a data asset via URN in its content | + +### URN Reference Detection + +The extractor scans page content for URN patterns (`urn:service:scope:type:id`) and emits `documented_by` edges linking the page to referenced data assets. This enables connecting business documentation to technical metadata. + +## Contributing + +Refer to the [contribution guidelines](../../../docs/docs/contribute/guide.md#adding-a-new-extractor) for information on contributing to this module. 
diff --git a/plugins/extractors/confluence/client.go b/plugins/extractors/confluence/client.go new file mode 100644 index 00000000..b26e815c --- /dev/null +++ b/plugins/extractors/confluence/client.go @@ -0,0 +1,228 @@ +package confluence + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" +) + +// Page represents a Confluence page from the v2 API. +type Page struct { + ID string `json:"id"` + Title string `json:"title"` + Status string `json:"status"` + SpaceID string `json:"spaceId"` + ParentID string `json:"parentId"` + AuthorID string `json:"authorId"` + CreatedAt time.Time `json:"createdAt"` + Version struct { + Number int `json:"number"` + AuthorID string `json:"authorId"` + CreatedAt time.Time `json:"createdAt"` + } `json:"version"` + Body struct { + Storage struct { + Value string `json:"value"` + } `json:"storage"` + } `json:"body"` + Labels struct { + Results []Label `json:"results"` + } `json:"labels"` + Links struct { + WebUI string `json:"webui"` + } `json:"_links"` +} + +// Space represents a Confluence space. +type Space struct { + ID string `json:"id"` + Key string `json:"key"` + Name string `json:"name"` + Type string `json:"type"` + Status string `json:"status"` + Description struct { + Plain struct { + Value string `json:"value"` + } `json:"plain"` + } `json:"description"` + Links struct { + WebUI string `json:"webui"` + } `json:"_links"` +} + +// Label represents a Confluence label. +type Label struct { + ID string `json:"id"` + Name string `json:"name"` +} + +type pageResponse struct { + Results []Page `json:"results"` + Links struct { + Next string `json:"next"` + } `json:"_links"` +} + +type spaceResponse struct { + Results []Space `json:"results"` + Links struct { + Next string `json:"next"` + } `json:"_links"` +} + +// Client wraps the Confluence REST API v2. 
+type Client struct { + baseURL string + httpClient *http.Client + username string + token string +} + +// NewClient creates a new Confluence API client. +func NewClient(baseURL, username, token string) *Client { + return &Client{ + baseURL: strings.TrimRight(baseURL, "/"), + httpClient: &http.Client{Timeout: 30 * time.Second}, + username: username, + token: token, + } +} + +// GetSpaces returns all spaces, optionally filtered by keys. +func (c *Client) GetSpaces(ctx context.Context, keys []string) ([]Space, error) { + var all []Space + cursor := "" + for { + params := url.Values{} + params.Set("limit", "25") + if len(keys) > 0 { + params.Set("keys", strings.Join(keys, ",")) + } + if cursor != "" { + params.Set("cursor", cursor) + } + + var resp spaceResponse + if err := c.get(ctx, "/api/v2/spaces", params, &resp); err != nil { + return nil, fmt.Errorf("get spaces: %w", err) + } + all = append(all, resp.Results...) + + cursor = parseCursor(resp.Links.Next) + if cursor == "" { + break + } + } + return all, nil +} + +// GetPages returns all pages in a space. +func (c *Client) GetPages(ctx context.Context, spaceID string) ([]Page, error) { + var all []Page + cursor := "" + for { + params := url.Values{} + params.Set("space-id", spaceID) + params.Set("limit", "25") + params.Set("body-format", "storage") + if cursor != "" { + params.Set("cursor", cursor) + } + + var resp pageResponse + if err := c.get(ctx, "/api/v2/pages", params, &resp); err != nil { + return nil, fmt.Errorf("get pages for space %s: %w", spaceID, err) + } + all = append(all, resp.Results...) + + cursor = parseCursor(resp.Links.Next) + if cursor == "" { + break + } + } + return all, nil +} + +// GetPageLabels returns all labels for a page, handling pagination. 
+func (c *Client) GetPageLabels(ctx context.Context, pageID string) ([]Label, error) { + var all []Label + cursor := "" + for { + params := url.Values{} + params.Set("limit", "25") + if cursor != "" { + params.Set("cursor", cursor) + } + + var resp struct { + Results []Label `json:"results"` + Links struct { + Next string `json:"next"` + } `json:"_links"` + } + if err := c.get(ctx, "/api/v2/pages/"+pageID+"/labels", params, &resp); err != nil { + return nil, fmt.Errorf("get labels for page %s: %w", pageID, err) + } + all = append(all, resp.Results...) + + cursor = parseCursor(resp.Links.Next) + if cursor == "" { + break + } + } + return all, nil +} + +func (c *Client) get(ctx context.Context, path string, params url.Values, out any) error { + u := c.baseURL + path + if len(params) > 0 { + u += "?" + params.Encode() + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + req.SetBasicAuth(c.username, c.token) + req.Header.Set("Accept", "application/json") + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("execute request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("unexpected status %d: %s", resp.StatusCode, truncate(string(body), 200)) + } + + if err := json.NewDecoder(resp.Body).Decode(out); err != nil { + return fmt.Errorf("decode response: %w", err) + } + return nil +} + +// parseCursor extracts the cursor parameter from a next-link URL. +func parseCursor(nextLink string) string { + if nextLink == "" { + return "" + } + u, err := url.Parse(nextLink) + if err != nil { + return "" + } + return u.Query().Get("cursor") +} + +func truncate(s string, n int) string { + if len(s) <= n { + return s + } + return s[:n] + "..." 
+} diff --git a/plugins/extractors/confluence/confluence.go b/plugins/extractors/confluence/confluence.go new file mode 100644 index 00000000..192ad78c --- /dev/null +++ b/plugins/extractors/confluence/confluence.go @@ -0,0 +1,232 @@ +package confluence + +import ( + "context" + _ "embed" + "fmt" + "regexp" + "strings" + "time" + + "github.com/raystack/meteor/models" + meteorv1beta1 "github.com/raystack/meteor/models/raystack/meteor/v1beta1" + "github.com/raystack/meteor/plugins" + "github.com/raystack/meteor/registry" + log "github.com/raystack/salt/observability/logger" +) + +//go:embed README.md +var summary string + +type Config struct { + BaseURL string `json:"base_url" yaml:"base_url" mapstructure:"base_url" validate:"required"` + Username string `json:"username" yaml:"username" mapstructure:"username" validate:"required"` + Token string `json:"token" yaml:"token" mapstructure:"token" validate:"required"` + Spaces []string `json:"spaces" yaml:"spaces" mapstructure:"spaces"` + Exclude []string `json:"exclude" yaml:"exclude" mapstructure:"exclude"` +} + +var sampleConfig = ` +# Confluence base URL (required) +base_url: https://mycompany.atlassian.net/wiki +# Atlassian account email (required) +username: user@company.com +# Atlassian API token (required) +token: your-api-token +# Filter to specific space keys (optional, defaults to all spaces) +spaces: + - ENG + - DATA +# Exclude space keys (optional) +exclude: + - ARCHIVE` + +var info = plugins.Info{ + Description: "Extract page metadata and relationships from Confluence spaces.", + SampleConfig: sampleConfig, + Summary: summary, + Tags: []string{"confluence", "extractor"}, +} + +type Extractor struct { + plugins.BaseExtractor + logger log.Logger + config Config + client *Client + exclude map[string]bool +} + +func New(logger log.Logger) *Extractor { + e := &Extractor{logger: logger} + e.BaseExtractor = plugins.NewBaseExtractor(info, &e.config) + return e +} + +func (e *Extractor) Init(ctx context.Context, 
config plugins.Config) error {
	if err := e.BaseExtractor.Init(ctx, config); err != nil {
		return err
	}

	e.client = NewClient(e.config.BaseURL, e.config.Username, e.config.Token)

	// Pre-compute the exclusion set for O(1) lookups during Extract.
	e.exclude = make(map[string]bool, len(e.config.Exclude))
	for _, key := range e.config.Exclude {
		e.exclude[key] = true
	}

	return nil
}

// Extract emits one record per space plus one record per page in that
// space. A space whose pages fail to load is logged and skipped rather
// than aborting the whole run.
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
	spaces, err := e.client.GetSpaces(ctx, e.config.Spaces)
	if err != nil {
		// The client error already carries "get spaces" context; wrap
		// with this layer's context instead of repeating it.
		return fmt.Errorf("confluence extract: %w", err)
	}

	for _, space := range spaces {
		if e.exclude[space.Key] {
			continue
		}

		emit(e.buildSpaceRecord(space))

		if err := e.extractPages(ctx, emit, space); err != nil {
			e.logger.Warn("failed to extract pages from space, skipping",
				"space", space.Key, "error", err)
		}
	}

	return nil
}

// extractPages emits one document record per page in the space. A label
// fetch failure is non-fatal: the page is still emitted without labels.
func (e *Extractor) extractPages(ctx context.Context, emit plugins.Emit, space Space) error {
	pages, err := e.client.GetPages(ctx, space.ID)
	if err != nil {
		return err
	}

	spaceURN := models.NewURN("confluence", e.UrnScope, "space", space.Key)
	for _, page := range pages {
		labels, err := e.client.GetPageLabels(ctx, page.ID)
		if err != nil {
			e.logger.Warn("failed to get labels for page, skipping labels",
				"page_id", page.ID, "error", err)
		}
		emit(e.buildPageRecord(page, space, spaceURN, labels))
	}

	return nil
}

// buildSpaceRecord converts a Confluence space into a "space" entity
// record keyed by its space key.
func (e *Extractor) buildSpaceRecord(space Space) models.Record {
	urn := models.NewURN("confluence", e.UrnScope, "space", space.Key)

	props := map[string]any{
		"space_key":  space.Key,
		"space_type": space.Type,
		"status":     space.Status,
	}
	if space.Links.WebUI != "" {
		// WebUI links are relative to the configured base URL.
		props["web_url"] = e.config.BaseURL + space.Links.WebUI
	}

	entity := models.NewEntity(urn, "space", space.Name, "confluence", props)
	if desc := space.Description.Plain.Value; desc != "" {
		entity.Description = desc
	}

	return models.NewRecord(entity)
}

func (e *Extractor) buildPageRecord(page Page, space Space,
spaceURN string, labels []Label) models.Record { + urn := models.NewURN("confluence", e.UrnScope, "document", page.ID) + + labelNames := make([]string, 0, len(labels)) + for _, l := range labels { + labelNames = append(labelNames, l.Name) + } + + props := map[string]any{ + "page_id": page.ID, + "space_key": space.Key, + "status": page.Status, + "version": page.Version.Number, + "created_at": page.CreatedAt.UTC().Format(time.RFC3339), + "updated_at": page.Version.CreatedAt.UTC().Format(time.RFC3339), + } + if len(labelNames) > 0 { + props["labels"] = labelNames + } + if page.Links.WebUI != "" { + props["web_url"] = e.config.BaseURL + page.Links.WebUI + } + + entity := models.NewEntity(urn, "document", page.Title, "confluence", props) + + var edges []*meteorv1beta1.Edge + + // Page belongs to space. + edges = append(edges, &meteorv1beta1.Edge{ + SourceUrn: urn, + TargetUrn: spaceURN, + Type: "belongs_to", + Source: "confluence", + }) + + // Page hierarchy: child_of parent page. + if page.ParentID != "" { + parentURN := models.NewURN("confluence", e.UrnScope, "document", page.ParentID) + edges = append(edges, &meteorv1beta1.Edge{ + SourceUrn: urn, + TargetUrn: parentURN, + Type: "child_of", + Source: "confluence", + }) + } + + // Owner: page author. + if page.AuthorID != "" { + ownerURN := models.NewURN("confluence", e.UrnScope, "user", page.AuthorID) + edges = append(edges, models.OwnerEdge(urn, ownerURN, "confluence")) + } + + // Scan page body for URN references to link to data assets. + if body := page.Body.Storage.Value; body != "" { + for _, ref := range extractURNReferences(body) { + edges = append(edges, &meteorv1beta1.Edge{ + SourceUrn: urn, + TargetUrn: ref, + Type: "documented_by", + Source: "confluence", + }) + } + } + + return models.NewRecord(entity, edges...) +} + +// urnPattern matches URN references embedded in page content. 
+var urnPattern = regexp.MustCompile(`urn:[a-zA-Z0-9_-]+:[a-zA-Z0-9_.-]+:[a-zA-Z0-9_-]+:[a-zA-Z0-9_./-]+`) + +// extractURNReferences finds URN strings in HTML/storage-format content. +func extractURNReferences(body string) []string { + matches := urnPattern.FindAllString(body, -1) + seen := make(map[string]bool, len(matches)) + var unique []string + for _, m := range matches { + cleaned := strings.TrimRight(m, ".,;:!?\"')") + if !seen[cleaned] { + seen[cleaned] = true + unique = append(unique, cleaned) + } + } + return unique +} + +func init() { + if err := registry.Extractors.Register("confluence", func() plugins.Extractor { + return New(plugins.GetLog()) + }); err != nil { + panic(err) + } +} diff --git a/plugins/extractors/confluence/confluence_test.go b/plugins/extractors/confluence/confluence_test.go new file mode 100644 index 00000000..44d17ad8 --- /dev/null +++ b/plugins/extractors/confluence/confluence_test.go @@ -0,0 +1,344 @@ +//go:build plugins +// +build plugins + +package confluence_test + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/raystack/meteor/models" + meteorv1beta1 "github.com/raystack/meteor/models/raystack/meteor/v1beta1" + "github.com/raystack/meteor/plugins" + extractor "github.com/raystack/meteor/plugins/extractors/confluence" + "github.com/raystack/meteor/test/mocks" + testutils "github.com/raystack/meteor/test/utils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const urnScope = "test-confluence" + +func TestInit(t *testing.T) { + t.Run("should return error when base_url is missing", func(t *testing.T) { + err := extractor.New(testutils.Logger).Init(context.TODO(), plugins.Config{ + URNScope: urnScope, + RawConfig: map[string]any{ + "username": "user@test.com", + "token": "test-token", + }, + }) + assert.ErrorAs(t, err, &plugins.InvalidConfigError{}) + }) + + t.Run("should return error when token is missing", func(t *testing.T) { + err := 
extractor.New(testutils.Logger).Init(context.TODO(), plugins.Config{ + URNScope: urnScope, + RawConfig: map[string]any{ + "base_url": "https://test.atlassian.net/wiki", + "username": "user@test.com", + }, + }) + assert.ErrorAs(t, err, &plugins.InvalidConfigError{}) + }) + + t.Run("should succeed with valid config", func(t *testing.T) { + err := extractor.New(testutils.Logger).Init(context.TODO(), plugins.Config{ + URNScope: urnScope, + RawConfig: map[string]any{ + "base_url": "https://test.atlassian.net/wiki", + "username": "user@test.com", + "token": "test-token", + }, + }) + assert.NoError(t, err) + }) +} + +func TestExtract(t *testing.T) { + t.Run("should extract spaces and pages with edges", func(t *testing.T) { + server := newMockServer(t) + defer server.Close() + + extr := initExtractor(t, map[string]any{ + "base_url": server.URL, + "username": "user@test.com", + "token": "test-token", + }) + + emitter := mocks.NewEmitter() + err := extr.Extract(context.Background(), emitter.Push) + require.NoError(t, err) + + records := emitter.Get() + // 1 space + 2 pages = 3 records. + require.Len(t, records, 3) + + // Verify space entity. + spaceRecord := findByType(records, "space") + require.NotNil(t, spaceRecord) + spaceEntity := spaceRecord.Entity() + assert.Equal(t, "Engineering", spaceEntity.GetName()) + assert.Equal(t, "confluence", spaceEntity.GetSource()) + props := spaceEntity.GetProperties().AsMap() + assert.Equal(t, "ENG", props["space_key"]) + + // Verify page entity. + pageRecords := findAllByType(records, "document") + require.Len(t, pageRecords, 2) + + // Find the child page. + childPage := findByURNSuffix(records, "202") + require.NotNil(t, childPage) + childEntity := childPage.Entity() + assert.Equal(t, "Child Page", childEntity.GetName()) + + // Verify edges on child page: belongs_to, child_of, owned_by. 
+ edges := childPage.Edges() + assert.Len(t, edges, 3) + assert.NotNil(t, findEdge(edges, "belongs_to")) + assert.NotNil(t, findEdge(edges, "child_of")) + assert.NotNil(t, findEdge(edges, "owned_by")) + + childOfEdge := findEdge(edges, "child_of") + assert.Contains(t, childOfEdge.GetTargetUrn(), "201") + }) + + t.Run("should detect URN references in page content", func(t *testing.T) { + server := newMockServerWithURNs(t) + defer server.Close() + + extr := initExtractor(t, map[string]any{ + "base_url": server.URL, + "username": "user@test.com", + "token": "test-token", + }) + + emitter := mocks.NewEmitter() + err := extr.Extract(context.Background(), emitter.Push) + require.NoError(t, err) + + records := emitter.Get() + pageRecords := findAllByType(records, "document") + require.Len(t, pageRecords, 1) + + edges := pageRecords[0].Edges() + docEdge := findEdge(edges, "documented_by") + require.NotNil(t, docEdge) + assert.Equal(t, "urn:bigquery:prod:table:project.dataset.orders", docEdge.GetTargetUrn()) + }) + + t.Run("should exclude spaces in exclude list", func(t *testing.T) { + server := newMockServerMultiSpace(t) + defer server.Close() + + extr := initExtractor(t, map[string]any{ + "base_url": server.URL, + "username": "user@test.com", + "token": "test-token", + "exclude": []any{"ARCHIVE"}, + }) + + emitter := mocks.NewEmitter() + err := extr.Extract(context.Background(), emitter.Push) + require.NoError(t, err) + + records := emitter.Get() + spaceRecords := findAllByType(records, "space") + require.Len(t, spaceRecords, 1) + + props := spaceRecords[0].Entity().GetProperties().AsMap() + assert.Equal(t, "ENG", props["space_key"]) + }) +} + +// --- test helpers --- + +func initExtractor(t *testing.T, rawConfig map[string]any) *extractor.Extractor { + t.Helper() + extr := extractor.New(testutils.Logger) + err := extr.Init(context.Background(), plugins.Config{ + URNScope: urnScope, + RawConfig: rawConfig, + }) + require.NoError(t, err) + return extr +} + +func 
findByType(records []models.Record, typ string) *models.Record { + for i, r := range records { + if r.Entity().GetType() == typ { + return &records[i] + } + } + return nil +} + +func findAllByType(records []models.Record, typ string) []models.Record { + var out []models.Record + for _, r := range records { + if r.Entity().GetType() == typ { + out = append(out, r) + } + } + return out +} + +func findByURNSuffix(records []models.Record, suffix string) *models.Record { + for i, r := range records { + urn := r.Entity().GetUrn() + if len(urn) >= len(suffix) && urn[len(urn)-len(suffix):] == suffix { + return &records[i] + } + } + return nil +} + +func findEdge(edges []*meteorv1beta1.Edge, typ string) *meteorv1beta1.Edge { + for _, e := range edges { + if e.GetType() == typ { + return e + } + } + return nil +} + +func newMockServer(t *testing.T) *httptest.Server { + t.Helper() + mux := http.NewServeMux() + + mux.HandleFunc("/api/v2/spaces", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{ + { + "id": "100", + "key": "ENG", + "name": "Engineering", + "type": "global", + "status": "current", + "_links": map[string]any{"webui": "/spaces/ENG"}, + }, + }, + "_links": map[string]any{}, + }) + }) + + mux.HandleFunc("/api/v2/pages", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{ + { + "id": "201", + "title": "Architecture Overview", + "status": "current", + "spaceId": "100", + "parentId": "", + "authorId": "user-1", + "version": map[string]any{"number": 3, "authorId": "user-1", "createdAt": "2024-03-20T14:00:00Z"}, + "body": map[string]any{"storage": map[string]any{"value": "

Overview of architecture

"}}, + "_links": map[string]any{"webui": "/spaces/ENG/pages/201"}, + }, + { + "id": "202", + "title": "Child Page", + "status": "current", + "spaceId": "100", + "parentId": "201", + "authorId": "user-2", + "version": map[string]any{"number": 1, "authorId": "user-2", "createdAt": "2024-03-21T10:00:00Z"}, + "body": map[string]any{"storage": map[string]any{"value": "

Child content

"}}, + "_links": map[string]any{"webui": "/spaces/ENG/pages/202"}, + }, + }, + "_links": map[string]any{}, + }) + }) + + mux.HandleFunc("/api/v2/pages/201/labels", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{ + {"id": "1", "name": "architecture"}, + }, + }) + }) + + mux.HandleFunc("/api/v2/pages/202/labels", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{}, + }) + }) + + return httptest.NewServer(mux) +} + +func newMockServerWithURNs(t *testing.T) *httptest.Server { + t.Helper() + mux := http.NewServeMux() + + mux.HandleFunc("/api/v2/spaces", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{ + {"id": "100", "key": "DATA", "name": "Data", "type": "global", "status": "current", "_links": map[string]any{}}, + }, + "_links": map[string]any{}, + }) + }) + + mux.HandleFunc("/api/v2/pages", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{ + { + "id": "301", + "title": "Orders Pipeline", + "status": "current", + "spaceId": "100", + "parentId": "", + "authorId": "user-1", + "version": map[string]any{"number": 1, "authorId": "user-1", "createdAt": "2024-01-01T00:00:00Z"}, + "body": map[string]any{"storage": map[string]any{ + "value": "

This pipeline reads from urn:bigquery:prod:table:project.dataset.orders and writes results.

", + }}, + "_links": map[string]any{}, + }, + }, + "_links": map[string]any{}, + }) + }) + + mux.HandleFunc("/api/v2/pages/301/labels", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{"results": []map[string]any{}}) + }) + + return httptest.NewServer(mux) +} + +func newMockServerMultiSpace(t *testing.T) *httptest.Server { + t.Helper() + mux := http.NewServeMux() + + mux.HandleFunc("/api/v2/spaces", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{ + {"id": "100", "key": "ENG", "name": "Engineering", "type": "global", "status": "current", "_links": map[string]any{}}, + {"id": "200", "key": "ARCHIVE", "name": "Archive", "type": "global", "status": "current", "_links": map[string]any{}}, + }, + "_links": map[string]any{}, + }) + }) + + mux.HandleFunc("/api/v2/pages", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{}, + "_links": map[string]any{}, + }) + }) + + return httptest.NewServer(mux) +} + +func writeJSON(w http.ResponseWriter, v any) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(v) //nolint:errcheck +} diff --git a/plugins/extractors/populate.go b/plugins/extractors/populate.go index 7f55803a..606922de 100644 --- a/plugins/extractors/populate.go +++ b/plugins/extractors/populate.go @@ -6,6 +6,7 @@ import ( _ "github.com/raystack/meteor/plugins/extractors/bigtable" _ "github.com/raystack/meteor/plugins/extractors/cassandra" _ "github.com/raystack/meteor/plugins/extractors/clickhouse" + _ "github.com/raystack/meteor/plugins/extractors/confluence" _ "github.com/raystack/meteor/plugins/extractors/couchdb" _ "github.com/raystack/meteor/plugins/extractors/csv" _ "github.com/raystack/meteor/plugins/extractors/dbt" diff --git a/test/e2e/confluence_file/confluence_file_test.go b/test/e2e/confluence_file/confluence_file_test.go new file mode 100644 index 00000000..0f9b9c71 --- /dev/null 
+++ b/test/e2e/confluence_file/confluence_file_test.go @@ -0,0 +1,328 @@ +//go:build integration +// +build integration + +package confluence_file_test + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "os" + "testing" + + "github.com/raystack/meteor/cmd" + _ "github.com/raystack/meteor/plugins/extractors" + _ "github.com/raystack/meteor/plugins/processors" + _ "github.com/raystack/meteor/plugins/sinks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConfluenceToFile(t *testing.T) { + // Start a mock Confluence v2 API server. + mock := newMockConfluenceServer() + server := httptest.NewServer(mock) + defer server.Close() + + // Create a temp file for the file sink output. + outFile, err := os.CreateTemp("", "confluence-e2e-*.ndjson") + require.NoError(t, err) + outFile.Close() + defer os.Remove(outFile.Name()) + + // Write a temporary recipe. + recipeContent := fmt.Sprintf(` +name: confluence-to-file-e2e +version: v1beta1 +source: + name: confluence + scope: e2e-test + config: + base_url: %s + username: test@example.com + token: test-token + spaces: + - ENG +sinks: + - name: file + config: + path: %s + format: ndjson + overwrite: true +`, server.URL, outFile.Name()) + + recipeFile, err := os.CreateTemp("", "recipe-*.yml") + require.NoError(t, err) + defer os.Remove(recipeFile.Name()) + + _, err = recipeFile.WriteString(recipeContent) + require.NoError(t, err) + require.NoError(t, recipeFile.Close()) + + // Run meteor with the recipe. + command := cmd.RunCmd() + command.SetArgs([]string{recipeFile.Name()}) + err = command.Execute() + require.NoError(t, err) + + // Read and parse the output file. + data, err := os.ReadFile(outFile.Name()) + require.NoError(t, err) + + records := parseNDJSON(t, data) + + // Expect: 1 space + 3 pages = 4 records. + require.Len(t, records, 4, "expected 1 space + 3 pages") + + // Verify space record. 
+ spaceRec := findByEntityType(records, "space") + require.Len(t, spaceRec, 1) + spaceEntity := spaceRec[0]["entity"].(map[string]any) + assert.Equal(t, "Engineering", spaceEntity["name"]) + assert.Equal(t, "confluence", spaceEntity["source"]) + assert.Contains(t, spaceEntity["urn"], "space:ENG") + + spaceProps := spaceEntity["properties"].(map[string]any) + assert.Equal(t, "ENG", spaceProps["space_key"]) + assert.Equal(t, "global", spaceProps["space_type"]) + + // Verify document records. + docRecs := findByEntityType(records, "document") + require.Len(t, docRecs, 3) + + // Find specific pages by name. + archPage := findByName(docRecs, "Architecture Overview") + require.NotNil(t, archPage) + archEntity := archPage["entity"].(map[string]any) + assert.Contains(t, archEntity["urn"], "document:101") + + // Verify edges on Architecture Overview: belongs_to + owned_by (no parent). + archEdges := toEdges(archPage) + assert.Len(t, archEdges, 2) + assertHasEdgeType(t, archEdges, "belongs_to") + assertHasEdgeType(t, archEdges, "owned_by") + + // Verify child page has child_of edge. + childPage := findByName(docRecs, "Database Design") + require.NotNil(t, childPage) + childEdges := toEdges(childPage) + assertHasEdgeType(t, childEdges, "child_of") + assertHasEdgeType(t, childEdges, "belongs_to") + assertHasEdgeType(t, childEdges, "owned_by") + + // Verify URN-reference page has documented_by edge. + urnPage := findByName(docRecs, "Pipeline Docs") + require.NotNil(t, urnPage) + urnEdges := toEdges(urnPage) + assertHasEdgeType(t, urnEdges, "documented_by") + + docByEdge := findEdgeByType(urnEdges, "documented_by") + require.NotNil(t, docByEdge) + assert.Equal(t, "urn:bigquery:prod:table:project.dataset.orders", docByEdge["target_urn"]) + + // Verify labels on Architecture Overview. 
+ archProps := archEntity["properties"].(map[string]any) + labels, ok := archProps["labels"].([]any) + require.True(t, ok) + assert.Contains(t, labels, "architecture") + assert.Contains(t, labels, "design") + + // Print summary. + fmt.Printf("\n=== Confluence E2E Test Summary ===\n") + fmt.Printf("Total records: %d\n", len(records)) + fmt.Printf(" Spaces: %d\n", len(spaceRec)) + fmt.Printf(" Documents: %d\n", len(docRecs)) + for _, r := range records { + e := r["entity"].(map[string]any) + fmt.Printf(" - %s (type=%s, urn=%s)\n", e["name"], e["type"], e["urn"]) + } +} + +// --- Mock Confluence Server --- + +func newMockConfluenceServer() http.Handler { + mux := http.NewServeMux() + + mux.HandleFunc("/api/v2/spaces", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{ + { + "id": "10", + "key": "ENG", + "name": "Engineering", + "type": "global", + "status": "current", + "description": map[string]any{ + "plain": map[string]any{"value": "Engineering team docs"}, + }, + "_links": map[string]any{"webui": "/spaces/ENG"}, + }, + }, + "_links": map[string]any{}, + }) + }) + + mux.HandleFunc("/api/v2/pages", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{ + { + "id": "101", + "title": "Architecture Overview", + "status": "current", + "spaceId": "10", + "parentId": "", + "authorId": "user-a", + "createdAt": "2024-01-10T09:00:00Z", + "version": map[string]any{"number": 5, "authorId": "user-a", "createdAt": "2024-03-15T14:30:00Z"}, + "body": map[string]any{"storage": map[string]any{"value": "

High-level architecture overview.

"}}, + "_links": map[string]any{"webui": "/spaces/ENG/pages/101"}, + }, + { + "id": "102", + "title": "Database Design", + "status": "current", + "spaceId": "10", + "parentId": "101", + "authorId": "user-b", + "createdAt": "2024-02-01T10:00:00Z", + "version": map[string]any{"number": 2, "authorId": "user-b", "createdAt": "2024-03-20T11:00:00Z"}, + "body": map[string]any{"storage": map[string]any{"value": "

Database schema design decisions.

"}}, + "_links": map[string]any{"webui": "/spaces/ENG/pages/102"}, + }, + { + "id": "103", + "title": "Pipeline Docs", + "status": "current", + "spaceId": "10", + "parentId": "101", + "authorId": "user-a", + "createdAt": "2024-03-01T08:00:00Z", + "version": map[string]any{"number": 1, "authorId": "user-a", "createdAt": "2024-03-01T08:00:00Z"}, + "body": map[string]any{"storage": map[string]any{ + "value": "

This pipeline reads from urn:bigquery:prod:table:project.dataset.orders daily.

", + }}, + "_links": map[string]any{"webui": "/spaces/ENG/pages/103"}, + }, + }, + "_links": map[string]any{}, + }) + }) + + mux.HandleFunc("/api/v2/pages/101/labels", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{ + {"id": "1", "name": "architecture"}, + {"id": "2", "name": "design"}, + }, + }) + }) + + mux.HandleFunc("/api/v2/pages/102/labels", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{ + {"id": "3", "name": "database"}, + }, + }) + }) + + mux.HandleFunc("/api/v2/pages/103/labels", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, map[string]any{ + "results": []map[string]any{}, + }) + }) + + return mux +} + +func writeJSON(w http.ResponseWriter, v any) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(v) //nolint:errcheck +} + +// --- Assertion helpers --- + +func parseNDJSON(t *testing.T, data []byte) []map[string]any { + t.Helper() + var records []map[string]any + for _, line := range splitLines(data) { + if len(line) == 0 { + continue + } + var rec map[string]any + require.NoError(t, json.Unmarshal(line, &rec)) + records = append(records, rec) + } + return records +} + +func splitLines(data []byte) [][]byte { + var lines [][]byte + start := 0 + for i, b := range data { + if b == '\n' { + lines = append(lines, data[start:i]) + start = i + 1 + } + } + if start < len(data) { + lines = append(lines, data[start:]) + } + return lines +} + +func findByEntityType(records []map[string]any, typ string) []map[string]any { + var out []map[string]any + for _, r := range records { + if e, ok := r["entity"].(map[string]any); ok { + if e["type"] == typ { + out = append(out, r) + } + } + } + return out +} + +func findByName(records []map[string]any, name string) map[string]any { + for _, r := range records { + if e, ok := r["entity"].(map[string]any); ok { + if e["name"] == name { + return r + } + } + } + 
return nil +} + +func toEdges(record map[string]any) []map[string]any { + edgesRaw, ok := record["edges"].([]any) + if !ok { + return nil + } + var edges []map[string]any + for _, e := range edgesRaw { + if m, ok := e.(map[string]any); ok { + edges = append(edges, m) + } + } + return edges +} + +func findEdgeByType(edges []map[string]any, typ string) map[string]any { + for _, e := range edges { + if e["type"] == typ { + return e + } + } + return nil +} + +func assertHasEdgeType(t *testing.T, edges []map[string]any, typ string) { + t.Helper() + for _, e := range edges { + if e["type"] == typ { + return + } + } + t.Errorf("expected edge of type %q, found none in %d edges", typ, len(edges)) +}