Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions src/Paramore.Brighter.Mediator/Runner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,19 @@ public Runner(IAmAJobChannel<TData> channel, IAmAStateStoreAsync stateStore, IAm
/// Runs the job processing loop.
/// </summary>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
public void RunAsync(CancellationToken cancellationToken = default(CancellationToken))
/// <returns>A task that completes when the job processing loop exits (via cancellation or channel closure).</returns>
public async Task RunAsync(CancellationToken cancellationToken = default(CancellationToken))
{
s_logger.LogInformation("Starting runner {RunnerName}", _runnerName);

var task = Task.Factory.StartNew(async () =>
{
cancellationToken.ThrowIfCancellationRequested();

await ProcessJobs(cancellationToken);

cancellationToken.ThrowIfCancellationRequested();

}, cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);

Task.WaitAll([task], cancellationToken);

s_logger.LogInformation("Finished runner {RunnerName}", _runnerName);
try
{
await Task.Run(() => ProcessJobs(cancellationToken), cancellationToken);
}
finally
{
s_logger.LogInformation("Finished runner {RunnerName}", _runnerName);
}
}

private async Task Execute(Job<TData>? job, CancellationToken cancellationToken = default(CancellationToken))
Expand Down
26 changes: 10 additions & 16 deletions src/Paramore.Brighter.Mediator/Waker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,25 +58,19 @@ public Waker(TimeSpan jobAge, Scheduler<TData> scheduler)
/// This will periodically wake up and trigger due jobs in the scheduler.
/// </summary>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous run operation.</returns>
public void RunAsync(CancellationToken cancellationToken = default(CancellationToken))
/// <returns>A task that completes when the wake loop exits (via cancellation).</returns>
public async Task RunAsync(CancellationToken cancellationToken = default(CancellationToken))
{
s_logger.LogInformation("Starting waker {WakerName}", _wakerName);

var task = Task.Factory.StartNew(async () =>
{
cancellationToken.ThrowIfCancellationRequested();

await Wake(cancellationToken);

if (cancellationToken.IsCancellationRequested)
cancellationToken.ThrowIfCancellationRequested();

}, cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);

Task.WaitAll([task], cancellationToken);

s_logger.LogInformation("Finished waker {WakerName}", _wakerName);
try
{
await Task.Run(() => Wake(cancellationToken), cancellationToken);
}
finally
{
s_logger.LogInformation("Finished waker {WakerName}", _wakerName);
}
}

private async Task Wake(CancellationToken cancellationToken = default(CancellationToken))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,26 +68,31 @@ public MediatorWaitStepFlowTests(ITestOutputHelper testOutputHelper)
public async Task When_running_a_wait_workflow()
{
var ct = new CancellationTokenSource();
ct.CancelAfter( TimeSpan.FromSeconds(3));
ct.CancelAfter(TimeSpan.FromSeconds(3));

Task runnerTask = Task.CompletedTask;
Task wakerTask = Task.CompletedTask;
try
{
await _scheduler.ScheduleAsync(_job);

_timeProvider.Advance(TimeSpan.FromMilliseconds(1000));

_runner.RunAsync(ct.Token);
_waker.RunAsync(ct.Token);

await Task.Delay(5, ct.Token);
runnerTask = _runner.RunAsync(ct.Token);
wakerTask = _waker.RunAsync(ct.Token);

await Task.Delay(TimeSpan.FromMilliseconds(500), ct.Token);
}
catch (Exception e)
catch (OperationCanceledException e)
{
_testOutputHelper.WriteLine(e.ToString());
}

Assert.True(_stepCompleted);
Assert.Equal(JobState.Done, _job.State);

ct.Cancel();
try { await Task.WhenAll(runnerTask, wakerTask); }
catch (OperationCanceledException) { }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class MediatorChangeStepFlowTests
private readonly ITestOutputHelper _testOutputHelper;
private readonly Scheduler<WorkflowTestData> _scheduler;
private readonly Runner<WorkflowTestData> _runner;
private readonly InMemoryJobChannel<WorkflowTestData> _channel;
private readonly Job<WorkflowTestData> _job;
private bool _stepCompleted;

Expand Down Expand Up @@ -50,34 +51,33 @@ public MediatorChangeStepFlowTests (ITestOutputHelper testOutputHelper)
_job.InitSteps(firstStep);

var store = new InMemoryStateStoreAsync ();
InMemoryJobChannel<WorkflowTestData> channel = new();
_channel = new InMemoryJobChannel<WorkflowTestData>();

_scheduler = new Scheduler<WorkflowTestData>(
channel,
_channel,
store
);
_runner = new Runner<WorkflowTestData>(channel, store, commandProcessor, _scheduler);

_runner = new Runner<WorkflowTestData>(_channel, store, commandProcessor, _scheduler);
}

[Fact]
public async Task When_running_a_change_workflow()
{
await _scheduler.ScheduleAsync(_job);
_channel.Stop();

//let it run long enough to finish work, then terminate
var ct = new CancellationTokenSource();
ct.CancelAfter( TimeSpan.FromSeconds(1) );
ct.CancelAfter(TimeSpan.FromSeconds(1));
try
{
_runner.RunAsync(ct.Token);
await _runner.RunAsync(ct.Token);
}
catch (Exception ex)
catch (OperationCanceledException ex)
{
// swallow the exception, we expect it to be cancelled
_testOutputHelper.WriteLine(ex.ToString());
}

Assert.Equal(JobState.Done, _job.State);
Assert.True(_stepCompleted);
Assert.Equal("Altered", _job.Data.Bag["MyValue"]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class MediatorFailingChoiceFlowTests
private readonly ITestOutputHelper _testOutputHelper;
private readonly Scheduler<WorkflowTestData> _scheduler;
private readonly Runner<WorkflowTestData> _runner;
private readonly InMemoryJobChannel<WorkflowTestData> _channel;
private readonly Job<WorkflowTestData> _job;
private bool _stepCompletedOne;
private bool _stepCompletedTwo;
Expand Down Expand Up @@ -70,14 +71,14 @@ public MediatorFailingChoiceFlowTests(ITestOutputHelper testOutputHelper)
_job.InitSteps(stepOne);

InMemoryStateStoreAsync store = new();
InMemoryJobChannel<WorkflowTestData> channel = new();
_channel = new InMemoryJobChannel<WorkflowTestData>();

_scheduler = new Scheduler<WorkflowTestData>(
channel,
_channel,
store
);
_runner = new Runner<WorkflowTestData>(channel, store, commandProcessor, _scheduler);

_runner = new Runner<WorkflowTestData>(_channel, store, commandProcessor, _scheduler);
}

[Fact]
Expand All @@ -87,15 +88,16 @@ public async Task When_running_a_choice_workflow_step()
MyOtherCommandHandlerAsync.ReceivedCommands.Clear();

await _scheduler.ScheduleAsync(_job);

_channel.Stop();

var ct = new CancellationTokenSource();
ct.CancelAfter( TimeSpan.FromSeconds(1) );
ct.CancelAfter(TimeSpan.FromSeconds(1));

try
{
_runner.RunAsync(ct.Token);
await _runner.RunAsync(ct.Token);
}
catch (Exception e)
catch (OperationCanceledException e)
{
_testOutputHelper.WriteLine(e.ToString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class MediatorReplyMultiStepFlowTests
private readonly ITestOutputHelper _testOutputHelper;
private readonly Scheduler<WorkflowTestData> _scheduler;
private readonly Runner<WorkflowTestData> _runner;
private readonly InMemoryJobChannel<WorkflowTestData> _channel;
private readonly Job<WorkflowTestData> _job;
private bool _stepCompletedOne;
private bool _stepCompletedTwo;
Expand Down Expand Up @@ -61,14 +62,14 @@ public MediatorReplyMultiStepFlowTests(ITestOutputHelper testOutputHelper)
_job.InitSteps(stepOne);

InMemoryStateStoreAsync store = new();
InMemoryJobChannel<WorkflowTestData> channel = new();
_channel = new InMemoryJobChannel<WorkflowTestData>();

_scheduler = new Scheduler<WorkflowTestData>(
channel,
_channel,
store
);
_runner = new Runner<WorkflowTestData>(channel, store, commandProcessor, _scheduler);

_runner = new Runner<WorkflowTestData>(_channel, store, commandProcessor, _scheduler);
}

[Fact]
Expand All @@ -78,19 +79,20 @@ public async Task When_running_a_workflow_with_reply()
MyEventHandlerAsync.ReceivedEvents.Clear();

await _scheduler.ScheduleAsync(_job);

_channel.Stop();

var ct = new CancellationTokenSource();
ct.CancelAfter( TimeSpan.FromSeconds(1) );
ct.CancelAfter(TimeSpan.FromSeconds(1));

try
{
_runner.RunAsync(ct.Token);
await _runner.RunAsync(ct.Token);
}
catch (Exception e)
catch (OperationCanceledException e)
{
_testOutputHelper.WriteLine(e.ToString());
}

Assert.True(_stepCompletedOne);
Assert.True(_stepCompletedTwo);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class MediatorPassingChoiceFlowTests
private readonly ITestOutputHelper _testOutputHelper;
private readonly Scheduler<WorkflowTestData> _scheduler;
private readonly Runner<WorkflowTestData> _runner;
private readonly InMemoryJobChannel<WorkflowTestData> _channel;
private readonly Job<WorkflowTestData> _job;
private bool _stepCompletedOne;
private bool _stepCompletedTwo;
Expand Down Expand Up @@ -70,14 +71,14 @@ public MediatorPassingChoiceFlowTests(ITestOutputHelper testOutputHelper)
_job.InitSteps(stepOne);

InMemoryStateStoreAsync store = new();
InMemoryJobChannel<WorkflowTestData> channel = new();
_channel = new InMemoryJobChannel<WorkflowTestData>();

_scheduler = new Scheduler<WorkflowTestData>(
channel,
_channel,
store
);
_runner = new Runner<WorkflowTestData>(channel, store, commandProcessor, _scheduler);

_runner = new Runner<WorkflowTestData>(_channel, store, commandProcessor, _scheduler);
}

[Fact]
Expand All @@ -87,15 +88,16 @@ public async Task When_running_a_choice_workflow_step()
MyOtherCommandHandlerAsync.ReceivedCommands.Clear();

await _scheduler.ScheduleAsync(_job);

_channel.Stop();

var ct = new CancellationTokenSource();
ct.CancelAfter( TimeSpan.FromSeconds(1) );
ct.CancelAfter(TimeSpan.FromSeconds(1));

try
{
_runner.RunAsync(ct.Token);
await _runner.RunAsync(ct.Token);
}
catch (Exception e)
catch (OperationCanceledException e)
{
_testOutputHelper.WriteLine(e.ToString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class MediatorOneStepFlowTests
private readonly ITestOutputHelper _testOutputHelper;
private readonly Scheduler<WorkflowTestData> _scheduler;
private readonly Runner<WorkflowTestData> _runner;
private readonly InMemoryJobChannel<WorkflowTestData> _channel;
private readonly Job<WorkflowTestData> _job;
private bool _stepCompleted;

Expand Down Expand Up @@ -46,35 +47,36 @@ public MediatorOneStepFlowTests(ITestOutputHelper testOutputHelper)
_job.InitSteps(firstStep);

InMemoryStateStoreAsync store = new();
InMemoryJobChannel<WorkflowTestData> channel = new();
_channel = new InMemoryJobChannel<WorkflowTestData>();

_scheduler = new Scheduler<WorkflowTestData>(
channel,
_channel,
store
);
_runner = new Runner<WorkflowTestData>(channel, store, commandProcessor, _scheduler);

_runner = new Runner<WorkflowTestData>(_channel, store, commandProcessor, _scheduler);
}

[Fact]
public async Task When_running_a_single_step_workflow()
{
MyCommandHandlerAsync.ReceivedCommands.Clear();

await _scheduler.ScheduleAsync(_job);

await _scheduler.ScheduleAsync(_job);
_channel.Stop();

var ct = new CancellationTokenSource();
ct.CancelAfter( TimeSpan.FromSeconds(1) );
ct.CancelAfter(TimeSpan.FromSeconds(1));

try
{
_runner.RunAsync(ct.Token);
await _runner.RunAsync(ct.Token);
}
catch (Exception e)
catch (OperationCanceledException e)
{
_testOutputHelper.WriteLine(e.ToString());
}

Assert.Contains(MyCommandHandlerAsync.ReceivedCommands, c => c.Value == "Test");
Assert.Equal(JobState.Done, _job.State);
Assert.True(_stepCompleted);
Expand Down
Loading
Loading