Skip to content

Commit ca20e73

Browse files
committed
Merge pull request #315 from sharwell/messages-tests
Updated tests for CloudQueuesProvider.ListAllMessagesAsync
2 parents 5ddd1dc + 928882a commit ca20e73

2 files changed

Lines changed: 138 additions & 2 deletions

File tree

src/testing/integration/Providers/Rackspace/UserQueuesTests.cs

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ public async Task TestQueueStatistics()
262262
[TestMethod]
263263
[TestCategory(TestCategories.User)]
264264
[TestCategory(TestCategories.Queues)]
265-
public async Task TestListAllQueueMessages()
265+
public async Task TestListAllQueueMessagesWithUpdates()
266266
{
267267
IQueueingService provider = CreateProvider();
268268
using (CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(TestTimeout(TimeSpan.FromSeconds(10))))
@@ -314,6 +314,42 @@ public async Task TestListAllQueueMessages()
314314
}
315315
}
316316

317+
[TestMethod]
318+
[TestCategory(TestCategories.User)]
319+
[TestCategory(TestCategories.Queues)]
320+
public async Task TestListAllQueueMessages()
321+
{
322+
IQueueingService provider = CreateProvider();
323+
using (CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(TestTimeout(TimeSpan.FromSeconds(10))))
324+
{
325+
QueueName queueName = CreateRandomQueueName();
326+
327+
await provider.CreateQueueAsync(queueName, cancellationTokenSource.Token);
328+
329+
List<Task> postMessagesTasks = new List<Task>();
330+
for (int i = 0; i < 28; i++)
331+
{
332+
postMessagesTasks.Add(provider.PostMessagesAsync(queueName, cancellationTokenSource.Token, new Message<SampleMetadata>(TimeSpan.FromSeconds(120), new SampleMetadata(i, "Some Message " + i))));
333+
}
334+
335+
await Task.Factory.ContinueWhenAll(postMessagesTasks.ToArray(), TaskExtrasExtensions.PropagateExceptions);
336+
337+
HashSet<int> locatedMessages = new HashSet<int>();
338+
339+
ReadOnlyCollection<QueuedMessage> messages = await ListAllMessagesAsync(provider, queueName, null, true, false, cancellationTokenSource.Token, null);
340+
foreach (QueuedMessage message in messages)
341+
Assert.IsTrue(locatedMessages.Add(message.Body.ToObject<SampleMetadata>().ValueA), "Received the same message more than once.");
342+
343+
Assert.AreEqual(28, locatedMessages.Count);
344+
for (int i = 0; i < 28; i++)
345+
{
346+
Assert.IsTrue(locatedMessages.Contains(i), "The message listing did not include message '{0}', which was in the queue when the listing started and still in it afterwards.", i);
347+
}
348+
349+
await provider.DeleteQueueAsync(queueName, cancellationTokenSource.Token);
350+
}
351+
}
352+
317353
/// <summary>
318354
/// Tests the queueing service message functionality by creating two queues
319355
/// and two sub-processes and using them for two-way communication.
@@ -680,6 +716,40 @@ private static async Task<ReadOnlyCollection<CloudQueue>> ListAllQueuesAsync(IQu
680716
return await (await provider.ListQueuesAsync(null, limit, detailed, cancellationToken)).GetAllPagesAsync(cancellationToken, progress);
681717
}
682718

719+
/// <summary>
720+
/// Gets all existing messages in a queue through a series of asynchronous operations,
721+
/// each of which requests a subset of the available messages.
722+
/// </summary>
723+
/// <param name="provider">The queueing service.</param>
724+
/// <param name="queueName">The queue name.</param>
725+
/// <param name="limit">The maximum number of <see cref="QueuedMessage"/> objects to return from a single task. If this value is <see langword="null"/>, a provider-specific default is used.</param>
726+
/// <param name="echo"><see langword="true"/> to include messages created by the current client; otherwise, <see langword="false"/>.</param>
727+
/// <param name="includeClaimed"><see langword="true"/> to include claimed messages; otherwise <see langword="false"/> to return only unclaimed messages.</param>
728+
/// <param name="cancellationToken">The <see cref="CancellationToken"/> that the task will observe.</param>
729+
/// <param name="progress">An optional callback object to receive progress notifications. If this is <see langword="null"/>, no progress notifications are sent.</param>
730+
/// <returns>
731+
/// A <see cref="Task"/> object representing the asynchronous operation. When the operation
732+
/// completes successfully, the <see cref="Task{TResult}.Result"/> property will contain a
733+
/// read-only collection containing the complete set of messages in the queue.
734+
/// </returns>
735+
/// <exception cref="ArgumentNullException">
736+
/// If <paramref name="provider"/> is <see langword="null"/>.
737+
/// <para>-or-</para>
738+
/// <para>If <paramref name="queueName"/> is <see langword="null"/>.</para>
739+
/// </exception>
740+
/// <exception cref="ArgumentOutOfRangeException">If <paramref name="limit"/> is less than or equal to 0.</exception>
741+
private static async Task<ReadOnlyCollection<QueuedMessage>> ListAllMessagesAsync(IQueueingService provider, QueueName queueName, int? limit, bool echo, bool includeClaimed, CancellationToken cancellationToken, net.openstack.Core.IProgress<ReadOnlyCollectionPage<QueuedMessage>> progress)
742+
{
743+
if (provider == null)
744+
throw new ArgumentNullException("provider");
745+
if (queueName == null)
746+
throw new ArgumentNullException("queueName");
747+
if (limit <= 0)
748+
throw new ArgumentOutOfRangeException("limit");
749+
750+
return await (await provider.ListMessagesAsync(queueName, null, limit, echo, includeClaimed, cancellationToken)).GetAllPagesAsync(cancellationToken, progress);
751+
}
752+
683753
private TimeSpan TestTimeout(TimeSpan timeout)
684754
{
685755
if (Debugger.IsAttached)

src/testing/integration/Providers/Rackspace/UserQueuesTestsSynchronous.cs

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Diagnostics;
77
using System.Linq;
88
using Microsoft.VisualStudio.TestTools.UnitTesting;
9+
using net.openstack.Core.Collections;
910
using net.openstack.Core.Domain;
1011
using net.openstack.Core.Domain.Queues;
1112
using net.openstack.Core.Providers;
@@ -234,7 +235,7 @@ public void SynchronousTestQueueStatistics()
234235
[TestMethod]
235236
[TestCategory(TestCategories.User)]
236237
[TestCategory(TestCategories.QueuesSynchronous)]
237-
public void SynchronousTestListAllQueueMessages()
238+
public void SynchronousTestListAllQueueMessagesWithUpdates()
238239
{
239240
IQueueingService provider = CreateProvider();
240241
QueueName queueName = CreateRandomQueueName();
@@ -280,6 +281,36 @@ public void SynchronousTestListAllQueueMessages()
280281
provider.DeleteQueue(queueName);
281282
}
282283

284+
[TestMethod]
285+
[TestCategory(TestCategories.User)]
286+
[TestCategory(TestCategories.QueuesSynchronous)]
287+
public void SynchronousTestListAllQueueMessages()
288+
{
289+
IQueueingService provider = CreateProvider();
290+
QueueName queueName = CreateRandomQueueName();
291+
292+
provider.CreateQueue(queueName);
293+
294+
for (int i = 0; i < 28; i++)
295+
{
296+
provider.PostMessages(queueName, new Message<SampleMetadata>(TimeSpan.FromSeconds(120), new SampleMetadata(i, "Some Message " + i)));
297+
}
298+
299+
HashSet<int> locatedMessages = new HashSet<int>();
300+
301+
ReadOnlyCollection<QueuedMessage> messages = ListAllMessages(provider, queueName, null, true, false);
302+
foreach (QueuedMessage message in messages)
303+
Assert.IsTrue(locatedMessages.Add(message.Body.ToObject<SampleMetadata>().ValueA), "Received the same message more than once.");
304+
305+
Assert.AreEqual(28, locatedMessages.Count);
306+
for (int i = 0; i < 28; i++)
307+
{
308+
Assert.IsTrue(locatedMessages.Contains(i), "The message listing did not include message '{0}', which was in the queue when the listing started and still in it afterwards.", i);
309+
}
310+
311+
provider.DeleteQueue(queueName);
312+
}
313+
283314
/// <summary>
284315
/// Tests the queueing service message functionality by creating two queues
285316
/// and two sub-processes and using them for two-way communication.
@@ -614,14 +645,49 @@ public void SynchronousTestQueueClaims()
614645
/// <param name="limit">The maximum number of <see cref="CloudQueue"/> to return from a single call to <see cref="QueueingServiceExtensions.ListQueues"/>. If this value is <see langword="null"/>, a provider-specific default is used.</param>
615646
/// <param name="detailed"><see langword="true"/> to return detailed information for each queue; otherwise, <see langword="false"/>.</param>
616647
/// <returns>A collection of <see cref="CloudQueue"/> objects describing the available queues.</returns>
648+
/// <exception cref="ArgumentNullException">If <paramref name="provider"/> is <see langword="null"/>.</exception>
649+
/// <exception cref="ArgumentOutOfRangeException">If <paramref name="limit"/> is less than or equal to 0.</exception>
650+
/// <exception cref="WebException">If the REST request does not return successfully.</exception>
617651
private static ReadOnlyCollection<CloudQueue> ListAllQueues(IQueueingService provider, int? limit, bool detailed)
618652
{
653+
if (provider == null)
654+
throw new ArgumentNullException("provider");
619655
if (limit <= 0)
620656
throw new ArgumentOutOfRangeException("limit");
621657

622658
return provider.ListQueues(null, limit, detailed).GetAllPages();
623659
}
624660

661+
/// <summary>
662+
/// Gets all existing messages in a queue through a series of synchronous operations,
663+
/// each of which requests a subset of the available messages.
664+
/// </summary>
665+
/// <param name="provider">The queueing service.</param>
666+
/// <param name="queueName">The queue name.</param>
667+
/// <param name="limit">The maximum number of <see cref="QueuedMessage"/> objects to return from a single task. If this value is <see langword="null"/>, a provider-specific default is used.</param>
668+
/// <param name="echo"><see langword="true"/> to include messages created by the current client; otherwise, <see langword="false"/>.</param>
669+
/// <param name="includeClaimed"><see langword="true"/> to include claimed messages; otherwise <see langword="false"/> to return only unclaimed messages.</param>
670+
/// <param name="progress">An optional callback object to receive progress notifications. If this is <see langword="null"/>, no progress notifications are sent.</param>
671+
/// <returns>A collection of <see cref="CloudQueue"/> objects describing the available queues.</returns>
672+
/// <exception cref="ArgumentNullException">
673+
/// If <paramref name="provider"/> is <see langword="null"/>.
674+
/// <para>-or-</para>
675+
/// <para>If <paramref name="queueName"/> is <see langword="null"/>.</para>
676+
/// </exception>
677+
/// <exception cref="ArgumentOutOfRangeException">If <paramref name="limit"/> is less than or equal to 0.</exception>
678+
/// <exception cref="WebException">If the REST request does not return successfully.</exception>
679+
private static ReadOnlyCollection<QueuedMessage> ListAllMessages(IQueueingService provider, QueueName queueName, int? limit, bool echo, bool includeClaimed)
680+
{
681+
if (provider == null)
682+
throw new ArgumentNullException("provider");
683+
if (queueName == null)
684+
throw new ArgumentNullException("queueName");
685+
if (limit <= 0)
686+
throw new ArgumentOutOfRangeException("limit");
687+
688+
return provider.ListMessages(queueName, null, limit, echo, includeClaimed).GetAllPages();
689+
}
690+
625691
/// <summary>
626692
/// Creates a random queue name with the proper prefix for integration testing.
627693
/// </summary>

0 commit comments

Comments
 (0)