Skip to content

Commit 8dd23fa

Browse files
committed
- event processing
1 parent 6c86ec0 commit 8dd23fa

4 files changed

Lines changed: 23 additions & 30 deletions

File tree

Shuttle.Recall/EventProcessing/EventProcessor.cs

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -278,39 +278,32 @@ private async Task<IEventProcessor> StartAsync(bool sync)
278278
return this;
279279
}
280280

281-
Started = true;
282281
Asynchronous = !sync;
283282

284283
foreach (var projectionAggregation in _projectionAggregations.Values)
285284
{
286285
projectionAggregation.AddEventTypes();
287286
}
288287

289-
try
288+
var startupPipeline = _pipelineFactory.GetPipeline<EventProcessorStartupPipeline>();
289+
290+
if (sync)
291+
{
292+
startupPipeline.Execute();
293+
}
294+
else
290295
{
291-
var startupPipeline = _pipelineFactory.GetPipeline<EventProcessorStartupPipeline>();
296+
await startupPipeline.ExecuteAsync().ConfigureAwait(false);
297+
}
292298

293-
if (sync)
294-
{
295-
startupPipeline.Execute();
296-
}
297-
else
298-
{
299-
await startupPipeline.ExecuteAsync().ConfigureAwait(false);
300-
}
299+
_cancellationTokenSource = new CancellationTokenSource();
300+
_cancellationToken = _cancellationTokenSource.Token;
301301

302-
_cancellationTokenSource = new CancellationTokenSource();
303-
_cancellationToken = _cancellationTokenSource.Token;
302+
_eventProcessorThreadPool = startupPipeline.State.Get<IProcessorThreadPool>("EventProcessorThreadPool");
304303

305-
_eventProcessorThreadPool = startupPipeline.State.Get<IProcessorThreadPool>("EventProcessorThreadPool");
304+
_sequenceNumberTailThread.Start();
306305

307-
_sequenceNumberTailThread.Start();
308-
}
309-
catch
310-
{
311-
Started = false;
312-
throw;
313-
}
306+
Started = true;
314307

315308
return this;
316309
}

Shuttle.Recall/Pipeline/Events.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,15 +170,15 @@ public class OnAfterEncryptEvent : PipelineEvent
170170
{
171171
}
172172

173-
public class OnBeforeStartEventProcessingEvent : PipelineEvent
173+
public class OnBeforeStartEventProcessing : PipelineEvent
174174
{
175175
}
176176

177-
public class OnStartEventProcessingEvent : PipelineEvent
177+
public class OnStartEventProcessing : PipelineEvent
178178
{
179179
}
180180

181-
public class OnAfterStartEventProcessingEvent : PipelineEvent
181+
public class OnAfterStartEventProcessing : PipelineEvent
182182
{
183183
}
184184

Shuttle.Recall/Pipeline/Observers/EventProjection/StartupEventProcessingObserver.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ public StartupEventProcessingObserver(IOptions<EventStoreOptions> eventStoreOpti
2424
_eventStoreConfiguration = Guard.AgainstNull(eventStoreConfiguration, nameof(eventStoreConfiguration));
2525
}
2626

27-
public void Execute(OnStartEventProcessingEvent pipelineEvent)
27+
public void Execute(OnStartEventProcessing pipelineEvent)
2828
{
2929
ExecuteAsync(pipelineEvent).GetAwaiter().GetResult();
3030
}
3131

32-
public async Task ExecuteAsync(OnStartEventProcessingEvent pipelineEvent)
32+
public async Task ExecuteAsync(OnStartEventProcessing pipelineEvent)
3333
{
3434
Guard.AgainstNull(pipelineEvent, nameof(pipelineEvent));
3535

@@ -91,7 +91,7 @@ private async Task ExecuteAsync(OnStartThreadPools pipelineEvent, bool sync)
9191
}
9292

9393
public interface IStartupEventProcessingObserver :
94-
IPipelineObserver<OnStartEventProcessingEvent>,
94+
IPipelineObserver<OnStartEventProcessing>,
9595
IPipelineObserver<OnConfigureThreadPools>,
9696
IPipelineObserver<OnStartThreadPools>
9797
{

Shuttle.Recall/Pipeline/Pipelines/EventProcessorStartupPipeline.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ public EventProcessorStartupPipeline(IStartupEventProcessingObserver startupEven
1313
RegisterStage("Process")
1414
.WithEvent<OnStartTransactionScope>()
1515
.WithEvent<OnAfterStartTransactionScope>()
16-
.WithEvent<OnBeforeStartEventProcessingEvent>()
17-
.WithEvent<OnStartEventProcessingEvent>()
18-
.WithEvent<OnAfterStartEventProcessingEvent>()
16+
.WithEvent<OnBeforeStartEventProcessing>()
17+
.WithEvent<OnStartEventProcessing>()
18+
.WithEvent<OnAfterStartEventProcessing>()
1919
.WithEvent<OnCompleteTransactionScope>()
2020
.WithEvent<OnDisposeTransactionScope>()
2121
.WithEvent<OnConfigureThreadPools>()

0 commit comments

Comments
 (0)