-
Notifications
You must be signed in to change notification settings - Fork 16
SqlServerTransport
The SQL Server transport uses SQL tables to store messages. It supports the following features:
- Delayed messages
- Message expiration
- Message priority
- Routing
- History tracking
- User-defined metadata columns
- Transactional, status-based, or FIFO processing
The SQL transport requires that the queue be created before usage. This can be done manually, or via the queue creation classes.
// Create the queue if it doesn't exist.
var queueName = "testing";
var connectionString = "Server=V-SQL;Application Name=SQLProducer;Database=TestR;Trusted_Connection=True;";
var queueConnection = new QueueConnection(queueName, connectionString);

// Both disposables are released when the block ends (inner one first).
using (var createQueueContainer = new QueueCreationContainer<SqlServerMessageQueueInit>())
using (var createQueue = createQueueContainer.GetQueueCreation<SqlServerMessageQueueCreation>(queueConnection))
{
    if (!createQueue.QueueExists)
    {
        // Set specific queue options here before calling create.
        createQueue.CreateQueue();
    }
}
[Producer]
// Send a single message using the queueConnection defined in the creation example.
using (var queueContainer = new QueueContainer<SqlServerMessageQueueInit>())
using (var queue = queueContainer.CreateProducer<SimpleMessage>(queueConnection))
{
    var message = new SimpleMessage { Message = "Hello World" };
    queue.Send(message);
}
[Consumer]
// Consume messages with HandleMessages until a key is pressed.
using (var queueContainer = new QueueContainer<SqlServerMessageQueueInit>())
using (var queue = queueContainer.CreateConsumer(queueConnection))
{
    // Each callback writes one kind of consumer notification to the console.
    var notifications = new ConsumerQueueNotifications(
        n => Console.WriteLine($"Error: {n.Error}"),
        n => Console.WriteLine($"Receive error: {n.Error}"),
        n => Console.WriteLine($"Moved to error queue: {n.MessageId}"),
        n => Console.WriteLine($"Poison message: {n.MessageId}"),
        n => Console.WriteLine($"Rollback: {n.MessageId}"),
        n => Console.WriteLine($"Completed: {n.MessageId}"));

    queue.Start<SimpleMessage>(HandleMessages, notifications);

    // Wait for a key press (not echoed); leaving the block disposes the consumer and container.
    Console.WriteLine("Processing messages - press any key to stop");
    Console.ReadKey(true);
}
/// <summary>
/// Message handler passed to <c>queue.Start</c>; writes the text of the received
/// message to the worker's debug log.
/// </summary>
/// <param name="message">The received message; its <c>Body.Message</c> text is logged.</param>
/// <param name="notifications">Worker notification object whose <c>Log</c> is written to.</param>
private void HandleMessages(IReceivedMessage<SimpleMessage> message, IWorkerNotification notifications)
    => notifications.Log.LogDebug($"Processing Message {message.Body.Message}");
For more consumer patterns, see ConsumerMethod and ConsumerAsync.
If history tracking is enabled, see MessageHistory for retention and query options.
To enqueue a queue message inside your own business transaction (transactional outbox pattern), see Outbox Pattern.
[Using a Schema other than 'dbo']
'dbo' is the default schema in which a queue is created and accessed. You may specify a different schema on the connection properties; the same schema must be set whenever you access the queue.
var queueName = "testing";
var connectionString = "Server=V-SQL;Application Name=SQLProducer;Database=TestR;Trusted_Connection=True;";
var schema = "test1";

// Put the schema into the connection settings using the built-in extension method.
var settings = new Dictionary<string, string>();
settings.SetSchema(schema);
var queueConnection = new QueueConnection(queueName, connectionString, settings);

using (var queueContainer = new QueueContainer<SqlServerMessageQueueInit>())
using (var queue = queueContainer.CreateConsumer(queueConnection))
{
    // Each callback writes one kind of consumer notification to the console.
    var notifications = new ConsumerQueueNotifications(
        n => Console.WriteLine($"Error: {n.Error}"),
        n => Console.WriteLine($"Receive error: {n.Error}"),
        n => Console.WriteLine($"Moved to error queue: {n.MessageId}"),
        n => Console.WriteLine($"Poison message: {n.MessageId}"),
        n => Console.WriteLine($"Rollback: {n.MessageId}"),
        n => Console.WriteLine($"Completed: {n.MessageId}"));

    queue.Start<SimpleMessage>(HandleMessages, notifications);

    // Wait for a key press (not echoed); leaving the block disposes the consumer and container.
    Console.WriteLine("Processing messages - press any key to stop");
    Console.ReadKey(true);
}
The above will use a schema named 'test1' instead of 'dbo'.
See the SQL Server samples in the DotNetWorkQueue.Samples repository.
For any issues, please use the GitHub issue tracker.