Skip to content

SqlServerTransport

Brian Lehnen edited this page May 16, 2026 · 8 revisions

SQL Server Transport

The SQL server transport uses SQL tables to store messages. It supports the following features

  • Delayed messages
  • Message expiration
  • Message priority
  • Routing
  • History tracking
  • User-defined metadata columns
  • Transactional, status-based, or FIFO processing

The SQL transport requires that the queue be created before usage. This can be done manually, or via the queue creation classes.

Creating queues
//Create the queue if it doesn't exist
var queueName = "testing";
var connectionString = "Server=V-SQL;Application Name=SQLProducer;Database=TestR;Trusted_Connection=True;";
var queueConnection = new QueueConnection(queueName, connectionString);
using (var createQueueContainer = new QueueCreationContainer<SqlServerMessageQueueInit>())
{
	using (var createQueue = createQueueContainer.GetQueueCreation<SqlServerMessageQueueCreation>(queueConnection))
	{
		if (!createQueue.QueueExists)
		{
        	//set specific queue options here before calling create
			createQueue.CreateQueue();
		}
	}
}
Usage

[Producer]

using (var queueContainer = new QueueContainer<SqlServerMessageQueueInit>())
{
	using (var queue = queueContainer.CreateProducer<SimpleMessage>(queueConnection))
    {
		queue.Send(new SimpleMessage {Message = "Hello World"});
    }
}

[Consumer]

using (var queueContainer = new QueueContainer<SqlServerMessageQueueInit>())
{
	using (var queue = queueContainer.CreateConsumer(queueConnection))
    {
		var notifications = new ConsumerQueueNotifications(
		    (n) => Console.WriteLine($"Error: {n.Error}"),
		    (n) => Console.WriteLine($"Receive error: {n.Error}"),
		    (n) => Console.WriteLine($"Moved to error queue: {n.MessageId}"),
		    (n) => Console.WriteLine($"Poison message: {n.MessageId}"),
		    (n) => Console.WriteLine($"Rollback: {n.MessageId}"),
		    (n) => Console.WriteLine($"Completed: {n.MessageId}"));
		queue.Start<SimpleMessage>(HandleMessages, notifications);
		Console.WriteLine("Processing messages - press any key to stop");
        Console.ReadKey((true));
    }
}

private void HandleMessages(IReceivedMessage<SimpleMessage> message, IWorkerNotification notifications)
{
	notifications.Log.LogDebug($"Processing Message {message.Body.Message}");
}

For more consumer patterns, see ConsumerMethod and ConsumerAsync.

If history tracking is enabled, see MessageHistory for retention and query options.

To enqueue a queue message inside your own business transaction (transactional outbox pattern), see Outbox Pattern.

[Using a Schema other than 'dbo']

'dbo' is the default schema that a queue will be created and then accessed in. You may specifiy a different schema on the connection properties; this same schema must be set whenever you access the queue.

var queueName = "testing";
var connectionString = "Server=V-SQL;Application Name=SQLProducer;Database=TestR;Trusted_Connection=True;";
var schema = "test1";
var settings = new Dictionary<string, string>();
settings.SetSchema(schema); //use built in ext. method to set schema
var queueConnection = new QueueConnection(queueName, connectionString, settings);
using (var queueContainer = new QueueContainer<SqlServerMessageQueueInit>())
{
	using (var queue = queueContainer.CreateConsumer(queueConnection))
    {
		var notifications = new ConsumerQueueNotifications(
		    (n) => Console.WriteLine($"Error: {n.Error}"),
		    (n) => Console.WriteLine($"Receive error: {n.Error}"),
		    (n) => Console.WriteLine($"Moved to error queue: {n.MessageId}"),
		    (n) => Console.WriteLine($"Poison message: {n.MessageId}"),
		    (n) => Console.WriteLine($"Rollback: {n.MessageId}"),
		    (n) => Console.WriteLine($"Completed: {n.MessageId}"));
		queue.Start<SimpleMessage>(HandleMessages, notifications);
		Console.WriteLine("Processing messages - press any key to stop");
        Console.ReadKey((true));
    }
}

The above will use a schema named 'test1' instead of 'dbo'.

Full samples

See the SQL Server samples in the DotNetWorkQueue.Samples repository.

Clone this wiki locally