Skip to content

Commit fe96cee

Browse files
committed
feat: add pagerduty, opsgenie, and kubernetes extractors
- Add pagerduty extractor for service and incident metadata from PagerDuty. Emits service and incident entities with owned_by and belongs_to edges. Supports configurable incident lookback window. - Add opsgenie extractor for service and incident metadata from OpsGenie. Supports EU instances via base_url config. - Add kubernetes extractor for cluster infrastructure topology. Extracts namespaces, deployments, services, and jobs with belongs_to edges. Uses raw HTTP against the K8s API with kubeconfig or in-cluster auth — no k8s.io/client-go dependency. All three use raw net/http clients with no new dependencies. Closes #505, closes #506.
1 parent 04fdea0 commit fe96cee

File tree

10 files changed

+1569
-0
lines changed

10 files changed

+1569
-0
lines changed
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
# kubernetes
2+
3+
Extract infrastructure topology metadata from a Kubernetes cluster.
4+
5+
## Usage
6+
7+
```yaml
8+
source:
9+
name: kubernetes
10+
scope: my-cluster
11+
config:
12+
kubeconfig: ~/.kube/config
13+
namespaces:
14+
- default
15+
- production
16+
extract:
17+
- namespaces
18+
- deployments
19+
- services
20+
- jobs
21+
exclude:
22+
- kube-system
23+
- kube-public
24+
```
25+
26+
## Config
27+
28+
| Key | Type | Required | Default | Description |
29+
| :-- | :--- | :------- | :------ | :---------- |
30+
| `kubeconfig` | `string` | No | Auto-detected | Path to kubeconfig file. Falls back to `KUBECONFIG` env, in-cluster config, then `~/.kube/config`. |
31+
| `namespaces` | `[]string` | No | All namespaces | Namespaces to extract from. |
32+
| `extract` | `[]string` | No | `["namespaces", "deployments", "services", "jobs"]` | Resource types to extract. Pods excluded by default. |
33+
| `exclude` | `[]string` | No | | Namespaces to exclude from extraction. |
34+
35+
## Entities
36+
37+
### `namespace`
38+
39+
Kubernetes namespace.
40+
41+
| Property | Description |
42+
| :------- | :---------- |
43+
| `labels` | Namespace labels |
44+
| `status` | Namespace phase (Active, Terminating) |
45+
| `created_at` | Creation timestamp |
46+
47+
### `deployment`
48+
49+
Kubernetes deployment.
50+
51+
| Property | Description |
52+
| :------- | :---------- |
53+
| `namespace` | Namespace name |
54+
| `replicas` | Desired replica count |
55+
| `ready_replicas` | Ready replica count |
56+
| `strategy` | Deployment strategy (RollingUpdate, Recreate) |
57+
| `containers` | List of container name/image pairs |
58+
| `labels` | Deployment labels |
59+
| `created_at` | Creation timestamp |
60+
61+
### `service`
62+
63+
Kubernetes service.
64+
65+
| Property | Description |
66+
| :------- | :---------- |
67+
| `namespace` | Namespace name |
68+
| `type` | Service type (ClusterIP, NodePort, LoadBalancer) |
69+
| `cluster_ip` | Cluster IP address |
70+
| `ports` | List of port configurations |
71+
| `selector` | Label selector |
72+
| `labels` | Service labels |
73+
| `created_at` | Creation timestamp |
74+
75+
### `job`
76+
77+
Kubernetes job.
78+
79+
| Property | Description |
80+
| :------- | :---------- |
81+
| `namespace` | Namespace name |
82+
| `completions` | Desired completions |
83+
| `active` | Active pod count |
84+
| `succeeded` | Succeeded pod count |
85+
| `failed` | Failed pod count |
86+
| `labels` | Job labels |
87+
| `created_at` | Creation timestamp |
88+
89+
## Edges
90+
91+
| From | To | Type | Description |
92+
| :--- | :- | :--- | :---------- |
93+
| `deployment` | `namespace` | `belongs_to` | Deployment belongs to a namespace |
94+
| `service` | `namespace` | `belongs_to` | Service belongs to a namespace |
95+
| `job` | `namespace` | `belongs_to` | Job belongs to a namespace |
96+
97+
## Authentication
98+
99+
The extractor supports three authentication methods, tried in order:
100+
101+
1. **Explicit kubeconfig** — set `kubeconfig` in config
102+
2. **In-cluster** — uses service account token at `/var/run/secrets/kubernetes.io/serviceaccount/token`
103+
3. **Default kubeconfig** — `~/.kube/config`
104+
105+
Both bearer token and client certificate authentication from kubeconfig are supported.
106+
107+
## Contribute
108+
109+
Refer to the [contribution guidelines](../../../docs/contribute/guide.mdx#adding-a-new-extractor) for information on contributing to this module.
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
package kubernetes
2+
3+
import (
4+
"context"
5+
"crypto/tls"
6+
"crypto/x509"
7+
"encoding/base64"
8+
"encoding/json"
9+
"fmt"
10+
"io"
11+
"net/http"
12+
"os"
13+
"path/filepath"
14+
"time"
15+
16+
"gopkg.in/yaml.v3"
17+
)
18+
19+
// Client talks to the Kubernetes REST API.
20+
type Client struct {
21+
baseURL string
22+
httpClient *http.Client
23+
token string
24+
}
25+
26+
// NewClient creates a client from kubeconfig or in-cluster config.
27+
func NewClient(kubeconfigPath string) (*Client, error) {
28+
if kubeconfigPath != "" {
29+
return fromKubeconfig(kubeconfigPath)
30+
}
31+
32+
// Try KUBECONFIG env.
33+
if p := os.Getenv("KUBECONFIG"); p != "" {
34+
return fromKubeconfig(p)
35+
}
36+
37+
// Try in-cluster.
38+
if c, err := fromInCluster(); err == nil {
39+
return c, nil
40+
}
41+
42+
// Try default path.
43+
home, err := os.UserHomeDir()
44+
if err != nil {
45+
return nil, fmt.Errorf("cannot determine home directory: %w", err)
46+
}
47+
return fromKubeconfig(filepath.Join(home, ".kube", "config"))
48+
}
49+
50+
func fromInCluster() (*Client, error) {
51+
const (
52+
tokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
53+
caPath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
54+
host = "https://kubernetes.default.svc"
55+
)
56+
57+
tokenBytes, err := os.ReadFile(tokenPath)
58+
if err != nil {
59+
return nil, fmt.Errorf("read service account token: %w", err)
60+
}
61+
62+
tlsCfg := &tls.Config{MinVersion: tls.VersionTLS12}
63+
if caBytes, err := os.ReadFile(caPath); err == nil {
64+
pool := x509.NewCertPool()
65+
pool.AppendCertsFromPEM(caBytes)
66+
tlsCfg.RootCAs = pool
67+
}
68+
69+
return &Client{
70+
baseURL: host,
71+
token: string(tokenBytes),
72+
httpClient: &http.Client{
73+
Timeout: 30 * time.Second,
74+
Transport: &http.Transport{TLSClientConfig: tlsCfg},
75+
},
76+
}, nil
77+
}
78+
79+
// kubeconfig YAML structures (minimal).
80+
type kubeconfig struct {
81+
Clusters []kcCluster `yaml:"clusters"`
82+
Users []kcUser `yaml:"users"`
83+
Contexts []kcContext `yaml:"contexts"`
84+
CurrentContext string `yaml:"current-context"`
85+
}
86+
type kcCluster struct {
87+
Name string `yaml:"name"`
88+
Cluster struct {
89+
Server string `yaml:"server"`
90+
CertificateAuthorityData string `yaml:"certificate-authority-data"`
91+
InsecureSkipTLSVerify bool `yaml:"insecure-skip-tls-verify"`
92+
} `yaml:"cluster"`
93+
}
94+
type kcUser struct {
95+
Name string `yaml:"name"`
96+
User struct {
97+
Token string `yaml:"token"`
98+
ClientCertificateData string `yaml:"client-certificate-data"`
99+
ClientKeyData string `yaml:"client-key-data"`
100+
} `yaml:"user"`
101+
}
102+
type kcContext struct {
103+
Name string `yaml:"name"`
104+
Context struct {
105+
Cluster string `yaml:"cluster"`
106+
User string `yaml:"user"`
107+
} `yaml:"context"`
108+
}
109+
110+
func fromKubeconfig(path string) (*Client, error) {
111+
data, err := os.ReadFile(path)
112+
if err != nil {
113+
return nil, fmt.Errorf("read kubeconfig %s: %w", path, err)
114+
}
115+
116+
var kc kubeconfig
117+
if err := yaml.Unmarshal(data, &kc); err != nil {
118+
return nil, fmt.Errorf("parse kubeconfig: %w", err)
119+
}
120+
121+
// Resolve current context.
122+
var ctxCluster, ctxUser string
123+
for _, c := range kc.Contexts {
124+
if c.Name == kc.CurrentContext {
125+
ctxCluster = c.Context.Cluster
126+
ctxUser = c.Context.User
127+
break
128+
}
129+
}
130+
if ctxCluster == "" {
131+
return nil, fmt.Errorf("current-context %q not found in kubeconfig", kc.CurrentContext)
132+
}
133+
134+
var server string
135+
tlsCfg := &tls.Config{MinVersion: tls.VersionTLS12}
136+
137+
for _, cl := range kc.Clusters {
138+
if cl.Name == ctxCluster {
139+
server = cl.Cluster.Server
140+
if cl.Cluster.InsecureSkipTLSVerify {
141+
tlsCfg.InsecureSkipVerify = true //nolint:gosec // user-configured
142+
}
143+
if ca := cl.Cluster.CertificateAuthorityData; ca != "" {
144+
caBytes, err := base64.StdEncoding.DecodeString(ca)
145+
if err == nil {
146+
pool := x509.NewCertPool()
147+
pool.AppendCertsFromPEM(caBytes)
148+
tlsCfg.RootCAs = pool
149+
}
150+
}
151+
break
152+
}
153+
}
154+
if server == "" {
155+
return nil, fmt.Errorf("cluster %q not found in kubeconfig", ctxCluster)
156+
}
157+
158+
var token string
159+
for _, u := range kc.Users {
160+
if u.Name == ctxUser {
161+
token = u.User.Token
162+
if cert := u.User.ClientCertificateData; cert != "" {
163+
if key := u.User.ClientKeyData; key != "" {
164+
certBytes, _ := base64.StdEncoding.DecodeString(cert)
165+
keyBytes, _ := base64.StdEncoding.DecodeString(key)
166+
tlsCert, err := tls.X509KeyPair(certBytes, keyBytes)
167+
if err == nil {
168+
tlsCfg.Certificates = []tls.Certificate{tlsCert}
169+
}
170+
}
171+
}
172+
break
173+
}
174+
}
175+
176+
return &Client{
177+
baseURL: server,
178+
token: token,
179+
httpClient: &http.Client{
180+
Timeout: 30 * time.Second,
181+
Transport: &http.Transport{TLSClientConfig: tlsCfg},
182+
},
183+
}, nil
184+
}
185+
186+
func (c *Client) get(ctx context.Context, path string) (map[string]any, error) {
187+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+path, nil)
188+
if err != nil {
189+
return nil, fmt.Errorf("create request: %w", err)
190+
}
191+
if c.token != "" {
192+
req.Header.Set("Authorization", "Bearer "+c.token)
193+
}
194+
req.Header.Set("Accept", "application/json")
195+
196+
resp, err := c.httpClient.Do(req)
197+
if err != nil {
198+
return nil, fmt.Errorf("execute request: %w", err)
199+
}
200+
defer resp.Body.Close()
201+
202+
if resp.StatusCode != http.StatusOK {
203+
body, _ := io.ReadAll(resp.Body)
204+
return nil, fmt.Errorf("unexpected status %d: %s", resp.StatusCode, truncate(string(body), 200))
205+
}
206+
207+
var result map[string]any
208+
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
209+
return nil, fmt.Errorf("decode response: %w", err)
210+
}
211+
return result, nil
212+
}
213+
214+
func (c *Client) ListNamespaces(ctx context.Context) (map[string]any, error) {
215+
return c.get(ctx, "/api/v1/namespaces")
216+
}
217+
218+
func (c *Client) ListDeployments(ctx context.Context, namespace string) (map[string]any, error) {
219+
return c.get(ctx, "/apis/apps/v1/namespaces/"+namespace+"/deployments")
220+
}
221+
222+
func (c *Client) ListServices(ctx context.Context, namespace string) (map[string]any, error) {
223+
return c.get(ctx, "/api/v1/namespaces/"+namespace+"/services")
224+
}
225+
226+
func (c *Client) ListPods(ctx context.Context, namespace string) (map[string]any, error) {
227+
return c.get(ctx, "/api/v1/namespaces/"+namespace+"/pods")
228+
}
229+
230+
func (c *Client) ListJobs(ctx context.Context, namespace string) (map[string]any, error) {
231+
return c.get(ctx, "/apis/batch/v1/namespaces/"+namespace+"/jobs")
232+
}
233+
234+
func truncate(s string, n int) string {
235+
if len(s) <= n {
236+
return s
237+
}
238+
return s[:n] + "..."
239+
}

0 commit comments

Comments
 (0)