diff --git a/go.md b/go.md index 1e9f700af..6d1fd2a55 100644 --- a/go.md +++ b/go.md @@ -9,6 +9,7 @@ flowchart LR chainlink-common --> chainlink-common/pkg/values chainlink-common --> chainlink-protos/billing/go chainlink-common --> chainlink-protos/cre/go + chainlink-common --> chainlink-protos/linking-service/go chainlink-common --> chainlink-protos/storage-service chainlink-common --> freeport chainlink-common --> grpc-proxy @@ -24,6 +25,8 @@ flowchart LR click chainlink-protos/billing/go href "https://github.com/smartcontractkit/chainlink-protos" chainlink-protos/cre/go click chainlink-protos/cre/go href "https://github.com/smartcontractkit/chainlink-protos" + chainlink-protos/linking-service/go + click chainlink-protos/linking-service/go href "https://github.com/smartcontractkit/chainlink-protos" chainlink-protos/storage-service click chainlink-protos/storage-service href "https://github.com/smartcontractkit/chainlink-protos" chainlink-protos/workflows/go @@ -46,6 +49,7 @@ flowchart LR subgraph chainlink-protos-repo[chainlink-protos] chainlink-protos/billing/go chainlink-protos/cre/go + chainlink-protos/linking-service/go chainlink-protos/storage-service chainlink-protos/workflows/go end diff --git a/go.mod b/go.mod index 9fcffb365..b99f76110 100644 --- a/go.mod +++ b/go.mod @@ -40,7 +40,9 @@ require ( github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4 github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722225531-876fd6b94976 github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2 + github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 + github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20250822025801-598d3d86f873 github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 github.com/smartcontractkit/libocr v0.0.0-20250707144819-babe0ec4e358 @@ -137,7 +139,6 @@ require ( github.com/rogpeppe/go-internal v1.13.1 // indirect github.com/ryanuber/go-glob v1.0.0 // indirect github.com/sanity-io/litter v1.5.5 // indirect - github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20250822025801-598d3d86f873 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/x448/float16 v0.8.4 // indirect diff --git a/go.sum b/go.sum index 81d1a79c1..26c037d06 100644 --- a/go.sum +++ b/go.sum @@ -332,6 +332,8 @@ github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722225531-87 github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722225531-876fd6b94976/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2 h1:1/KdO5AbUr3CmpLjMPuJXPo2wHMbfB8mldKLsg7D4M8= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2/go.mod h1:jUC52kZzEnWF9tddHh85zolKybmLpbQ1oNA4FjOHt1Q= +github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM= +github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b/go.mod h1:qSTSwX3cBP3FKQwQacdjArqv0g6QnukjV4XuzO6UyoY= github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 h1:B7itmjy+CMJ26elVw/cAJqqhBQ3Xa/mBYWK0/rQ5MuI= github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0/go.mod h1:h6kqaGajbNRrezm56zhx03p0mVmmA2xxj7E/M4ytLUA= github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20250822025801-598d3d86f873 h1:8/qwOmcdSFa8A6ecnj3eH/mwNx7Ybw2tjQFydDymtOc= diff --git a/pkg/services/orgresolver/linking.go b/pkg/services/orgresolver/linking.go new file mode 100644 index 000000000..3b05d52d9 --- /dev/null +++ b/pkg/services/orgresolver/linking.go @@ -0,0 +1,116 @@ +package orgresolver + +import ( + "context" + "errors" + "fmt" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + + log "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + linkingclient "github.com/smartcontractkit/chainlink-protos/linking-service/go/v1" +) + +// OrgResolver interface defines methods for resolving organization IDs from workflow owners +type OrgResolver interface { + services.Service + Get(ctx context.Context, owner string) (string, error) +} + +type Config struct { + URL string + TLSEnabled bool + WorkflowRegistryAddress string + WorkflowRegistryChainSelector uint64 +} + +// orgResolver makes direct calls to the linking service to resolve organization IDs from workflow owners. +// This simplified implementation makes a network call for each Get() request. +type orgResolver struct { + workflowRegistryAddress string + workflowRegistryChainSelector uint64 + + client linkingclient.LinkingServiceClient + conn *grpc.ClientConn // nil if client was injected + logger log.SugaredLogger +} + +// NewOrgResolver creates a new org resolver with the specified configuration +func NewOrgResolver(cfg Config, logger log.Logger) (*orgResolver, error) { + return NewOrgResolverWithClient(cfg, nil, logger) +} + +// NewOrgResolverWithClient creates a new org resolver with an optional injected client (for testing) +func NewOrgResolverWithClient(cfg Config, client linkingclient.LinkingServiceClient, logger log.Logger) (*orgResolver, error) { + resolver := &orgResolver{ + workflowRegistryAddress: cfg.WorkflowRegistryAddress, + workflowRegistryChainSelector: cfg.WorkflowRegistryChainSelector, + logger: log.Sugared(logger).Named("OrgResolver"), + } + + if client != nil { + resolver.client = client + } else { + if cfg.URL == "" { + return nil, errors.New("URL is required when client is not provided") + } + + var opts []grpc.DialOption + if cfg.TLSEnabled { + opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(nil))) + } else { + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + + conn, err := grpc.NewClient(cfg.URL, opts...) + if err != nil { + return nil, fmt.Errorf("failed to create linking service client at %s: %w", cfg.URL, err) + } + + resolver.conn = conn + resolver.client = linkingclient.NewLinkingServiceClient(conn) + } + + return resolver, nil +} + +func (o *orgResolver) Get(ctx context.Context, owner string) (string, error) { + req := &linkingclient.GetOrganizationFromWorkflowOwnerRequest{ + WorkflowOwner: owner, + WorkflowRegistryAddress: o.workflowRegistryAddress, + ChainSelector: o.workflowRegistryChainSelector, + } + + resp, err := o.client.GetOrganizationFromWorkflowOwner(ctx, req) + if err != nil { + return "", fmt.Errorf("failed to fetch organization from workflow owner: %w", err) + } + + return resp.OrganizationId, nil +} + +func (o *orgResolver) Start(_ context.Context) error { + return nil +} + +func (o *orgResolver) HealthReport() map[string]error { + return map[string]error{o.Name(): nil} +} + +func (o *orgResolver) Close() error { + if o.conn != nil { + return o.conn.Close() + } + return nil +} + +func (o *orgResolver) Name() string { + return o.logger.Name() +} + +func (o *orgResolver) Ready() error { + return nil +} diff --git a/pkg/services/orgresolver/linking_test.go b/pkg/services/orgresolver/linking_test.go new file mode 100644 index 000000000..8b0a9836f --- /dev/null +++ b/pkg/services/orgresolver/linking_test.go @@ -0,0 +1,115 @@ +package orgresolver + +import ( + "context" + "net" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/bufconn" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + linkingclient "github.com/smartcontractkit/chainlink-protos/linking-service/go/v1" +) + +// mockLinkingClient implements the LinkingServiceClient interface for testing +type mockLinkingClient struct{} + +func (m *mockLinkingClient) GetOrganizationFromWorkflowOwner(ctx context.Context, req *linkingclient.GetOrganizationFromWorkflowOwnerRequest, opts ...grpc.CallOption) (*linkingclient.GetOrganizationFromWorkflowOwnerResponse, error) { + orgID := "org-" + req.WorkflowOwner + return &linkingclient.GetOrganizationFromWorkflowOwnerResponse{ + OrganizationId: orgID, + }, nil +} + +// mockLinkingServer implements the LinkingServiceServer interface for testing +type mockLinkingServer struct { + linkingclient.UnimplementedLinkingServiceServer +} + +func (s *mockLinkingServer) GetOrganizationFromWorkflowOwner(ctx context.Context, req *linkingclient.GetOrganizationFromWorkflowOwnerRequest) (*linkingclient.GetOrganizationFromWorkflowOwnerResponse, error) { + orgID := "org-" + req.WorkflowOwner + return &linkingclient.GetOrganizationFromWorkflowOwnerResponse{ + OrganizationId: orgID, + }, nil +} + +func TestOrgResolver_Get(t *testing.T) { + ctx := context.Background() + client := &mockLinkingClient{} + + cfg := Config{ + URL: "test-url", + TLSEnabled: false, + WorkflowRegistryAddress: "0x1234567890abcdef", + WorkflowRegistryChainSelector: 1, + } + + resolver, err := NewOrgResolverWithClient(cfg, client, logger.Test(t)) + require.NoError(t, err) + + workflowOwner := "0xabcdef1234567890" + + orgID, err := resolver.Get(ctx, workflowOwner) + require.NoError(t, err) + require.Equal(t, "org-"+workflowOwner, orgID) +} + +func TestOrgResolver_NewOrgResolver_RequiresClientOrURL(t *testing.T) { + cfg := Config{ + URL: "", // Empty URL should cause error + TLSEnabled: false, + WorkflowRegistryAddress: "0x1234567890abcdef", + WorkflowRegistryChainSelector: 1, + } + + _, err := NewOrgResolverWithClient(cfg, nil, logger.Test(t)) + require.Error(t, err) + require.Contains(t, err.Error(), "URL is required when client is not provided") +} + +func TestOrgResolver_NewOrgResolver_WithMockServer(t *testing.T) { + // Use in-memory connection for faster testing + lis := bufconn.Listen(1024 * 1024) + server := grpc.NewServer() + linkingclient.RegisterLinkingServiceServer(server, &mockLinkingServer{}) + + go func() { + _ = server.Serve(lis) + }() + t.Cleanup(func() { server.Stop() }) + + // Create gRPC client connection using bufconn + ctx := context.Background() + conn, err := grpc.NewClient("passthrough:///bufnet", + grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return lis.Dial() + }), + grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer conn.Close() + + client := linkingclient.NewLinkingServiceClient(conn) + + // Create OrgResolver using the client (simulating what NewOrgResolver would do) + cfg := Config{ + URL: "bufnet", // Not used since client is injected + TLSEnabled: false, + WorkflowRegistryAddress: "0x1234567890abcdef", + WorkflowRegistryChainSelector: 1, + } + + resolver, err := NewOrgResolverWithClient(cfg, client, logger.Test(t)) + require.NoError(t, err) + + workflowOwner := "0xabcdef1234567890" + + orgID, err := resolver.Get(ctx, workflowOwner) + require.NoError(t, err) + require.Equal(t, "org-"+workflowOwner, orgID) + + err = resolver.Close() + require.NoError(t, err) +} diff --git a/pkg/workflows/events/trigger_events.go b/pkg/workflows/events/trigger_events.go new file mode 100644 index 000000000..6e02ece86 --- /dev/null +++ b/pkg/workflows/events/trigger_events.go @@ -0,0 +1,80 @@ +package events + +import ( + "context" + "fmt" + "strconv" + "time" + + "google.golang.org/protobuf/proto" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/custmsg" + workflowsevents "github.com/smartcontractkit/chainlink-protos/workflows/go/v2" +) + +// Label keys for trigger events +const ( + KeyTriggerID = "trigger_id" + KeyWorkflowID = "workflow_id" + KeyWorkflowOwner = "workflow_owner" + KeyWorkflowName = "workflow_name" + KeyWorkflowExecutionID = "workflow_execution_id" + KeyDonID = "don_id" + KeyDonVersion = "don_version" + KeyOrganizationID = "organization_id" +) + +// EmitTriggerExecutionStarted emits a TriggerExecutionStarted event using the provided labeler +func EmitTriggerExecutionStarted(ctx context.Context, labeler custmsg.MessageEmitter) error { + labels := labeler.Labels() + + // Extract required fields + triggerID, ok := labels[KeyTriggerID] + if !ok { + return fmt.Errorf("missing required field: %s", KeyTriggerID) + } + + workflowID, ok := labels[KeyWorkflowID] + if !ok { + return fmt.Errorf("missing required field: %s", KeyWorkflowID) + } + + workflowExecutionID, ok := labels[KeyWorkflowExecutionID] + if !ok { + return fmt.Errorf("missing required field: %s", KeyWorkflowExecutionID) + } + + event := &workflowsevents.TriggerExecutionStarted{ + TriggerID: triggerID, + WorkflowExecutionID: workflowExecutionID, + Workflow: &workflowsevents.WorkflowKey{ + WorkflowID: workflowID, + WorkflowOwner: labels[KeyWorkflowOwner], + WorkflowName: labels[KeyWorkflowName], + OrganizationID: labels[KeyOrganizationID], + }, + Timestamp: time.Now().Format(time.RFC3339), + } + + // Optional; downstream consumers could infer from csa public key, + // as of now Beholder/ChiP autohydrates csa public key + if donIDStr, exists := labels[KeyDonID]; exists { + if donID, err := strconv.ParseInt(donIDStr, 10, 32); err == nil { + event.CreInfo = &workflowsevents.CreInfo{ + DonID: int32(donID), + DonVersion: labels[KeyDonVersion], + } + } + } + + b, err := proto.Marshal(event) + if err != nil { + return fmt.Errorf("failed to marshal TriggerExecutionStarted event: %w", err) + } + + return beholder.GetEmitter().Emit(ctx, b, + "beholder_data_schema", "workflows.v2.trigger_execution_started", // required + "beholder_domain", "platform", // required + "beholder_entity", "workflows.v2.TriggerExecutionStarted") // required +} diff --git a/pkg/workflows/events/trigger_events_test.go b/pkg/workflows/events/trigger_events_test.go new file mode 100644 index 000000000..a3c671be4 --- /dev/null +++ b/pkg/workflows/events/trigger_events_test.go @@ -0,0 +1,111 @@ +package events + +import ( + "context" + "regexp" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder/beholdertest" + "github.com/smartcontractkit/chainlink-common/pkg/custmsg" + workflowsevents "github.com/smartcontractkit/chainlink-protos/workflows/go/v2" +) + +func TestEmitTriggerExecutionStarted(t *testing.T) { + ctx := context.Background() + beholderObserver := beholdertest.NewObserver(t) + + // Test data + expectedTriggerID := "test-trigger" + expectedWorkflowID := "test-workflow" + expectedWorkflowExecutionID := "test-execution" + expectedWorkflowOwner := "test-owner" + expectedWorkflowName := "test-workflow-name" + expectedOrganizationID := "test-org" + expectedDonID := "1" + expectedDonVersion := "v1.0.0" + + labeler := custmsg.NewLabeler().With( + KeyTriggerID, expectedTriggerID, + KeyWorkflowID, expectedWorkflowID, + KeyWorkflowExecutionID, expectedWorkflowExecutionID, + KeyWorkflowOwner, expectedWorkflowOwner, + KeyWorkflowName, expectedWorkflowName, + KeyOrganizationID, expectedOrganizationID, + KeyDonID, expectedDonID, + KeyDonVersion, expectedDonVersion, + ) + + err := EmitTriggerExecutionStarted(ctx, labeler) + require.NoError(t, err) + + // Verify the message was emitted with correct beholder attributes + msgs := beholderObserver.Messages(t, "beholder_entity", "workflows.v2.TriggerExecutionStarted") + require.Len(t, msgs, 1, "Expected exactly one TriggerExecutionStarted message") + + msg := msgs[0] + + // Verify beholder attributes + assert.Equal(t, "workflows.v2.trigger_execution_started", msg.Attrs["beholder_data_schema"]) + assert.Equal(t, "platform", msg.Attrs["beholder_domain"]) + assert.Equal(t, "workflows.v2.TriggerExecutionStarted", msg.Attrs["beholder_entity"]) + + // Unmarshal and verify the protobuf message content + var event workflowsevents.TriggerExecutionStarted + require.NoError(t, proto.Unmarshal(msg.Body, &event)) + + // Verify required fields + assert.Equal(t, expectedTriggerID, event.TriggerID) + assert.Equal(t, expectedWorkflowExecutionID, event.WorkflowExecutionID) + + // Verify WorkflowKey fields + require.NotNil(t, event.Workflow) + assert.Equal(t, expectedWorkflowID, event.Workflow.WorkflowID) + assert.Equal(t, expectedWorkflowOwner, event.Workflow.WorkflowOwner) + assert.Equal(t, expectedWorkflowName, event.Workflow.WorkflowName) + assert.Equal(t, expectedOrganizationID, event.Workflow.OrganizationID) + + // Verify CreInfo fields (optional) + require.NotNil(t, event.CreInfo) + assert.Equal(t, int32(1), event.CreInfo.DonID) + assert.Equal(t, expectedDonVersion, event.CreInfo.DonVersion) + + // Verify timestamp format (RFC3339) + timeMatcher := regexp.MustCompile(`[0-9\-]{10}T[0-9:]{8}[Z\-\+][0-9:]*`) + assert.True(t, timeMatcher.MatchString(event.Timestamp), "Timestamp should be in RFC3339 format: %s", event.Timestamp) +} + +func TestEmitTriggerExecutionStarted_MissingRequiredFields(t *testing.T) { + ctx := context.Background() + + // Test missing trigger ID + labeler := custmsg.NewLabeler().With( + KeyWorkflowID, "test-workflow", + KeyWorkflowExecutionID, "test-execution", + ) + + err := EmitTriggerExecutionStarted(ctx, labeler) + require.Error(t, err) + require.Contains(t, err.Error(), "missing required field: trigger_id") + + labeler = custmsg.NewLabeler().With( + KeyTriggerID, "test-trigger", + KeyWorkflowExecutionID, "test-execution", + ) + + err = EmitTriggerExecutionStarted(ctx, labeler) + require.Error(t, err) + require.Contains(t, err.Error(), "missing required field: workflow_id") + + labeler = custmsg.NewLabeler().With( + KeyTriggerID, "test-trigger", + KeyWorkflowID, "test-workflow", + ) + + err = EmitTriggerExecutionStarted(ctx, labeler) + require.Error(t, err) + require.Contains(t, err.Error(), "missing required field: workflow_execution_id") +}