Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ flowchart LR
chainlink-common --> chainlink-common/pkg/values
chainlink-common --> chainlink-protos/billing/go
chainlink-common --> chainlink-protos/cre/go
chainlink-common --> chainlink-protos/linking-service/go
chainlink-common --> chainlink-protos/storage-service
chainlink-common --> freeport
chainlink-common --> grpc-proxy
Expand All @@ -24,6 +25,8 @@ flowchart LR
click chainlink-protos/billing/go href "https://github.com/smartcontractkit/chainlink-protos"
chainlink-protos/cre/go
click chainlink-protos/cre/go href "https://github.com/smartcontractkit/chainlink-protos"
chainlink-protos/linking-service/go
click chainlink-protos/linking-service/go href "https://github.com/smartcontractkit/chainlink-protos"
chainlink-protos/storage-service
click chainlink-protos/storage-service href "https://github.com/smartcontractkit/chainlink-protos"
chainlink-protos/workflows/go
Expand All @@ -46,6 +49,7 @@ flowchart LR
subgraph chainlink-protos-repo[chainlink-protos]
chainlink-protos/billing/go
chainlink-protos/cre/go
chainlink-protos/linking-service/go
chainlink-protos/storage-service
chainlink-protos/workflows/go
end
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ require (
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722225531-876fd6b94976
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20250822025801-598d3d86f873
github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e
github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7
github.com/smartcontractkit/libocr v0.0.0-20250707144819-babe0ec4e358
Expand Down Expand Up @@ -137,7 +139,6 @@ require (
github.com/rogpeppe/go-internal v1.13.1 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/sanity-io/litter v1.5.5 // indirect
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20250822025801-598d3d86f873 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/x448/float16 v0.8.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722225531-87
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722225531-876fd6b94976/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA=
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2 h1:1/KdO5AbUr3CmpLjMPuJXPo2wHMbfB8mldKLsg7D4M8=
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2/go.mod h1:jUC52kZzEnWF9tddHh85zolKybmLpbQ1oNA4FjOHt1Q=
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM=
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b/go.mod h1:qSTSwX3cBP3FKQwQacdjArqv0g6QnukjV4XuzO6UyoY=
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 h1:B7itmjy+CMJ26elVw/cAJqqhBQ3Xa/mBYWK0/rQ5MuI=
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0/go.mod h1:h6kqaGajbNRrezm56zhx03p0mVmmA2xxj7E/M4ytLUA=
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20250822025801-598d3d86f873 h1:8/qwOmcdSFa8A6ecnj3eH/mwNx7Ybw2tjQFydDymtOc=
Expand Down
116 changes: 116 additions & 0 deletions pkg/services/orgresolver/linking.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package orgresolver

import (
"context"
"errors"
"fmt"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

log "github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
linkingclient "github.com/smartcontractkit/chainlink-protos/linking-service/go/v1"
)

// OrgResolver interface defines methods for resolving organization IDs from workflow owners
type OrgResolver interface {
services.Service
Get(ctx context.Context, owner string) (string, error)
}

type Config struct {
URL string
TLSEnabled bool
WorkflowRegistryAddress string
Comment thread
jmank88 marked this conversation as resolved.
WorkflowRegistryChainSelector uint64
}

// orgResolver makes direct calls to the linking service to resolve organization IDs from workflow owners.
// This simplified implementation makes a network call for each Get() request.
type orgResolver struct {
workflowRegistryAddress string
workflowRegistryChainSelector uint64

client linkingclient.LinkingServiceClient
conn *grpc.ClientConn // nil if client was injected
logger log.SugaredLogger
}

// NewOrgResolver creates a new org resolver with the specified configuration
func NewOrgResolver(cfg Config, logger log.Logger) (*orgResolver, error) {
return NewOrgResolverWithClient(cfg, nil, logger)
}

// NewOrgResolverWithClient creates a new org resolver with an optional injected client (for testing)
func NewOrgResolverWithClient(cfg Config, client linkingclient.LinkingServiceClient, logger log.Logger) (*orgResolver, error) {
resolver := &orgResolver{
workflowRegistryAddress: cfg.WorkflowRegistryAddress,
workflowRegistryChainSelector: cfg.WorkflowRegistryChainSelector,
logger: log.Sugared(logger).Named("OrgResolver"),
}

if client != nil {
resolver.client = client
} else {
if cfg.URL == "" {
return nil, errors.New("URL is required when client is not provided")
}

var opts []grpc.DialOption
if cfg.TLSEnabled {
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(nil)))
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

conn, err := grpc.NewClient(cfg.URL, opts...)
if err != nil {
return nil, fmt.Errorf("failed to create linking service client at %s: %w", cfg.URL, err)
}

resolver.conn = conn
resolver.client = linkingclient.NewLinkingServiceClient(conn)
}

return resolver, nil
}

func (o *orgResolver) Get(ctx context.Context, owner string) (string, error) {
req := &linkingclient.GetOrganizationFromWorkflowOwnerRequest{
WorkflowOwner: owner,
WorkflowRegistryAddress: o.workflowRegistryAddress,
ChainSelector: o.workflowRegistryChainSelector,
}

resp, err := o.client.GetOrganizationFromWorkflowOwner(ctx, req)
if err != nil {
return "", fmt.Errorf("failed to fetch organization from workflow owner: %w", err)
}

return resp.OrganizationId, nil
}

func (o *orgResolver) Start(_ context.Context) error {
return nil
}

func (o *orgResolver) HealthReport() map[string]error {
return map[string]error{o.Name(): nil}
}

func (o *orgResolver) Close() error {
if o.conn != nil {
return o.conn.Close()
}
return nil
}

func (o *orgResolver) Name() string {
return o.logger.Name()
}

func (o *orgResolver) Ready() error {
return nil
}
115 changes: 115 additions & 0 deletions pkg/services/orgresolver/linking_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package orgresolver

import (
"context"
"net"
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
linkingclient "github.com/smartcontractkit/chainlink-protos/linking-service/go/v1"
)

// mockLinkingClient implements the LinkingServiceClient interface for testing
type mockLinkingClient struct{}

func (m *mockLinkingClient) GetOrganizationFromWorkflowOwner(ctx context.Context, req *linkingclient.GetOrganizationFromWorkflowOwnerRequest, opts ...grpc.CallOption) (*linkingclient.GetOrganizationFromWorkflowOwnerResponse, error) {
orgID := "org-" + req.WorkflowOwner
return &linkingclient.GetOrganizationFromWorkflowOwnerResponse{
OrganizationId: orgID,
}, nil
}

// mockLinkingServer implements the LinkingServiceServer interface for testing
type mockLinkingServer struct {
linkingclient.UnimplementedLinkingServiceServer
}

func (s *mockLinkingServer) GetOrganizationFromWorkflowOwner(ctx context.Context, req *linkingclient.GetOrganizationFromWorkflowOwnerRequest) (*linkingclient.GetOrganizationFromWorkflowOwnerResponse, error) {
orgID := "org-" + req.WorkflowOwner
return &linkingclient.GetOrganizationFromWorkflowOwnerResponse{
OrganizationId: orgID,
}, nil
}

func TestOrgResolver_Get(t *testing.T) {
ctx := context.Background()
client := &mockLinkingClient{}

cfg := Config{
URL: "test-url",
TLSEnabled: false,
WorkflowRegistryAddress: "0x1234567890abcdef",
WorkflowRegistryChainSelector: 1,
}

resolver, err := NewOrgResolverWithClient(cfg, client, logger.Test(t))
require.NoError(t, err)

workflowOwner := "0xabcdef1234567890"

orgID, err := resolver.Get(ctx, workflowOwner)
require.NoError(t, err)
require.Equal(t, "org-"+workflowOwner, orgID)
}

func TestOrgResolver_NewOrgResolver_RequiresClientOrURL(t *testing.T) {
cfg := Config{
URL: "", // Empty URL should cause error
TLSEnabled: false,
WorkflowRegistryAddress: "0x1234567890abcdef",
WorkflowRegistryChainSelector: 1,
}

_, err := NewOrgResolverWithClient(cfg, nil, logger.Test(t))
require.Error(t, err)
require.Contains(t, err.Error(), "URL is required when client is not provided")
}

func TestOrgResolver_NewOrgResolver_WithMockServer(t *testing.T) {
// Use in-memory connection for faster testing
lis := bufconn.Listen(1024 * 1024)
server := grpc.NewServer()
linkingclient.RegisterLinkingServiceServer(server, &mockLinkingServer{})

go func() {
_ = server.Serve(lis)
}()
t.Cleanup(func() { server.Stop() })

// Create gRPC client connection using bufconn
ctx := context.Background()
conn, err := grpc.NewClient("passthrough:///bufnet",
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}),
grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer conn.Close()

client := linkingclient.NewLinkingServiceClient(conn)

// Create OrgResolver using the client (simulating what NewOrgResolver would do)
cfg := Config{
URL: "bufnet", // Not used since client is injected
TLSEnabled: false,
WorkflowRegistryAddress: "0x1234567890abcdef",
WorkflowRegistryChainSelector: 1,
}

resolver, err := NewOrgResolverWithClient(cfg, client, logger.Test(t))
require.NoError(t, err)

workflowOwner := "0xabcdef1234567890"

orgID, err := resolver.Get(ctx, workflowOwner)
require.NoError(t, err)
require.Equal(t, "org-"+workflowOwner, orgID)

err = resolver.Close()
require.NoError(t, err)
}
80 changes: 80 additions & 0 deletions pkg/workflows/events/trigger_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package events

import (
"context"
"fmt"
"strconv"
"time"

"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
workflowsevents "github.com/smartcontractkit/chainlink-protos/workflows/go/v2"
)

// Label keys for trigger events
const (
KeyTriggerID = "trigger_id"
KeyWorkflowID = "workflow_id"
KeyWorkflowOwner = "workflow_owner"
KeyWorkflowName = "workflow_name"
KeyWorkflowExecutionID = "workflow_execution_id"
KeyDonID = "don_id"
KeyDonVersion = "don_version"
KeyOrganizationID = "organization_id"
)

// EmitTriggerExecutionStarted emits a TriggerExecutionStarted event using the provided labeler
func EmitTriggerExecutionStarted(ctx context.Context, labeler custmsg.MessageEmitter) error {
labels := labeler.Labels()

// Extract required fields
triggerID, ok := labels[KeyTriggerID]
if !ok {
return fmt.Errorf("missing required field: %s", KeyTriggerID)
}

workflowID, ok := labels[KeyWorkflowID]
if !ok {
return fmt.Errorf("missing required field: %s", KeyWorkflowID)
}

workflowExecutionID, ok := labels[KeyWorkflowExecutionID]
if !ok {
return fmt.Errorf("missing required field: %s", KeyWorkflowExecutionID)
}

event := &workflowsevents.TriggerExecutionStarted{
TriggerID: triggerID,
WorkflowExecutionID: workflowExecutionID,
Workflow: &workflowsevents.WorkflowKey{
WorkflowID: workflowID,
WorkflowOwner: labels[KeyWorkflowOwner],
WorkflowName: labels[KeyWorkflowName],
OrganizationID: labels[KeyOrganizationID],
},
Timestamp: time.Now().Format(time.RFC3339),
}

// Optional; downstream consumers could infer from csa public key,
// as of now Beholder/ChiP autohydrates csa public key
if donIDStr, exists := labels[KeyDonID]; exists {
if donID, err := strconv.ParseInt(donIDStr, 10, 32); err == nil {
event.CreInfo = &workflowsevents.CreInfo{
DonID: int32(donID),
DonVersion: labels[KeyDonVersion],
}
}
}

b, err := proto.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal TriggerExecutionStarted event: %w", err)
}

return beholder.GetEmitter().Emit(ctx, b,
"beholder_data_schema", "workflows.v2.trigger_execution_started", // required
"beholder_domain", "platform", // required
"beholder_entity", "workflows.v2.TriggerExecutionStarted") // required
}
Loading
Loading