-
Notifications
You must be signed in to change notification settings - Fork 2k
cre-2803: http boundry blocker investigation (core node side) #21827
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
f094709
cre-2803: http boundry blocker investigation
mchain0 31677b5
cre-2803: minor improvements
mchain0 5e43d6c
Merge branch 'develop' into cre-2802-http-boundry-blocker-investigation
mchain0 928bc47
cre-2802: better assert
mchain0 2e3e639
cre-2802: swap sync atomic to old-school goshed to assert on the orde…
mchain0 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
147 changes: 147 additions & 0 deletions
147
core/services/workflows/v2/engine_execution_concurrency_test.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,147 @@ | ||
| package v2_test | ||
|
|
||
| import ( | ||
| "context" | ||
| "runtime" | ||
| "slices" | ||
| "sync" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/stretchr/testify/mock" | ||
| "github.com/stretchr/testify/require" | ||
|
|
||
| "github.com/smartcontractkit/chainlink-common/pkg/capabilities" | ||
| "github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" | ||
| modulemocks "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host/mocks" | ||
| sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" | ||
|
|
||
| regmocks "github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks" | ||
|
|
||
| "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" | ||
| capmocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/mocks" | ||
| workflowEvents "github.com/smartcontractkit/chainlink/v2/core/services/workflows/events" | ||
| v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2" | ||
| "github.com/smartcontractkit/chainlink/v2/core/utils/matches" | ||
| ) | ||
|
|
||
| // TestEngine_ExecutionConcurrencySerializesOverlappingRuns proves that when PerWorkflow | ||
| // ExecutionConcurrencyLimit is 1, a second trigger cannot start Module.Execute until the first | ||
| // run completes (executionsSemaphore.Wait blocks in handleAllTriggerEvents). | ||
| func TestEngine_ExecutionConcurrencySerializesOverlappingRuns(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| continueFirst := make(chan struct{}) | ||
| var execMu sync.Mutex | ||
| var execOrder []string | ||
|
|
||
| module := modulemocks.NewModuleV2(t) | ||
| module.EXPECT().Start().Once() | ||
| module.EXPECT().Execute(matches.AnyContext, mock.Anything, mock.Anything).Return(newTriggerSubs(1), nil).Once() | ||
| module.EXPECT().Execute(matches.AnyContext, mock.Anything, mock.Anything).Run( | ||
| func(_ context.Context, _ *sdkpb.ExecuteRequest, eh host.ExecutionHelper) { | ||
| execMu.Lock() | ||
| execOrder = append(execOrder, eh.GetWorkflowExecutionID()) | ||
| n := len(execOrder) | ||
| execMu.Unlock() | ||
| if n == 1 { | ||
| <-continueFirst | ||
| } | ||
| }).Return(nil, nil).Times(2) | ||
| module.EXPECT().Close().Once() | ||
|
|
||
| capreg := regmocks.NewCapabilitiesRegistry(t) | ||
| capreg.EXPECT().LocalNode(matches.AnyContext).Return(newNode(t), nil).Once() | ||
|
|
||
| initDoneCh := make(chan error, 1) | ||
| subscribedToTriggersCh := make(chan []string, 1) | ||
| executionFinishedCh := make(chan string, 2) | ||
|
|
||
| cfg := defaultTestConfig(t, func(cfg *cresettings.Workflows) { | ||
| cfg.ExecutionConcurrencyLimit.DefaultValue = 1 | ||
| }) | ||
| cfg.Module = module | ||
| cfg.CapRegistry = capreg | ||
| cfg.BillingClient = setupMockBillingClient(t) | ||
|
|
||
| wantExecID1, err := workflowEvents.GenerateExecutionID(cfg.WorkflowID, "event_concurrency_1") | ||
| require.NoError(t, err) | ||
| wantExecID2, err := workflowEvents.GenerateExecutionID(cfg.WorkflowID, "event_concurrency_2") | ||
| require.NoError(t, err) | ||
|
|
||
| cfg.Hooks = v2.LifecycleHooks{ | ||
| OnInitialized: func(err error) { | ||
| initDoneCh <- err | ||
| }, | ||
| OnSubscribedToTriggers: func(triggerIDs []string) { | ||
| subscribedToTriggersCh <- triggerIDs | ||
| }, | ||
| OnExecutionFinished: func(executionID string, _ string) { | ||
| executionFinishedCh <- executionID | ||
|
mchain0 marked this conversation as resolved.
|
||
| if executionID == wantExecID2 { | ||
| close(executionFinishedCh) | ||
| } | ||
| }, | ||
| } | ||
|
|
||
| engine, err := v2.NewEngine(cfg) | ||
| require.NoError(t, err) | ||
|
|
||
| trigger := capmocks.NewTriggerCapability(t) | ||
| capreg.EXPECT().GetTrigger(matches.AnyContext, "id_0").Return(trigger, nil).Once() | ||
| eventCh := make(chan capabilities.TriggerResponse) | ||
| trigger.EXPECT().RegisterTrigger(matches.AnyContext, mock.Anything).Return(eventCh, nil).Once() | ||
| trigger.EXPECT().UnregisterTrigger(matches.AnyContext, mock.Anything).Return(nil).Once() | ||
| trigger.EXPECT().AckEvent(matches.AnyContext, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() | ||
|
|
||
| require.NoError(t, engine.Start(t.Context())) | ||
| require.NoError(t, <-initDoneCh) | ||
| require.Equal(t, []string{"id_0"}, <-subscribedToTriggersCh) | ||
|
|
||
| eventCh <- capabilities.TriggerResponse{ | ||
| Event: capabilities.TriggerEvent{ | ||
| TriggerType: "basic-trigger@1.0.0", | ||
| ID: "event_concurrency_1", | ||
| Payload: nil, | ||
| }, | ||
| } | ||
|
|
||
| require.Eventually(t, func() bool { | ||
| execMu.Lock() | ||
| defer execMu.Unlock() | ||
| return len(execOrder) == 1 && execOrder[0] == wantExecID1 | ||
| }, 2*time.Second, 5*time.Millisecond, "first execution should start") | ||
|
|
||
| eventCh <- capabilities.TriggerResponse{ | ||
| Event: capabilities.TriggerEvent{ | ||
| TriggerType: "basic-trigger@1.0.0", | ||
| ID: "event_concurrency_2", | ||
| Payload: nil, | ||
| }, | ||
| } | ||
|
|
||
| for i := 0; i < 10_000; i++ { | ||
| runtime.Gosched() | ||
| } | ||
| execMu.Lock() | ||
| gotMid := slices.Clone(execOrder) | ||
| execMu.Unlock() | ||
| require.Equal(t, []string{wantExecID1}, gotMid, | ||
| "second execution must not start while the first holds the executions semaphore") | ||
|
|
||
| continueFirst <- struct{}{} | ||
|
|
||
| require.Eventually(t, func() bool { | ||
| execMu.Lock() | ||
| defer execMu.Unlock() | ||
| return slices.Equal(execOrder, []string{wantExecID1, wantExecID2}) | ||
| }, 2*time.Second, 5*time.Millisecond, "second execution should start after the first completes") | ||
|
|
||
| finishedIDs := make([]string, 0, 2) | ||
| for id := range executionFinishedCh { | ||
| finishedIDs = append(finishedIDs, id) | ||
| } | ||
| require.Equal(t, []string{wantExecID1, wantExecID2}, finishedIDs) | ||
|
|
||
| require.NoError(t, engine.Close()) | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.