Skip to content

Commit bd6a4fc

Browse files
authored
Merge pull request #20 from OpsLevel/kr/custom-data-mapping
Add ability to pipe the agent to a custom integration for data mapping
2 parents 924c42b + f3fcf70 commit bd6a4fc

3 files changed

Lines changed: 60 additions & 20 deletions

File tree

cmd/root.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@ import (
99
"sync"
1010
"time"
1111

12-
"gopkg.in/yaml.v3"
13-
"k8s.io/utils/path"
14-
"opslevel-agent/config"
15-
12+
"github.com/go-resty/resty/v2"
1613
"github.com/opslevel/opslevel-go/v2025"
1714
"github.com/rs/zerolog"
1815
"github.com/rs/zerolog/log"
1916
"github.com/spf13/cobra"
2017
"github.com/spf13/viper"
2118
"go.uber.org/automaxprocs/maxprocs"
19+
"gopkg.in/yaml.v3"
20+
"k8s.io/utils/path"
21+
"opslevel-agent/config"
2222
"opslevel-agent/signal"
2323
"opslevel-agent/workers"
2424
)
@@ -54,7 +54,7 @@ opslevel-agent commit: %s (%s)
5454
var wg sync.WaitGroup
5555
ctx := signal.Init(context.Background())
5656
// go workers.NewWebhookWorker().Run(ctx, &wg)
57-
go workers.NewK8SWorker(ctx, &wg, cluster, integration, configuration.Selectors, newClient(), resync, flush)
57+
go workers.NewK8SWorker(ctx, &wg, cluster, integration, configuration.Selectors, newGraphClient(), newRESTClient(), resync, flush)
5858

5959
time.Sleep(1 * time.Second)
6060
wg.Wait()
@@ -165,7 +165,14 @@ func LoadConfig() (*config.Configuration, error) {
165165
return &output, nil
166166
}
167167

168-
func newClient() *opslevel.Client {
168+
func newRESTClient() *resty.Client {
169+
return opslevel.NewRestClient(
170+
opslevel.SetUserAgentExtra(fmt.Sprintf("agent-%s", _version)),
171+
opslevel.SetTimeout(time.Second*time.Duration(viper.GetInt("api-timeout"))),
172+
)
173+
}
174+
175+
func newGraphClient() *opslevel.Client {
169176
client := opslevel.NewGQLClient(
170177
opslevel.SetAPIToken(viper.GetString("api-token")),
171178
opslevel.SetURL(viper.GetString("api-url")),

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.24.0
55
toolchain go1.24.2
66

77
require (
8+
github.com/go-resty/resty/v2 v2.16.5
89
github.com/opslevel/opslevel-go/v2025 v2025.6.13
910
github.com/rs/zerolog v1.34.0
1011
github.com/spf13/cobra v1.9.1
@@ -31,7 +32,6 @@ require (
3132
github.com/go-playground/locales v0.14.1 // indirect
3233
github.com/go-playground/universal-translator v0.18.1 // indirect
3334
github.com/go-playground/validator/v10 v10.26.0 // indirect
34-
github.com/go-resty/resty/v2 v2.16.5 // indirect
3535
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
3636
github.com/gogo/protobuf v1.3.2 // indirect
3737
github.com/google/gnostic-models v0.6.9 // indirect

workers/k8s.go

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@ package workers
33
import (
44
"context"
55
"encoding/json"
6+
"fmt"
7+
"strings"
68
"sync"
79
"time"
810

11+
"github.com/go-resty/resty/v2"
12+
913
"github.com/spf13/viper"
1014

1115
"github.com/opslevel/opslevel-go/v2025"
@@ -17,30 +21,39 @@ import (
1721
type K8SWorker struct {
1822
cluster string
1923
integration string
20-
client *opslevel.Client
24+
gqlClient *opslevel.Client
25+
restClient *resty.Client
2126
}
2227

23-
func NewK8SWorker(ctx context.Context, wg *sync.WaitGroup, cluster string, integration string, selectors []controller.Selector, client *opslevel.Client, resync, flush time.Duration) {
28+
func NewK8SWorker(ctx context.Context, wg *sync.WaitGroup, cluster string, integration string, selectors []controller.Selector, gqlClient *opslevel.Client, restClient *resty.Client, resync, flush time.Duration) {
2429
controller.Run(ctx, wg, selectors, resync, flush, &K8SWorker{
25-
client: client,
2630
cluster: cluster,
2731
integration: integration,
32+
gqlClient: gqlClient,
33+
restClient: restClient,
2834
})
2935
}
3036

3137
func (s *K8SWorker) Handle(evt controller.Event) {
3238
kind := evt.ExternalKind()
3339
id := evt.ExternalID(s.cluster)
3440

35-
switch evt.Op {
36-
case controller.OpCreate, controller.OpUpdate:
37-
value, err := s.parse(evt.New)
38-
if err != nil {
39-
log.Error().Err(err).Msgf("failed to convert k8s resource")
41+
if strings.Contains(s.integration, "integrations/custom/webhook") {
42+
switch evt.Op {
43+
case controller.OpCreate, controller.OpUpdate:
44+
s.sendEvent(kind, id, evt.New)
45+
}
46+
} else {
47+
switch evt.Op {
48+
case controller.OpCreate, controller.OpUpdate:
49+
value, err := s.parse(evt.New)
50+
if err != nil {
51+
log.Error().Err(err).Msgf("failed to convert k8s resource")
52+
}
53+
s.sendUpsert(kind, id, value)
54+
case controller.OpDelete:
55+
s.sendDelete(kind, id)
4056
}
41-
s.sendUpsert(kind, id, value)
42-
case controller.OpDelete:
43-
s.sendDelete(kind, id)
4457
}
4558
}
4659

@@ -61,6 +74,26 @@ func (s *K8SWorker) parse(item *unstructured.Unstructured) (opslevel.JSON, error
6174
return data, nil
6275
}
6376

77+
func (s *K8SWorker) sendEvent(kind string, id string, value *unstructured.Unstructured) {
78+
kind = strings.Replace(kind, "/", "_", -1)
79+
if viper.GetBool("dry-run") {
80+
log.Info().Msgf("[DRYRUN] POST %s | %s", kind, id)
81+
log.Debug().Msgf("\t%#v", value)
82+
} else {
83+
url := fmt.Sprintf("%s?external_kind=%s", s.integration, kind)
84+
resp, err := s.restClient.R().SetBody(value).Post(url)
85+
if err != nil {
86+
log.Error().Err(err).Msgf("error during post")
87+
return
88+
}
89+
if resp.StatusCode() > 299 {
90+
log.Error().Msgf("%v", resp)
91+
return
92+
}
93+
log.Info().Msgf("POST %s | %s", kind, id)
94+
}
95+
}
96+
6497
func (s *K8SWorker) sendUpsert(kind string, id string, value opslevel.JSON) {
6598
var m struct {
6699
Payload struct {
@@ -78,7 +111,7 @@ func (s *K8SWorker) sendUpsert(kind string, id string, value opslevel.JSON) {
78111
log.Debug().Msgf("\t%#v", value)
79112
} else {
80113
log.Info().Msgf("UPSERT %s | %s", kind, id)
81-
err := s.client.Mutate(&m, v, opslevel.WithName("IntegrationSourceObjectUpsert"))
114+
err := s.gqlClient.Mutate(&m, v, opslevel.WithName("IntegrationSourceObjectUpsert"))
82115
if err != nil {
83116
log.Error().Err(err).Msgf("error during upsert mutate")
84117
}
@@ -100,7 +133,7 @@ func (s *K8SWorker) sendDelete(kind string, id string) {
100133
log.Info().Msgf("[DRYRUN] DELETE %s | %s ", kind, id)
101134
} else {
102135
log.Info().Msgf("DELETE %s | %s ", kind, id)
103-
err := s.client.Mutate(&m, v, opslevel.WithName("IntegrationSourceObjectDelete"))
136+
err := s.gqlClient.Mutate(&m, v, opslevel.WithName("IntegrationSourceObjectDelete"))
104137
if err != nil {
105138
log.Error().Err(err).Msgf("error during delete mutate")
106139
}

0 commit comments

Comments
 (0)