Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/scylladb/go-reflectx v1.0.1
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chain-selectors v1.0.62
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.1
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.3
github.com/smartcontractkit/chainlink-common/pkg/values v0.0.0-20250806152407-159881c7589c
github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk/v2/pb v0.0.0-20250806155403-1d805e639a0f
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722175102-6dcdf5122683
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,8 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/smartcontractkit/chain-selectors v1.0.62 h1:KWLEyKQXHxGGHIlUfLrzjYldlB8hBjRZi9GP49WtgYs=
github.com/smartcontractkit/chain-selectors v1.0.62/go.mod h1:xsKM0aN3YGcQKTPRPDDtPx2l4mlTN1Djmg0VVXV40b8=
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.1 h1:ca2z5OXgnbBPQRxpwXwBLJsUA1+cAp5ncfW4Ssvd6eY=
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.1/go.mod h1:NZv/qKYGFRnkjOYBouajnDfFoZ+WDa6H2KNmSf1dnKc=
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.3 h1:xPM32++82Jbcsn+ghxyrSjLQgKCdpZ8z/hwtQOnWf/o=
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.3/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc=
github.com/smartcontractkit/chainlink-common/pkg/values v0.0.0-20250806152407-159881c7589c h1:QaImySzrLcGzQc4wCF2yDqqb73jA3+9EIqybgx8zT4w=
github.com/smartcontractkit/chainlink-common/pkg/values v0.0.0-20250806152407-159881c7589c/go.mod h1:U1UAbPhy6D7Qz0wHKGPoQO+dpR0hsYjgUz8xwRrmKwI=
github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk/v2/pb v0.0.0-20250806155403-1d805e639a0f h1:mnnlyMH5LgJRAzx/4mW2R+sbK1Acpfs3q0EokeAX5RI=
Expand Down
5 changes: 5 additions & 0 deletions pkg/beholder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
Expand Down Expand Up @@ -230,6 +231,10 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
headerProvider := NewStaticAuthHeaderProvider(cfg.AuthHeaders)
chipIngressOpts = append(chipIngressOpts, chipingress.WithTokenAuth(headerProvider))
}
if cfg.ChipIngressForceIPV4 {
// Force the use of IPv4 addresses for the chip ingress connection
chipIngressOpts = append(chipIngressOpts, chipingress.WithForceIPV4())
}

chipIngressClient, err := chipingress.NewClient(
cfg.ChipIngressEmitterGRPCEndpoint,
Expand Down
1 change: 1 addition & 0 deletions pkg/beholder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Config struct {
ChipIngressEmitterEnabled bool
ChipIngressEmitterGRPCEndpoint string
ChipIngressInsecureConnection bool // Disables TLS for Chip Ingress Emitter
ChipIngressForceIPV4 bool // Forces IPv4 connections for Chip Ingress Emitter

// OTel Log
LogExportTimeout time.Duration
Expand Down
2 changes: 1 addition & 1 deletion pkg/beholder/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,6 @@ func ExampleConfig() {
}
fmt.Printf("%+v\n", *config.LogRetryConfig)
// Output:
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> LogStreamingEnabled:false AuthPublicKeyHex: AuthHeaders:map[]}
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false ChipIngressForceIPV4:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> LogStreamingEnabled:false AuthPublicKeyHex: AuthHeaders:map[]}
// {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s}
}
48 changes: 48 additions & 0 deletions pkg/chipingress/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@ type clientConfig struct {
headerProvider HeaderProvider
insecureConnection bool
host string
forceIPV4 bool
}

func newClientConfig(host string) *clientConfig {
cfg := &clientConfig{
headerProvider: nil,
perRPCCredentials: nil,
host: host,
forceIPV4: false,
}
WithInsecureConnection()(cfg) // Default to insecure connection
return cfg
Expand All @@ -72,6 +74,11 @@ func NewClient(address string, opts ...Opt) (Client, error) {
grpcOpts := []grpc.DialOption{
grpc.WithTransportCredentials(cfg.transportCredentials),
}
// Add our custom dialer if IPv4 is forced
if cfg.forceIPV4 {
grpcOpts = append(grpcOpts, grpc.WithContextDialer(forceIPV4Dialer))
}

// Auth
if cfg.perRPCCredentials != nil {
grpcOpts = append(grpcOpts, grpc.WithPerRPCCredentials(cfg.perRPCCredentials))
Expand Down Expand Up @@ -110,6 +117,13 @@ func (c *client) Close() error {
return c.conn.Close()
}

// WithForceIPV4 forces the client to use IPv4 for connections.
func WithForceIPV4() Opt {
return func(c *clientConfig) {
c.forceIPV4 = true
}
}

// WithBasicAuth sets the basic-auth credentials for the ChipIngress service.
// Default is to require TLS for security.
func WithBasicAuth(user, pass string) Opt {
Expand Down Expand Up @@ -172,6 +186,40 @@ func newHeaderInterceptor(provider HeaderProvider) grpc.UnaryClientInterceptor {
}
}

// forceIPV4Dialer is a custom dialer that resolves a hostname and forces the connection over IPv4.
func forceIPV4Dialer(ctx context.Context, addr string) (net.Conn, error) {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, fmt.Errorf("failed to split host and port: %w", err)
}

// Resolve the hostname to a list of IP addresses.
ips, err := net.LookupIP(host)
if err != nil {
return nil, fmt.Errorf("failed to resolve IP addresses for host %s: %w", host, err)
}

var ipv4Addr string
// Find the first IPv4 address.
for _, ip := range ips {
if ip.To4() != nil {
ipv4Addr = ip.String()
break
}
}

if ipv4Addr == "" {
return nil, fmt.Errorf("no IPv4 address found for host: %s", host)
}

// Create the new address with the resolved IPv4 and original port.
ipv4AddrWithPort := net.JoinHostPort(ipv4Addr, port)

// Dial the new IPv4 address, explicitly using "tcp4".
var d net.Dialer
return d.DialContext(ctx, "tcp4", ipv4AddrWithPort)
}

// NewEvent creates a new CloudEvent with the specified domain, entity, payload, and optional attributes.
func NewEvent(domain, entity string, payload []byte, attributes map[string]any) (CloudEvent, error) {

Expand Down
18 changes: 16 additions & 2 deletions pkg/loop/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ const (
envTelemetryEmitterExportMaxBatchSize = "CL_TELEMETRY_EMITTER_EXPORT_MAX_BATCH_SIZE"
envTelemetryEmitterMaxQueueSize = "CL_TELEMETRY_EMITTER_MAX_QUEUE_SIZE"

envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT"
envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT"
envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION"
envChipIngressForceIPV4 = "CL_CHIP_INGRESS_FORCE_IPV4"
)

// EnvConfig is the configuration between the application and the LOOP executable. The values
Expand Down Expand Up @@ -116,7 +118,9 @@ type EnvConfig struct {
TelemetryEmitterExportMaxBatchSize int
TelemetryEmitterMaxQueueSize int

ChipIngressEndpoint string
ChipIngressEndpoint string
ChipIngressInsecureConnection bool
ChipIngressForceIPV4 bool
}

// AsCmdEnv returns a slice of environment variable key/value pairs for an exec.Cmd.
Expand Down Expand Up @@ -184,6 +188,8 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {
add(envTelemetryEmitterMaxQueueSize, strconv.Itoa(e.TelemetryEmitterMaxQueueSize))

add(envChipIngressEndpoint, e.ChipIngressEndpoint)
add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection))
add(envChipIngressForceIPV4, strconv.FormatBool(e.ChipIngressForceIPV4))

return
}
Expand Down Expand Up @@ -343,6 +349,14 @@ func (e *EnvConfig) parse() error {
}
// Optional
e.ChipIngressEndpoint = os.Getenv(envChipIngressEndpoint)
e.ChipIngressInsecureConnection, err = getBool(envChipIngressInsecureConnection)
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envChipIngressInsecureConnection, err)
}
e.ChipIngressForceIPV4, err = getBool(envChipIngressForceIPV4)
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envChipIngressForceIPV4, err)
}
}

return nil
Expand Down
12 changes: 9 additions & 3 deletions pkg/loop/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ func TestEnvConfig_parse(t *testing.T) {
envTelemetryEmitterExportMaxBatchSize: "100",
envTelemetryEmitterMaxQueueSize: "1000",

envChipIngressEndpoint: "http://chip-ingress.example.com",
envChipIngressEndpoint: "chip-ingress.example.com:50051",
envChipIngressInsecureConnection: "true",
envChipIngressForceIPV4: "true",
},
expectError: false,
expectConfig: envCfgFull,
Expand Down Expand Up @@ -172,7 +174,9 @@ var envCfgFull = EnvConfig{
TelemetryEmitterExportMaxBatchSize: 100,
TelemetryEmitterMaxQueueSize: 1000,

ChipIngressEndpoint: "http://chip-ingress.example.com",
ChipIngressEndpoint: "chip-ingress.example.com:50051",
ChipIngressInsecureConnection: true,
ChipIngressForceIPV4: true,
}

func TestEnvConfig_AsCmdEnv(t *testing.T) {
Expand Down Expand Up @@ -221,7 +225,9 @@ func TestEnvConfig_AsCmdEnv(t *testing.T) {
assert.Equal(t, "1000", got[envTelemetryEmitterMaxQueueSize])

// Assert ChipIngress environment variables
assert.Equal(t, "http://chip-ingress.example.com", got[envChipIngressEndpoint])
assert.Equal(t, "chip-ingress.example.com:50051", got[envChipIngressEndpoint])
assert.Equal(t, "true", got[envChipIngressInsecureConnection])
assert.Equal(t, "true", got[envChipIngressForceIPV4])
}

func TestGetMap(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/loop/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (s *Server) start() error {
EmitterMaxQueueSize: s.EnvConfig.TelemetryEmitterMaxQueueSize,
ChipIngressEmitterEnabled: s.EnvConfig.ChipIngressEndpoint != "",
ChipIngressEmitterGRPCEndpoint: s.EnvConfig.ChipIngressEndpoint,
ChipIngressInsecureConnection: s.EnvConfig.TelemetryInsecureConnection,
ChipIngressInsecureConnection: s.EnvConfig.ChipIngressInsecureConnection,
}

if tracingConfig.Enabled {
Expand Down Expand Up @@ -166,7 +166,7 @@ func (s *Server) start() error {
LockTimeout: s.EnvConfig.DatabaseLockTimeout,
MaxOpenConns: s.EnvConfig.DatabaseMaxOpenConns,
MaxIdleConns: s.EnvConfig.DatabaseMaxIdleConns,
EnableTracing: s.EnvConfig.DatabaseTracingEnabled,
EnableTracing: s.EnvConfig.DatabaseTracingEnabled,
}.New(ctx, dbURL, pg.DriverPostgres)
if err != nil {
return fmt.Errorf("error connecting to DataBase: %w", err)
Expand Down
Loading