Skip to content

Commit 2943889

Browse files
committed
Add ability to pipe the agent to a custom integration for data mapping
1 parent f194608 commit 2943889

2 files changed

Lines changed: 53 additions & 11 deletions

File tree

cmd/root.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cmd
33
import (
44
"context"
55
"fmt"
6+
"github.com/go-resty/resty/v2"
67
"os"
78
"runtime"
89
"strings"
@@ -51,7 +52,7 @@ var rootCmd = &cobra.Command{
5152
var wg sync.WaitGroup
5253
ctx := signal.Init(context.Background())
5354
// go workers.NewWebhookWorker().Run(ctx, &wg)
54-
go workers.NewK8SWorker(ctx, &wg, cluster, integration, configuration.Selectors, newClient(), resync, flush)
55+
go workers.NewK8SWorker(ctx, &wg, cluster, integration, configuration.Selectors, newClient(), newRESTClient(), resync, flush)
5556

5657
time.Sleep(1 * time.Second)
5758
wg.Wait()
@@ -162,6 +163,15 @@ func LoadConfig() (*config.Configuration, error) {
162163
return &output, nil
163164
}
164165

166+
func newRESTClient() *resty.Client {
167+
client := opslevel.NewRestClient(
168+
opslevel.SetURL(viper.GetString("api-url")),
169+
opslevel.SetUserAgentExtra(fmt.Sprintf("agent-%s", _version)),
170+
opslevel.SetTimeout(time.Second*time.Duration(viper.GetInt("api-timeout"))),
171+
)
172+
return client
173+
}
174+
165175
func newClient() *opslevel.Client {
166176
client := opslevel.NewGQLClient(
167177
opslevel.SetAPIToken(viper.GetString("api-token")),

workers/k8s.go

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ package workers
33
import (
44
"context"
55
"encoding/json"
6+
"fmt"
7+
"github.com/go-resty/resty/v2"
8+
"strings"
69
"sync"
710
"time"
811

@@ -18,30 +21,39 @@ type K8SWorker struct {
1821
cluster string
1922
integration string
2023
client *opslevel.Client
24+
rest *resty.Client
2125
controller *controller.Controller
2226
}
2327

24-
func NewK8SWorker(ctx context.Context, wg *sync.WaitGroup, cluster string, integration string, selectors []controller.Selector, client *opslevel.Client, resync, flush time.Duration) {
28+
func NewK8SWorker(ctx context.Context, wg *sync.WaitGroup, cluster string, integration string, selectors []controller.Selector, client *opslevel.Client, rest *resty.Client, resync, flush time.Duration) {
2529
controller.Run(ctx, wg, selectors, resync, flush, &K8SWorker{
26-
client: client,
2730
cluster: cluster,
2831
integration: integration,
32+
client: client,
33+
rest: rest,
2934
})
3035
}
3136

3237
func (s *K8SWorker) Handle(evt controller.Event) {
3338
kind := evt.ExternalKind()
3439
id := evt.ExternalID(s.cluster)
3540

36-
switch evt.Op {
37-
case controller.OpCreate, controller.OpUpdate:
38-
value, err := s.parse(evt.New)
39-
if err != nil {
40-
log.Error().Err(err).Msgf("failed to convert k8s resource")
41+
if strings.Contains(s.integration, "integrations/custom/webhook") {
42+
switch evt.Op {
43+
case controller.OpCreate, controller.OpUpdate:
44+
s.sendEvent(kind, id, evt.New)
45+
}
46+
} else {
47+
switch evt.Op {
48+
case controller.OpCreate, controller.OpUpdate:
49+
value, err := s.parse(evt.New)
50+
if err != nil {
51+
log.Error().Err(err).Msgf("failed to convert k8s resource")
52+
}
53+
s.sendUpsert(kind, id, value)
54+
case controller.OpDelete:
55+
s.sendDelete(kind, id)
4156
}
42-
s.sendUpsert(kind, id, value)
43-
case controller.OpDelete:
44-
s.sendDelete(kind, id)
4557
}
4658
}
4759

@@ -62,6 +74,26 @@ func (s *K8SWorker) parse(item *unstructured.Unstructured) (opslevel.JSON, error
6274
return data, nil
6375
}
6476

77+
func (s *K8SWorker) sendEvent(kind string, id string, value *unstructured.Unstructured) {
78+
kind = strings.Replace(kind, "/", "_", -1)
79+
if viper.GetBool("dry-run") {
80+
log.Info().Msgf("[DRYRUN] POST %s | %s", kind, id)
81+
log.Debug().Msgf("\t%#v", value)
82+
} else {
83+
url := fmt.Sprintf("%s?external_kind=%s", s.integration, kind)
84+
resp, err := s.rest.R().SetBody(value).Post(url)
85+
if err != nil {
86+
log.Error().Err(err).Msgf("error during post")
87+
return
88+
}
89+
if resp.StatusCode() > 299 {
90+
log.Error().Msgf("%v", resp)
91+
return
92+
}
93+
log.Info().Msgf("POST %s | %s", kind, id)
94+
}
95+
}
96+
6597
func (s *K8SWorker) sendUpsert(kind string, id string, value opslevel.JSON) {
6698
var m struct {
6799
Payload struct {

0 commit comments

Comments
 (0)