Simplify.Pipelines
Provides a conveyor (production line) for processing an item through a sequence of stages. Both synchronous and asynchronous models are available, and the conveyor raises events when processing starts and after each stage completes.
Available on NuGet as a binary package.
All public types live in the `Simplify.Pipelines.ProductionLine` namespace.

| Role | Synchronous | Asynchronous |
|---|---|---|
| Single processing step | `IConveyorStage<T>` | `IAsyncConveyorStage<T>` |
| Ordered set of stages | `IConveyor<T>` (impl. `Conveyor<T>`) | `IAsyncConveyor<T>` (impl. `AsyncConveyor<T>`) |
| Item source | `IConveyorItemPreparer<T>` | `IAsyncConveyorItemPreparer<T>` |
| Prepare + run | `IConveyorExecutor<T>` (impl. `ConveyorExecutor<T>`) | `IAsyncConveyorExecutor<T>` (impl. `AsyncConveyorExecutor<T>`) |

Key signatures:

```csharp
public interface IConveyorStage<in T> { void Execute(T item); }
public interface IAsyncConveyorStage<in T> { Task Execute(T item); }

public interface IConveyor<T>
{
    void Execute(T item);
    event ConveyorAction<T> OnConveyorStart;
    event ConveyorStageAction<T> OnStageExecuted;
}

public interface IAsyncConveyor<T>
{
    Task Execute(T item);
    event ConveyorAction<T> OnConveyorStart;
    event ConveyorStageAction<T> OnStageExecuted;
}

public interface IConveyorItemPreparer<out T> { T GetItem(); }
public interface IAsyncConveyorItemPreparer<T> { Task<T> GetItem(); }

public interface IConveyorExecutor<out T> { T Execute(); }
public interface IAsyncConveyorExecutor<T> { Task<T> Execute(); }
```

The event delegates are:

```csharp
public delegate void ConveyorAction<in T>(T item);
public delegate void ConveyorStageAction<in T>(Type stageType, T item);
```

`Conveyor<T>` and `AsyncConveyor<T>` are constructed from a list of stages, and the executors are constructed from a conveyor and an item preparer:

```csharp
public Conveyor(IList<IConveyorStage<T>> stages);
public AsyncConveyor(IList<IAsyncConveyorStage<T>> stages);
public ConveyorExecutor(IConveyor<T> conveyor, IConveyorItemPreparer<T> itemPreparer);
public AsyncConveyorExecutor(IAsyncConveyor<T> conveyor, IAsyncConveyorItemPreparer<T> itemPreparer);
```

When the executor's `Execute()` is called, it asks the item preparer for an item, raises `OnConveyorStart`, runs the item through each stage in order, raises `OnStageExecuted` after each stage, and returns the processed item.
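
The order of calls described above can be illustrated with a small, self-contained sketch. It does not use or reproduce the library's source: `ISketchStage<T>`, `SketchConveyor<T>`, `SketchExecutor<T>`, `AppendA`, `AppendB`, and `SketchDemo` are hypothetical stand-ins invented for this illustration, and a plain `Func<T>` plays the role of the item preparer. The sketch only assumes the sequence stated above: prepare, start event, stages in order, stage event after each stage, return.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for illustration only; these are NOT the library's types.
public interface ISketchStage<in T> { void Execute(T item); }

public sealed class SketchConveyor<T>
{
    private readonly IList<ISketchStage<T>> _stages;

    public SketchConveyor(IList<ISketchStage<T>> stages) => _stages = stages;

    public event Action<T>? OnConveyorStart;
    public event Action<Type, T>? OnStageExecuted;

    public void Execute(T item)
    {
        // Announce the item once, before any stage runs.
        OnConveyorStart?.Invoke(item);

        foreach (var stage in _stages)
        {
            stage.Execute(item);
            // Report each stage only after it has finished.
            OnStageExecuted?.Invoke(stage.GetType(), item);
        }
    }
}

public sealed class SketchExecutor<T>
{
    private readonly SketchConveyor<T> _conveyor;
    private readonly Func<T> _getItem; // plays the role of the item preparer

    public SketchExecutor(SketchConveyor<T> conveyor, Func<T> getItem)
    {
        _conveyor = conveyor;
        _getItem = getItem;
    }

    public T Execute()
    {
        var item = _getItem();   // 1. ask the preparer for an item
        _conveyor.Execute(item); // 2. start event, then each stage in order
        return item;             // 3. return the processed item
    }
}

public sealed class AppendA : ISketchStage<List<string>>
{
    public void Execute(List<string> item) => item.Add("A");
}

public sealed class AppendB : ISketchStage<List<string>>
{
    public void Execute(List<string> item) => item.Add("B");
}

public static class SketchDemo
{
    // Runs two stages and returns the sequence of observed events.
    public static List<string> Run()
    {
        var log = new List<string>();
        var stages = new List<ISketchStage<List<string>>> { new AppendA(), new AppendB() };
        var conveyor = new SketchConveyor<List<string>>(stages);

        conveyor.OnConveyorStart += item => log.Add($"start:{item.Count}");
        conveyor.OnStageExecuted += (type, item) => log.Add($"{type.Name}:{string.Join("", item)}");

        var executor = new SketchExecutor<List<string>>(conveyor, () => new List<string>());
        executor.Execute();
        return log;
    }
}
```

Calling `SketchDemo.Run()` yields three entries in order: `start:0`, `AppendA:A`, `AppendB:AB`. The start event sees the item before any stage has touched it, and each stage event sees the item as that stage left it.
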

```csharp
using System;
using Simplify.Pipelines.ProductionLine;

// The item that travels along the conveyor.
public class Order
{
    public int Id { get; set; }
    public decimal Amount { get; set; }
    public string? Status { get; set; }
}

// Rejects orders with a non-positive amount.
public class ValidateStage : IConveyorStage<Order>
{
    public void Execute(Order item)
    {
        if (item.Amount <= 0)
            throw new InvalidOperationException("Amount must be positive");

        item.Status = "Validated";
    }
}

public class CompleteStage : IConveyorStage<Order>
{
    public void Execute(Order item) => item.Status = "Completed";
}

// Supplies the item the executor will process.
public class OrderPreparer : IConveyorItemPreparer<Order>
{
    public Order GetItem() => new Order { Id = 1, Amount = 99.99m };
}
```

Build the conveyor from the stages, subscribe to its events, and run it through the executor:

```csharp
using System;
using System.Collections.Generic;
using Simplify.Pipelines.ProductionLine;

// Stages run in list order.
var stages = new List<IConveyorStage<Order>>
{
    new ValidateStage(),
    new CompleteStage()
};

var conveyor = new Conveyor<Order>(stages);

conveyor.OnConveyorStart += order => Console.WriteLine($"Started order {order.Id}");
conveyor.OnStageExecuted += (stageType, order) =>
    Console.WriteLine($"{stageType.Name} done, status: {order.Status}");

var executor = new ConveyorExecutor<Order>(conveyor, new OrderPreparer());
Order processed = executor.Execute();
```

For asynchronous processing, use the `Async*` counterparts: stages return `Task`, and the executor's `Execute()` returns `Task<T>`:

```csharp
using System.Threading.Tasks;
using Simplify.Pipelines.ProductionLine;

// Reuses the Order class from the synchronous example.
public class AsyncValidateStage : IAsyncConveyorStage<Order>
{
    public async Task Execute(Order item)
    {
        await Task.Yield(); // placeholder for real asynchronous work
        item.Status = "Validated";
    }
}

public class AsyncOrderPreparer : IAsyncConveyorItemPreparer<Order>
{
    public Task<Order> GetItem() => Task.FromResult(new Order { Id = 1, Amount = 99.99m });
}
```

```csharp
using System.Collections.Generic;
using Simplify.Pipelines.ProductionLine;

var stages = new List<IAsyncConveyorStage<Order>> { new AsyncValidateStage() };
var conveyor = new AsyncConveyor<Order>(stages);
var executor = new AsyncConveyorExecutor<Order>(conveyor, new AsyncOrderPreparer());

Order processed = await executor.Execute();
```

Simplify.Pipelines does not ship dedicated Simplify.DI registration extensions. Register the conveyor, its stages, the item preparer, and the executor in your container manually (for example via `DIContainer.Current.Register<...>()`), or construct them directly as shown above.