Skip to content

Commit dbb8aeb

Browse files
authored
feat(connection): add HAR collector support to connections
* feat(context): add HAR collector context support to AWS connection Allow HAR collector to be passed through context, falling back to explicit options if not set. This enables middleware collection of HTTP traffic at the context level. * feat(connection): add HAR collector support to remaining connections Add HAR collector context support to HTTPConnection, Prometheus, OpenSearch, GKE, EKS, and CNRM connections. Also add context fallback to GCS (matching AWS pattern). All connections that make HTTP requests now support HAR capture via ClientOption or context-level collector.
1 parent 57a9814 commit dbb8aeb

12 files changed

Lines changed: 370 additions & 32 deletions

File tree

connection/aws.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,12 @@ func (t *AWSConnection) Client(ctx context.Context, opts ...types.ClientOption)
145145
TLSClientConfig: &tls.Config{InsecureSkipVerify: t.SkipTLSVerify},
146146
}
147147

148-
if o.HARCollector != nil {
149-
tr = o.HARCollector.Middleware()(tr)
148+
harCollector := o.HARCollector
149+
if harCollector == nil {
150+
harCollector = ctx.HARCollector()
151+
}
152+
if harCollector != nil {
153+
tr = harCollector.Middleware()(tr)
150154
}
151155

152156
if ctx.IsTrace() {

connection/cnrm.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/flanksource/duty/context"
88
dutyKube "github.com/flanksource/duty/kubernetes"
9+
"github.com/flanksource/duty/types"
910
container "google.golang.org/api/container/v1"
1011
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1112
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -25,8 +26,8 @@ func (t *CNRMConnection) Populate(ctx ConnectionContext) error {
2526
return t.GKE.Populate(ctx)
2627
}
2728

28-
func (t *CNRMConnection) KubernetesClient(ctx context.Context, freshToken bool) (kubernetes.Interface, *rest.Config, error) {
29-
cnrmCluster, restConfig, err := t.GKE.KubernetesClient(ctx, freshToken)
29+
func (t *CNRMConnection) KubernetesClient(ctx context.Context, freshToken bool, opts ...types.ClientOption) (kubernetes.Interface, *rest.Config, error) {
30+
cnrmCluster, restConfig, err := t.GKE.KubernetesClient(ctx, freshToken, opts...)
3031
if err != nil {
3132
return nil, nil, fmt.Errorf("failed to create Kubernetes client for GKE: %w", err)
3233
}

connection/eks.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
signerv4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
1212
"github.com/aws/aws-sdk-go-v2/service/eks"
1313
"github.com/flanksource/duty/context"
14+
"github.com/flanksource/duty/types"
1415
"k8s.io/client-go/kubernetes"
1516
"k8s.io/client-go/rest"
1617
)
@@ -32,8 +33,8 @@ func (t *EKSConnection) Populate(ctx ConnectionContext) error {
3233
return t.AWSConnection.Populate(ctx)
3334
}
3435

35-
func (t *EKSConnection) KubernetesClient(ctx context.Context, freshToken bool) (kubernetes.Interface, *rest.Config, error) {
36-
awsConfig, err := t.AWSConnection.Client(ctx)
36+
func (t *EKSConnection) KubernetesClient(ctx context.Context, freshToken bool, opts ...types.ClientOption) (kubernetes.Interface, *rest.Config, error) {
37+
awsConfig, err := t.AWSConnection.Client(ctx, opts...)
3738
if err != nil {
3839
return nil, nil, err
3940
}

connection/gcs.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,16 @@ func (g *GCSConnection) Client(ctx context.Context, opts ...types.ClientOption)
3737
clientOpts = append(clientOpts, option.WithEndpoint(g.Endpoint))
3838
}
3939

40-
if o.HARCollector != nil {
40+
harCollector := o.HARCollector
41+
if harCollector == nil {
42+
harCollector = ctx.HARCollector()
43+
}
44+
if harCollector != nil {
4145
base := http.RoundTripper(http.DefaultTransport)
4246
if g.SkipTLSVerify {
4347
base = &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}
4448
}
45-
tr := o.HARCollector.Middleware()(base)
49+
tr := harCollector.Middleware()(base)
4650
clientOpts = append(clientOpts, option.WithHTTPClient(&http.Client{Transport: tr}))
4751
} else if g.SkipTLSVerify {
4852
insecureHTTPClient := &http.Client{

connection/gke.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"net/http"
88

99
"github.com/flanksource/duty/context"
10+
"github.com/flanksource/duty/types"
1011
"golang.org/x/oauth2/google"
1112
container "google.golang.org/api/container/v1"
1213
"google.golang.org/api/option"
@@ -35,23 +36,33 @@ func (t *GKEConnection) Validate() *GKEConnection {
3536
return t
3637
}
3738

38-
func (t *GKEConnection) Client(ctx context.Context) (*container.Service, error) {
39+
func (t *GKEConnection) Client(ctx context.Context, opts ...types.ClientOption) (*container.Service, error) {
3940
t = t.Validate()
41+
o := types.NewClientOptions(opts...)
4042

4143
var clientOpts []option.ClientOption
4244

4345
if t.Endpoint != "" {
4446
clientOpts = append(clientOpts, option.WithEndpoint(t.Endpoint))
4547
}
4648

47-
if t.SkipTLSVerify {
48-
insecureHTTPClient := &http.Client{
49+
harCollector := o.HARCollector
50+
if harCollector == nil {
51+
harCollector = ctx.HARCollector()
52+
}
53+
if harCollector != nil {
54+
base := http.RoundTripper(http.DefaultTransport)
55+
if t.SkipTLSVerify {
56+
base = &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}
57+
}
58+
tr := harCollector.Middleware()(base)
59+
clientOpts = append(clientOpts, option.WithHTTPClient(&http.Client{Transport: tr}))
60+
} else if t.SkipTLSVerify {
61+
clientOpts = append(clientOpts, option.WithHTTPClient(&http.Client{
4962
Transport: &http.Transport{
5063
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
5164
},
52-
}
53-
54-
clientOpts = append(clientOpts, option.WithHTTPClient(insecureHTTPClient))
65+
}))
5566
}
5667

5768
if t.Credentials != nil && !t.Credentials.IsEmpty() {
@@ -76,8 +87,8 @@ func (t *GKEConnection) Client(ctx context.Context) (*container.Service, error)
7687
return svc, nil
7788
}
7889

79-
func (t *GKEConnection) KubernetesClient(ctx context.Context, freshToken bool) (kubernetes.Interface, *rest.Config, error) {
80-
containerService, err := t.Client(ctx)
90+
func (t *GKEConnection) KubernetesClient(ctx context.Context, freshToken bool, opts ...types.ClientOption) (kubernetes.Interface, *rest.Config, error) {
91+
containerService, err := t.Client(ctx, opts...)
8192
if err != nil {
8293
return nil, nil, err
8394
}

connection/http.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -348,10 +348,15 @@ func (h *HTTPConnection) Hydrate(ctx ConnectionContext, namespace string) (*HTTP
348348
return h, nil
349349
}
350350

351-
func (h HTTPConnection) Transport() netHTTP.RoundTripper {
351+
func (h HTTPConnection) Transport(opts ...types.ClientOption) netHTTP.RoundTripper {
352+
o := types.NewClientOptions(opts...)
353+
var base netHTTP.RoundTripper = &netHTTP.Transport{}
354+
if o.HARCollector != nil {
355+
base = o.HARCollector.Middleware()(base)
356+
}
352357
rt := &httpConnectionRoundTripper{
353358
HTTPConnection: h,
354-
Base: &netHTTP.Transport{},
359+
Base: base,
355360
}
356361
return rt
357362
}
@@ -400,8 +405,12 @@ func (rt *httpConnectionRoundTripper) RoundTrip(req *netHTTP.Request) (*netHTTP.
400405
}
401406

402407
// CreateHTTPClient requires a hydrated connection
403-
func CreateHTTPClient(ctx ConnectionContext, conn HTTPConnection) (*http.Client, error) {
408+
func CreateHTTPClient(ctx ConnectionContext, conn HTTPConnection, opts ...types.ClientOption) (*http.Client, error) {
409+
o := types.NewClientOptions(opts...)
404410
client := http.NewClient()
411+
if o.HARCollector != nil {
412+
client.HARCollector(o.HARCollector)
413+
}
405414
if !conn.HTTPBasicAuth.IsEmpty() {
406415
client.Auth(conn.GetUsername(), conn.GetPassword())
407416
client.Digest(conn.Digest)

connection/kubernetes.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,23 +176,23 @@ func (t KubernetesConnection) Populate(ctx context.Context, freshToken bool, opt
176176
return nil, nil, err
177177
}
178178

179-
return t.GKE.KubernetesClient(ctx, freshToken)
179+
return t.GKE.KubernetesClient(ctx, freshToken, opts...)
180180
}
181181

182182
if t.EKS != nil {
183183
if err := t.EKS.Populate(ctx); err != nil {
184184
return nil, nil, err
185185
}
186186

187-
return t.EKS.KubernetesClient(ctx, freshToken)
187+
return t.EKS.KubernetesClient(ctx, freshToken, opts...)
188188
}
189189

190190
if t.CNRM != nil {
191191
if err := t.CNRM.Populate(ctx); err != nil {
192192
return nil, nil, err
193193
}
194194

195-
return t.CNRM.KubernetesClient(ctx, freshToken)
195+
return t.CNRM.KubernetesClient(ctx, freshToken, opts...)
196196
}
197197

198198
return nil, nil, nil

connection/opensearch.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ func (c *OpensearchConnection) Hydrate(ctx ConnectionContext) error {
118118

119119
// Client creates and returns an OpenSearch client.
120120
// NOTE: Must be run on a hydrated OpensearchConnection.
121-
func (c *OpensearchConnection) Client() (*opensearch.Client, error) {
121+
func (c *OpensearchConnection) Client(ctx context.Context, opts ...types.ClientOption) (*opensearch.Client, error) {
122+
o := types.NewClientOptions(opts...)
122123
if len(c.URLs) == 0 {
123124
return nil, fmt.Errorf("opensearch connection urls cannot be empty")
124125
}
@@ -132,14 +133,28 @@ func (c *OpensearchConnection) Client() (*opensearch.Client, error) {
132133
cfg.Password = c.GetPassword()
133134
}
134135

136+
var tr http.RoundTripper
135137
if c.InsecureSkipVerify {
136-
cfg.Transport = &http.Transport{
137-
TLSClientConfig: &tls.Config{
138-
InsecureSkipVerify: true,
139-
},
138+
tr = &http.Transport{
139+
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
140140
}
141141
}
142142

143+
harCollector := o.HARCollector
144+
if harCollector == nil {
145+
harCollector = ctx.HARCollector()
146+
}
147+
if harCollector != nil {
148+
if tr == nil {
149+
tr = http.DefaultTransport
150+
}
151+
tr = harCollector.Middleware()(tr)
152+
}
153+
154+
if tr != nil {
155+
cfg.Transport = tr
156+
}
157+
143158
client, err := opensearch.NewClient(cfg)
144159
if err != nil {
145160
return nil, fmt.Errorf("error creating opensearch client: %w", err)
@@ -148,7 +163,7 @@ func (c *OpensearchConnection) Client() (*opensearch.Client, error) {
148163
return client, nil
149164
}
150165

151-
func NewOpenSearchClient(ctx context.Context, connection models.Connection) (*opensearch.Client, error) {
166+
func NewOpenSearchClient(ctx context.Context, connection models.Connection, opts ...types.ClientOption) (*opensearch.Client, error) {
152167
var conn OpensearchConnection
153168
if err := conn.FromModel(connection); err != nil {
154169
return nil, fmt.Errorf("error creating opensearch connection from model: %w", err)
@@ -158,5 +173,5 @@ func NewOpenSearchClient(ctx context.Context, connection models.Connection) (*op
158173
return nil, fmt.Errorf("error hydrating opensearch connection: %w", err)
159174
}
160175

161-
return conn.Client()
176+
return conn.Client(ctx, opts...)
162177
}

connection/prometheus.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/flanksource/duty/context"
1010
"github.com/flanksource/duty/models"
11+
"github.com/flanksource/duty/types"
1112
)
1213

1314
// +kubebuilder:object:generate=true
@@ -34,10 +35,10 @@ func (p *PrometheusConnection) Populate(ctx ConnectionContext) error {
3435
return nil
3536
}
3637

37-
func (p *PrometheusConnection) NewClient(ctx context.Context) (v1.API, error) {
38+
func (p *PrometheusConnection) NewClient(ctx context.Context, opts ...types.ClientOption) (v1.API, error) {
3839
cfg := api.Config{
3940
Address: p.HTTPConnection.URL,
40-
RoundTripper: p.HTTPConnection.Transport(),
41+
RoundTripper: p.HTTPConnection.Transport(opts...),
4142
}
4243

4344
client, err := api.NewClient(cfg)

context/context.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
commons "github.com/flanksource/commons/context"
11+
"github.com/flanksource/commons/har"
1112
"github.com/flanksource/commons/logger"
1213
"github.com/jackc/pgx/v5/pgxpool"
1314
"github.com/labstack/echo/v4"
@@ -313,6 +314,17 @@ func (k Context) WithNamespace(namespace string) Context {
313314
return k.WithValue("namespace", namespace)
314315
}
315316

317+
func (k Context) WithHARCollector(collector *har.Collector) Context {
318+
return k.WithValue("har-collector", collector)
319+
}
320+
321+
func (k Context) HARCollector() *har.Collector {
322+
if v, ok := k.Value("har-collector").(*har.Collector); ok {
323+
return v
324+
}
325+
return nil
326+
}
327+
316328
func (k Context) WithDB(db *gorm.DB, pool *pgxpool.Pool) Context {
317329
return k.WithValue("db", db).WithValue("pgxpool", pool)
318330
}

0 commit comments

Comments
 (0)