-
-
Notifications
You must be signed in to change notification settings - Fork 21
perf: Add cancellation support and short-circuit to ModuleConditionHandler #1720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,4 @@ | ||
| using System.Reflection; | ||
| using EnumerableAsyncProcessor.Extensions; | ||
| using Microsoft.Extensions.Options; | ||
| using ModularPipelines.Attributes; | ||
| using ModularPipelines.Context; | ||
|
|
@@ -20,7 +19,7 @@ public ModuleConditionHandler(IOptions<PipelineOptions> pipelineOptions, IPipeli | |
| _pipelineContextProvider = pipelineContextProvider; | ||
| } | ||
|
|
||
| public async Task<(bool ShouldIgnore, SkipDecision? SkipDecision)> ShouldIgnore(IModule module) | ||
| public async Task<(bool ShouldIgnore, SkipDecision? SkipDecision)> ShouldIgnore(IModule module, CancellationToken cancellationToken = default) | ||
| { | ||
| var moduleType = module.GetType(); | ||
|
|
||
|
|
@@ -34,7 +33,7 @@ public ModuleConditionHandler(IOptions<PipelineOptions> pipelineOptions, IPipeli | |
| return (true, SkipDecision.Skip("The module was not in a runnable category")); | ||
| } | ||
|
|
||
| var conditionResult = await IsRunnableCondition(moduleType).ConfigureAwait(false); | ||
| var conditionResult = await IsRunnableCondition(moduleType, cancellationToken).ConfigureAwait(false); | ||
| if (!conditionResult.IsRunnable) | ||
| { | ||
| return (true, conditionResult.SkipDecision); | ||
|
|
@@ -71,43 +70,43 @@ private bool IsIgnoreCategory(Type moduleType) | |
| return category != null && ignoreCategories.Contains(category.Category); | ||
| } | ||
|
|
||
| private async Task<(bool IsRunnable, SkipDecision? SkipDecision)> IsRunnableCondition(Type moduleType) | ||
| private async Task<(bool IsRunnable, SkipDecision? SkipDecision)> IsRunnableCondition(Type moduleType, CancellationToken cancellationToken) | ||
| { | ||
| var mandatoryRunConditionAttributes = moduleType.GetCustomAttributes<MandatoryRunConditionAttribute>(true).ToList(); | ||
| var runConditionAttributes = moduleType.GetCustomAttributes<RunConditionAttribute>(true).Except(mandatoryRunConditionAttributes).ToList(); | ||
|
|
||
| // Get a context for condition evaluation | ||
| var pipelineContext = _pipelineContextProvider.GetModuleContext(); | ||
|
|
||
| var mandatoryConditionResults = await mandatoryRunConditionAttributes.ToAsyncProcessorBuilder() | ||
| .SelectAsync(async runConditionAttribute => new RunnableConditionMet(await runConditionAttribute.Condition(pipelineContext).ConfigureAwait(false), runConditionAttribute)) | ||
| .ProcessInParallel(); | ||
|
|
||
| var mandatoryCondition = mandatoryConditionResults.FirstOrDefault(result => !result.ConditionMet); | ||
|
|
||
| if (mandatoryCondition != null) | ||
| // Evaluate mandatory conditions sequentially with short-circuit on first failure | ||
| foreach (var attr in mandatoryRunConditionAttributes) | ||
| { | ||
| return (false, SkipDecision.Skip($"A condition to run this module has not been met - {mandatoryCondition.RunConditionAttribute.GetType().Name}")); | ||
| cancellationToken.ThrowIfCancellationRequested(); | ||
|
|
||
| var conditionMet = await attr.Condition(pipelineContext).ConfigureAwait(false); | ||
| if (!conditionMet) | ||
| { | ||
| return (false, SkipDecision.Skip($"A condition to run this module has not been met - {attr.GetType().Name}")); | ||
| } | ||
| } | ||
|
|
||
| if (!runConditionAttributes.Any()) | ||
| { | ||
| return (true, null); | ||
| } | ||
|
|
||
| var conditionResults = await runConditionAttributes.ToAsyncProcessorBuilder() | ||
| .SelectAsync(async runConditionAttribute => new RunnableConditionMet(await runConditionAttribute.Condition(pipelineContext).ConfigureAwait(false), runConditionAttribute)) | ||
| .ProcessInParallel(); | ||
|
|
||
| var runnableCondition = conditionResults.FirstOrDefault(result => result.ConditionMet); | ||
|
|
||
| if (runnableCondition != null) | ||
| // Evaluate non-mandatory conditions sequentially with short-circuit on first success | ||
| foreach (var attr in runConditionAttributes) | ||
| { | ||
| return (true, null); | ||
| cancellationToken.ThrowIfCancellationRequested(); | ||
|
||
|
|
||
| var conditionMet = await attr.Condition(pipelineContext).ConfigureAwait(false); | ||
| if (conditionMet) | ||
| { | ||
| return (true, null); | ||
| } | ||
| } | ||
|
|
||
| return (false, SkipDecision.Skip($"No run conditions were met: {string.Join(", ", runConditionAttributes.Select(x => x.GetType().Name.Replace("Attribute", string.Empty, StringComparison.OrdinalIgnoreCase)))}")); | ||
| } | ||
|
|
||
| private record RunnableConditionMet(bool ConditionMet, RunConditionAttribute RunConditionAttribute); | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cancellation token is being checked between evaluating conditions, but it's not being passed to the
attr.Condition(pipelineContext)call on line 86. This means the actual condition evaluation cannot be cancelled, which limits the effectiveness of the cancellation support added in this PR. Consider updating theRunConditionAttribute.Conditionmethod signature to accept aCancellationTokenparameter.