
ConsumerAsync

Brian Lehnen edited this page Apr 8, 2026 · 10 revisions

Consuming messages

You can consume messages in two ways: with dedicated worker threads (see Consumer), or with a pool of threads shared between multiple queues.

This page demonstrates the scheduler-based async consumer (CreateConsumerQueueScheduler), which manages a thread pool via TaskScheduler. For the lower-level CreateConsumerAsync API, the same Start() pattern applies but the worker action must return Task.

Creating the queue

You'll need the following:

  • A task scheduler instance
  • The transport you're connecting to, determined by the transport init module you specify when creating the container
  • The connection string to the transport
  • The name of the queue
  • A delegate that will be called for each message to process

ConsumerQueueNotifications provides 6 callback delegates for error handling and completion tracking.

using (var schedulerContainer = new SchedulerContainer())
{
    using (var scheduler = schedulerContainer.CreateTaskScheduler())
    {
        var factory = schedulerContainer.CreateTaskFactory(scheduler);
        factory.Scheduler.Configuration.MaximumThreads = 8;
        factory.Scheduler.Configuration.MinimumThreads = 0;
        factory.Scheduler.Start();
        using (var queueContainer = new QueueContainer<SqlServerMessageQueueInit>())
        {
            var queueConnection = new QueueConnection(queueName, connectionString);
            using (var queue = queueContainer.CreateConsumerQueueScheduler(queueConnection, factory))
            {
                var notifications = new ConsumerQueueNotifications(
                    (notification) => { /* handle errors */ },
                    (notification) => { /* handle receive errors */ },
                    (notification) => { /* handle message moved to error queue */ },
                    (notification) => { /* handle poison message */ },
                    (notification) => { /* handle rollback */ },
                    (notification) => { /* handle completion */ });
                queue.Start<SimpleMessage>(Handle, notifications);
                Console.WriteLine("Processing messages - press any key to stop");
                Console.ReadKey(true);
            }
        }
    }
}

private void Handle(IReceivedMessage<SimpleMessage> m, IWorkerNotification n)
{
    //processing logic goes here
}

You may use a default scheduler and factory by not passing one to the creation logic. However, this means you will be unable to change their configuration. It also means you won't be able to re-use the scheduler with multiple queues.

When your delegate method finishes without errors, the message is acknowledged and considered finished. However, you may want your logic to check for the following condition:

  • The queue shutting down. A cancel token is provided for this.
    • If the transport supports rollback, you may throw an operation canceled exception to requeue the message

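For example, here is how you can check whether cancellation has been requested and force a requeue. Note that the check first verifies that the transport supports rollbacks. In this snippet, notifications refers to the IWorkerNotification passed to your handler (named n in the Handle method above).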

if (notifications.TransportSupportsRollback && notifications.MessageCancellation.Token.IsCancellationRequested)
{
    notifications.Log.LogDebug("Cancel has been requested - aborting");
    notifications.MessageCancellation.Token.ThrowIfCancellationRequested();
}

You could instead register a delegate with the cancellation token, so that you don't have to check the token repeatedly throughout your code.
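
As a rough illustration of registering a delegate, the sketch below uses only the standard CancellationToken.Register method from System.Threading. The CancellationTokenSource is a stand-in for the token the queue provides to your handler, and the cleanedUp flag is a hypothetical placeholder for whatever your code needs to do when a cancel arrives.

```csharp
using System;
using System.Threading;

public static class RegisterCallbackExample
{
    public static void Main()
    {
        // Stand-in for the token the queue hands to your handler (assumption for this sketch).
        using (var cts = new CancellationTokenSource())
        {
            CancellationToken token = cts.Token;
            var cleanedUp = false;

            // The callback runs when cancellation is requested.
            // Disposing the registration removes the callback if it has not run yet.
            using (CancellationTokenRegistration registration = token.Register(() => cleanedUp = true))
            {
                Console.WriteLine($"Before cancel: cleanedUp = {cleanedUp}"); // False
                cts.Cancel(); // simulates the queue shutting down
                Console.WriteLine($"After cancel: cleanedUp = {cleanedUp}");  // True
            }
        }
    }
}
```

Disposing the registration when the handler returns keeps callbacks from piling up on a token that outlives a single message.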

Stopping the queue

To stop the queue, call dispose on it.

queue.Dispose();

Calling dispose on the queue container will dispose all queues created by that container as well. If you created a task scheduler, you should dispose of it after all queues have been disposed.

scheduler.Dispose();
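
Nested using blocks, as in the example at the top of this page, already give this order, because C# disposes the innermost resource first. The sketch below shows that with a hypothetical Tracked class that stands in for the scheduler, queue container, and queue. It does not use the real library types.

```csharp
using System;

// Hypothetical stand-in that only reports when it is disposed.
public sealed class Tracked : IDisposable
{
    private readonly string _name;

    public Tracked(string name) { _name = name; }

    public void Dispose() { Console.WriteLine(_name + " disposed"); }
}

public static class DisposeOrderExample
{
    public static void Main()
    {
        // Declared outermost first, so it is disposed last.
        using (var scheduler = new Tracked("scheduler"))
        using (var queueContainer = new Tracked("queue container"))
        using (var queue = new Tracked("queue"))
        {
            Console.WriteLine("processing");
        }
        // Prints, in order:
        //   processing
        //   queue disposed
        //   queue container disposed
        //   scheduler disposed
    }
}
```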

Dispose is a blocking operation. Depending on your configuration settings and how quickly your message consuming code responds to cancels, it may take a while to return.
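
One way to keep shutdown responsive is to split long-running work into small steps and check the token between them. The sketch below uses only System.Threading. The CancellationTokenSource stands in for the queue's token, and ProcessInSteps is a hypothetical handler body. It throws unconditionally for brevity, whereas a real handler would first confirm that the transport supports rollback, as described above.

```csharp
using System;
using System.Threading;

public static class CooperativeCancelExample
{
    // Hypothetical handler body: works in small steps and checks the token between them.
    private static void ProcessInSteps(CancellationToken token)
    {
        for (var step = 0; step < 100; step++)
        {
            // Throws OperationCanceledException once cancellation has been requested.
            token.ThrowIfCancellationRequested();
            Thread.Sleep(50); // placeholder for one unit of real work
        }
    }

    public static void Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            // Simulates the queue requesting a stop shortly after work begins.
            cts.CancelAfter(TimeSpan.FromMilliseconds(200));
            try
            {
                ProcessInSteps(cts.Token);
                Console.WriteLine("Finished all steps");
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Cancelled before finishing");
            }
        }
    }
}
```

The smaller each step, the sooner the handler notices the cancel, and the less time a blocking Dispose has to wait for it.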

For a complete working example, see SQLServerConsumerAsync in the samples repository.
