-
Notifications
You must be signed in to change notification settings - Fork 28
adding shared trigger and org resolver logic to chainlink-common #1581
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,116 @@ | ||
| package orgresolver | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
|
|
||
| "google.golang.org/grpc" | ||
| "google.golang.org/grpc/credentials" | ||
| "google.golang.org/grpc/credentials/insecure" | ||
|
|
||
| log "github.com/smartcontractkit/chainlink-common/pkg/logger" | ||
| "github.com/smartcontractkit/chainlink-common/pkg/services" | ||
| linkingclient "github.com/smartcontractkit/chainlink-protos/linking-service/go/v1" | ||
| ) | ||
|
|
||
| // OrgResolver interface defines methods for resolving organization IDs from workflow owners | ||
| type OrgResolver interface { | ||
| services.Service | ||
| Get(ctx context.Context, owner string) (string, error) | ||
| } | ||
|
|
||
| type Config struct { | ||
| URL string | ||
| TLSEnabled bool | ||
| WorkflowRegistryAddress string | ||
| WorkflowRegistryChainSelector uint64 | ||
| } | ||
|
|
||
| // orgResolver makes direct calls to the linking service to resolve organization IDs from workflow owners. | ||
| // This simplified implementation makes a network call for each Get() request. | ||
| type orgResolver struct { | ||
| workflowRegistryAddress string | ||
| workflowRegistryChainSelector uint64 | ||
|
|
||
| client linkingclient.LinkingServiceClient | ||
| conn *grpc.ClientConn // nil if client was injected | ||
| logger log.SugaredLogger | ||
| } | ||
|
|
||
| // NewOrgResolver creates a new org resolver with the specified configuration | ||
| func NewOrgResolver(cfg Config, logger log.Logger) (*orgResolver, error) { | ||
| return NewOrgResolverWithClient(cfg, nil, logger) | ||
| } | ||
|
|
||
| // NewOrgResolverWithClient creates a new org resolver with an optional injected client (for testing) | ||
| func NewOrgResolverWithClient(cfg Config, client linkingclient.LinkingServiceClient, logger log.Logger) (*orgResolver, error) { | ||
| resolver := &orgResolver{ | ||
| workflowRegistryAddress: cfg.WorkflowRegistryAddress, | ||
| workflowRegistryChainSelector: cfg.WorkflowRegistryChainSelector, | ||
| logger: log.Sugared(logger).Named("OrgResolver"), | ||
| } | ||
|
|
||
| if client != nil { | ||
| resolver.client = client | ||
| } else { | ||
| if cfg.URL == "" { | ||
| return nil, errors.New("URL is required when client is not provided") | ||
| } | ||
|
|
||
| var opts []grpc.DialOption | ||
| if cfg.TLSEnabled { | ||
| opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(nil))) | ||
| } else { | ||
| opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) | ||
| } | ||
|
|
||
| conn, err := grpc.NewClient(cfg.URL, opts...) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to create linking service client at %s: %w", cfg.URL, err) | ||
| } | ||
|
|
||
| resolver.conn = conn | ||
| resolver.client = linkingclient.NewLinkingServiceClient(conn) | ||
| } | ||
|
|
||
| return resolver, nil | ||
| } | ||
|
|
||
| func (o *orgResolver) Get(ctx context.Context, owner string) (string, error) { | ||
| req := &linkingclient.GetOrganizationFromWorkflowOwnerRequest{ | ||
| WorkflowOwner: owner, | ||
| WorkflowRegistryAddress: o.workflowRegistryAddress, | ||
| ChainSelector: o.workflowRegistryChainSelector, | ||
| } | ||
|
|
||
| resp, err := o.client.GetOrganizationFromWorkflowOwner(ctx, req) | ||
| if err != nil { | ||
| return "", fmt.Errorf("failed to fetch organization from workflow owner: %w", err) | ||
| } | ||
|
|
||
| return resp.OrganizationId, nil | ||
| } | ||
|
|
||
| func (o *orgResolver) Start(_ context.Context) error { | ||
| return nil | ||
| } | ||
|
|
||
| func (o *orgResolver) HealthReport() map[string]error { | ||
| return map[string]error{o.Name(): nil} | ||
| } | ||
|
|
||
| func (o *orgResolver) Close() error { | ||
| if o.conn != nil { | ||
| return o.conn.Close() | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func (o *orgResolver) Name() string { | ||
| return o.logger.Name() | ||
| } | ||
|
|
||
| func (o *orgResolver) Ready() error { | ||
| return nil | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,115 @@ | ||
| package orgresolver | ||
|
|
||
| import ( | ||
| "context" | ||
| "net" | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/require" | ||
| "google.golang.org/grpc" | ||
| "google.golang.org/grpc/credentials/insecure" | ||
| "google.golang.org/grpc/test/bufconn" | ||
|
|
||
| "github.com/smartcontractkit/chainlink-common/pkg/logger" | ||
| linkingclient "github.com/smartcontractkit/chainlink-protos/linking-service/go/v1" | ||
| ) | ||
|
|
||
| // mockLinkingClient implements the LinkingServiceClient interface for testing | ||
| type mockLinkingClient struct{} | ||
|
|
||
| func (m *mockLinkingClient) GetOrganizationFromWorkflowOwner(ctx context.Context, req *linkingclient.GetOrganizationFromWorkflowOwnerRequest, opts ...grpc.CallOption) (*linkingclient.GetOrganizationFromWorkflowOwnerResponse, error) { | ||
| orgID := "org-" + req.WorkflowOwner | ||
| return &linkingclient.GetOrganizationFromWorkflowOwnerResponse{ | ||
| OrganizationId: orgID, | ||
| }, nil | ||
| } | ||
|
|
||
| // mockLinkingServer implements the LinkingServiceServer interface for testing | ||
| type mockLinkingServer struct { | ||
| linkingclient.UnimplementedLinkingServiceServer | ||
| } | ||
|
|
||
| func (s *mockLinkingServer) GetOrganizationFromWorkflowOwner(ctx context.Context, req *linkingclient.GetOrganizationFromWorkflowOwnerRequest) (*linkingclient.GetOrganizationFromWorkflowOwnerResponse, error) { | ||
| orgID := "org-" + req.WorkflowOwner | ||
| return &linkingclient.GetOrganizationFromWorkflowOwnerResponse{ | ||
| OrganizationId: orgID, | ||
| }, nil | ||
| } | ||
|
|
||
| func TestOrgResolver_Get(t *testing.T) { | ||
| ctx := context.Background() | ||
| client := &mockLinkingClient{} | ||
|
|
||
| cfg := Config{ | ||
| URL: "test-url", | ||
| TLSEnabled: false, | ||
| WorkflowRegistryAddress: "0x1234567890abcdef", | ||
| WorkflowRegistryChainSelector: 1, | ||
| } | ||
|
|
||
| resolver, err := NewOrgResolverWithClient(cfg, client, logger.Test(t)) | ||
| require.NoError(t, err) | ||
|
|
||
| workflowOwner := "0xabcdef1234567890" | ||
|
|
||
| orgID, err := resolver.Get(ctx, workflowOwner) | ||
| require.NoError(t, err) | ||
| require.Equal(t, "org-"+workflowOwner, orgID) | ||
| } | ||
|
|
||
| func TestOrgResolver_NewOrgResolver_RequiresClientOrURL(t *testing.T) { | ||
| cfg := Config{ | ||
| URL: "", // Empty URL should cause error | ||
| TLSEnabled: false, | ||
| WorkflowRegistryAddress: "0x1234567890abcdef", | ||
| WorkflowRegistryChainSelector: 1, | ||
| } | ||
|
|
||
| _, err := NewOrgResolverWithClient(cfg, nil, logger.Test(t)) | ||
| require.Error(t, err) | ||
| require.Contains(t, err.Error(), "URL is required when client is not provided") | ||
| } | ||
|
|
||
| func TestOrgResolver_NewOrgResolver_WithMockServer(t *testing.T) { | ||
| // Use in-memory connection for faster testing | ||
| lis := bufconn.Listen(1024 * 1024) | ||
| server := grpc.NewServer() | ||
| linkingclient.RegisterLinkingServiceServer(server, &mockLinkingServer{}) | ||
|
|
||
| go func() { | ||
| _ = server.Serve(lis) | ||
| }() | ||
| t.Cleanup(func() { server.Stop() }) | ||
|
|
||
| // Create gRPC client connection using bufconn | ||
| ctx := context.Background() | ||
| conn, err := grpc.NewClient("passthrough:///bufnet", | ||
| grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { | ||
| return lis.Dial() | ||
| }), | ||
| grpc.WithTransportCredentials(insecure.NewCredentials())) | ||
| require.NoError(t, err) | ||
| defer conn.Close() | ||
|
|
||
| client := linkingclient.NewLinkingServiceClient(conn) | ||
|
|
||
| // Create OrgResolver using the client (simulating what NewOrgResolver would do) | ||
| cfg := Config{ | ||
| URL: "bufnet", // Not used since client is injected | ||
| TLSEnabled: false, | ||
| WorkflowRegistryAddress: "0x1234567890abcdef", | ||
| WorkflowRegistryChainSelector: 1, | ||
| } | ||
|
|
||
| resolver, err := NewOrgResolverWithClient(cfg, client, logger.Test(t)) | ||
| require.NoError(t, err) | ||
|
|
||
| workflowOwner := "0xabcdef1234567890" | ||
|
|
||
| orgID, err := resolver.Get(ctx, workflowOwner) | ||
| require.NoError(t, err) | ||
| require.Equal(t, "org-"+workflowOwner, orgID) | ||
|
|
||
| err = resolver.Close() | ||
| require.NoError(t, err) | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| package events | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "strconv" | ||
| "time" | ||
|
|
||
| "google.golang.org/protobuf/proto" | ||
|
|
||
| "github.com/smartcontractkit/chainlink-common/pkg/beholder" | ||
| "github.com/smartcontractkit/chainlink-common/pkg/custmsg" | ||
| workflowsevents "github.com/smartcontractkit/chainlink-protos/workflows/go/v2" | ||
| ) | ||
|
|
||
| // Label keys for trigger events | ||
| const ( | ||
| KeyTriggerID = "trigger_id" | ||
| KeyWorkflowID = "workflow_id" | ||
| KeyWorkflowOwner = "workflow_owner" | ||
| KeyWorkflowName = "workflow_name" | ||
| KeyWorkflowExecutionID = "workflow_execution_id" | ||
| KeyDonID = "don_id" | ||
| KeyDonVersion = "don_version" | ||
| KeyOrganizationID = "organization_id" | ||
| ) | ||
|
|
||
| // EmitTriggerExecutionStarted emits a TriggerExecutionStarted event using the provided labeler | ||
| func EmitTriggerExecutionStarted(ctx context.Context, labeler custmsg.MessageEmitter) error { | ||
| labels := labeler.Labels() | ||
|
|
||
| // Extract required fields | ||
| triggerID, ok := labels[KeyTriggerID] | ||
| if !ok { | ||
| return fmt.Errorf("missing required field: %s", KeyTriggerID) | ||
| } | ||
|
|
||
| workflowID, ok := labels[KeyWorkflowID] | ||
| if !ok { | ||
| return fmt.Errorf("missing required field: %s", KeyWorkflowID) | ||
| } | ||
|
|
||
| workflowExecutionID, ok := labels[KeyWorkflowExecutionID] | ||
| if !ok { | ||
| return fmt.Errorf("missing required field: %s", KeyWorkflowExecutionID) | ||
| } | ||
|
|
||
| event := &workflowsevents.TriggerExecutionStarted{ | ||
| TriggerID: triggerID, | ||
| WorkflowExecutionID: workflowExecutionID, | ||
| Workflow: &workflowsevents.WorkflowKey{ | ||
| WorkflowID: workflowID, | ||
| WorkflowOwner: labels[KeyWorkflowOwner], | ||
| WorkflowName: labels[KeyWorkflowName], | ||
| OrganizationID: labels[KeyOrganizationID], | ||
| }, | ||
| Timestamp: time.Now().Format(time.RFC3339), | ||
| } | ||
|
|
||
| // Optional; downstream consumers could infer from csa public key, | ||
| // as of now Beholder/ChiP autohydrates csa public key | ||
| if donIDStr, exists := labels[KeyDonID]; exists { | ||
| if donID, err := strconv.ParseInt(donIDStr, 10, 32); err == nil { | ||
| event.CreInfo = &workflowsevents.CreInfo{ | ||
| DonID: int32(donID), | ||
| DonVersion: labels[KeyDonVersion], | ||
| } | ||
| } | ||
| } | ||
|
|
||
| b, err := proto.Marshal(event) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to marshal TriggerExecutionStarted event: %w", err) | ||
| } | ||
|
|
||
| return beholder.GetEmitter().Emit(ctx, b, | ||
| "beholder_data_schema", "workflows.v2.trigger_execution_started", // required | ||
| "beholder_domain", "platform", // required | ||
| "beholder_entity", "workflows.v2.TriggerExecutionStarted") // required | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.