Skip to content

Commit 657cdb8

Browse files
committed
Merge pull request #312 from sharwell/QueuedMessageList
Updated QueuedMessageList to extend ReadOnlyCollectionPage
2 parents 1175e8c + ea34baa commit 657cdb8

9 files changed

Lines changed: 140 additions & 143 deletions

File tree

src/corelib/Core/Domain/Queues/QueuedMessageList.cs

Lines changed: 24 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -2,110 +2,50 @@
22
{
33
using System;
44
using System.Collections.Generic;
5-
using System.Collections.ObjectModel;
6-
using System.Linq;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using net.openstack.Core.Collections;
78
using net.openstack.Core.Providers;
8-
using Newtonsoft.Json;
99

1010
/// <summary>
11-
/// Represents a collection of messages stored in a queue in the <see cref="IQueueingService"/>.
11+
/// This class extends the <see cref="ReadOnlyCollectionPage{T}"/> class
12+
/// to provide access to the opaque marker used for paginating messages
13+
/// in the <see cref="IQueueingService"/> (via the <see cref="NextPageId"/>
14+
/// property.
1215
/// </summary>
1316
/// <threadsafety static="true" instance="false"/>
1417
/// <preliminary/>
15-
[JsonObject(MemberSerialization.OptIn)]
16-
public class QueuedMessageList
18+
public class QueuedMessageList : BasicReadOnlyCollectionPage<QueuedMessage>
1719
{
1820
/// <summary>
19-
/// This is the backing field for the <see cref="Empty"/> property.
21+
/// This is the backing field for the <see cref="NextPageId"/> property.
2022
/// </summary>
21-
private static readonly QueuedMessageList _empty = new QueuedMessageList(Enumerable.Empty<QueuedMessage>(), Enumerable.Empty<Link>());
22-
23-
/// <summary>
24-
/// This is the backing field for the <see cref="Links"/> property.
25-
/// </summary>
26-
[JsonProperty("links")]
27-
private Link[] _links;
28-
29-
/// <summary>
30-
/// This is the backing field for the <see cref="Messages"/> property.
31-
/// </summary>
32-
[JsonProperty("messages")]
33-
private QueuedMessage[] _messages;
34-
35-
/// <summary>
36-
/// Initializes a new instance of the <see cref="QueuedMessageList"/> class during
37-
/// JSON deserialization.
38-
/// </summary>
39-
[JsonConstructor]
40-
protected QueuedMessageList()
41-
{
42-
}
43-
23+
private QueuedMessageListId _nextPageId;
24+
4425
/// <summary>
45-
/// Initializes a new instance of the <see cref="QueuedMessageList"/> class.
26+
/// Initializes a new instance of the <see cref="BasicReadOnlyCollectionPage{T}"/> class
27+
/// that is a read-only wrapper around the specified list.
4628
/// </summary>
47-
/// <param name="messages">A collection of <see cref="QueuedMessage"/> objects describing the messages in a queue.</param>
48-
/// <param name="links">A collection of <see cref="Link"/> objects describing resources related to the list of messages.</param>
49-
/// <exception cref="ArgumentException">
50-
/// If <paramref name="messages"/> contains any <see langword="null"/> values.
51-
/// <para>-or-</para>
52-
/// <para>If <paramref name="links"/> contains any <see langword="null"/> values.</para>
29+
/// <param name="list">The list to wrap.</param>
30+
/// <param name="getNextPageAsync">A function that returns a <see cref="Task{TResult}"/> representing the asynchronous operation to get the next page of items in the collection. If specified, this function implements <see cref="BasicReadOnlyCollectionPage{T}.GetNextPageAsync"/>. If the value is <see langword="null"/>, then <see cref="BasicReadOnlyCollectionPage{T}.CanHaveNextPage"/> will return <see langword="false"/>.</param>
31+
/// <param name="nextPageId">The identifier of the next page in the message list.</param>
32+
/// <exception cref="ArgumentNullException">
33+
/// If <paramref name="list"/> is <see langword="null"/>.
5334
/// </exception>
54-
public QueuedMessageList(IEnumerable<QueuedMessage> messages, IEnumerable<Link> links)
35+
public QueuedMessageList(IList<QueuedMessage> list, Func<CancellationToken, Task<ReadOnlyCollectionPage<QueuedMessage>>> getNextPageAsync, QueuedMessageListId nextPageId)
36+
: base(list, getNextPageAsync)
5537
{
56-
if (messages != null)
57-
{
58-
_messages = messages.ToArray();
59-
if (_messages.Contains(null))
60-
throw new ArgumentException("messages cannot contain any null values", "messages");
61-
}
62-
63-
if (links != null)
64-
{
65-
_links = links.ToArray();
66-
if (_links.Contains(null))
67-
throw new ArgumentException("links cannot contain any null values", "links");
68-
}
38+
_nextPageId = nextPageId;
6939
}
7040

7141
/// <summary>
72-
/// Gets an empty list of messages, which is not specific to any queue.
42+
/// Gets the identifier of the next page of the message list.
7343
/// </summary>
74-
public static QueuedMessageList Empty
44+
public QueuedMessageListId NextPageId
7545
{
7646
get
7747
{
78-
return _empty;
79-
}
80-
}
81-
82-
/// <summary>
83-
/// Gets a collection of <see cref="Link"/> objects describing resources related
84-
/// to this list of messages.
85-
/// </summary>
86-
public ReadOnlyCollection<Link> Links
87-
{
88-
get
89-
{
90-
if (_links == null)
91-
return null;
92-
93-
return new ReadOnlyCollection<Link>(_links);
94-
}
95-
}
96-
97-
/// <summary>
98-
/// Gets a list of <see cref="QueuedMessage"/> objects describing the messages in
99-
/// the queue.
100-
/// </summary>
101-
public ReadOnlyCollection<QueuedMessage> Messages
102-
{
103-
get
104-
{
105-
if (_messages == null)
106-
return null;
107-
108-
return new ReadOnlyCollection<QueuedMessage>(_messages);
48+
return _nextPageId;
10949
}
11050
}
11151
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
namespace net.openstack.Core.Domain.Queues
2+
{
3+
using System;
4+
using net.openstack.Core;
5+
using net.openstack.Core.Providers;
6+
using Newtonsoft.Json;
7+
8+
/// <summary>
9+
/// Represents the unique identifier of a message list page in the <see cref="IQueueingService"/>.
10+
/// </summary>
11+
/// <seealso cref="IQueueingService.ListMessagesAsync"/>
12+
/// <seealso cref="QueuedMessageList.NextPageId"/>
13+
/// <threadsafety static="true" instance="false"/>
14+
/// <preliminary/>
15+
[JsonConverter(typeof(QueuedMessageListId.Converter))]
16+
public sealed class QueuedMessageListId : ResourceIdentifier<QueuedMessageListId>
17+
{
18+
/// <summary>
19+
/// Initializes a new instance of the <see cref="QueuedMessageListId"/> class
20+
/// with the specified identifier value.
21+
/// </summary>
22+
/// <param name="id">The identifier value.</param>
23+
/// <exception cref="ArgumentNullException">If <paramref name="id"/> is <c>null</c>.</exception>
24+
/// <exception cref="ArgumentException">If <paramref name="id"/> is empty.</exception>
25+
public QueuedMessageListId(string id)
26+
: base(id)
27+
{
28+
}
29+
30+
/// <summary>
31+
/// Provides support for serializing and deserializing <see cref="QueuedMessageListId"/>
32+
/// objects to JSON string values.
33+
/// </summary>
34+
/// <threadsafety static="true" instance="false"/>
35+
private sealed class Converter : ConverterBase
36+
{
37+
/// <inheritdoc/>
38+
protected override QueuedMessageListId FromValue(string id)
39+
{
40+
return new QueuedMessageListId(id);
41+
}
42+
}
43+
}
44+
}

src/corelib/Core/Providers/IQueueingService.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ Task<T> GetQueueMetadataAsync<T>(QueueName queueName, CancellationToken cancella
160160
/// Gets a list of messages currently in a queue.
161161
/// </summary>
162162
/// <param name="queueName">The queue name.</param>
163-
/// <param name="marker">The <see cref="QueuedMessageList"/> object returned by a previous call to <see cref="ListMessagesAsync"/>. If this value is <see langword="null"/>, the list starts at the beginning.</param>
163+
/// <param name="marker">The identifier of the message list page to return. This is obtained from <see cref="QueuedMessageList.NextPageId"/>. If this value is <see langword="null"/>, the list starts at the beginning.</param>
164164
/// <param name="limit">The maximum number of messages to return. If this value is <see langword="null"/>, a provider-specific default value is used.</param>
165165
/// <param name="echo"><see langword="true"/> to include messages created by the current client; otherwise, <see langword="false"/>.</param>
166166
/// <param name="includeClaimed"><see langword="true"/> to include claimed messages; otherwise <see langword="false"/> to return only unclaimed messages.</param>
@@ -170,7 +170,7 @@ Task<T> GetQueueMetadataAsync<T>(QueueName queueName, CancellationToken cancella
170170
/// <exception cref="ArgumentOutOfRangeException">If <paramref name="limit"/> is less than or equal to 0.</exception>
171171
/// <exception cref="WebException">If the REST request does not return successfully.</exception>
172172
/// <seealso href="https://wiki.openstack.org/w/index.php?title=Marconi/specs/api/v1#List_Messages">List Messages (OpenStack Marconi API v1 Blueprint)</seealso>
173-
Task<QueuedMessageList> ListMessagesAsync(QueueName queueName, QueuedMessageList marker, int? limit, bool echo, bool includeClaimed, CancellationToken cancellationToken);
173+
Task<QueuedMessageList> ListMessagesAsync(QueueName queueName, QueuedMessageListId marker, int? limit, bool echo, bool includeClaimed, CancellationToken cancellationToken);
174174

175175
/// <summary>
176176
/// Gets detailed information about a specific queued message.

src/corelib/Core/Synchronous/QueueingServiceExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ public static QueueStatistics GetQueueStatistics(this IQueueingService queueingS
338338
/// </summary>
339339
/// <param name="queueingService">The queueing service instance.</param>
340340
/// <param name="queueName">The queue name.</param>
341-
/// <param name="marker">The <see cref="QueuedMessageList"/> object returned by a previous call to <see cref="IQueueingService.ListMessagesAsync"/>. If this value is <see langword="null"/>, the list starts at the beginning.</param>
341+
/// <param name="marker">The identifier of the message list page to return. This is obtained from <see cref="QueuedMessageList.NextPageId"/>. If this value is <see langword="null"/>, the list starts at the beginning.</param>
342342
/// <param name="limit">The maximum number of messages to return. If this value is <see langword="null"/>, a provider-specific default value is used.</param>
343343
/// <param name="echo"><see langword="true"/> to include messages created by the current client; otherwise, <see langword="false"/>.</param>
344344
/// <param name="includeClaimed"><see langword="true"/> to include claimed messages; otherwise <see langword="false"/> to return only unclaimed messages.</param>
@@ -348,7 +348,7 @@ public static QueueStatistics GetQueueStatistics(this IQueueingService queueingS
348348
/// <exception cref="ArgumentOutOfRangeException">If <paramref name="limit"/> is less than or equal to 0.</exception>
349349
/// <exception cref="WebException">If the REST request does not return successfully.</exception>
350350
/// <seealso href="https://wiki.openstack.org/w/index.php?title=Marconi/specs/api/v1#List_Messages">List Messages (OpenStack Marconi API v1 Blueprint)</seealso>
351-
public static QueuedMessageList ListMessages(this IQueueingService queueingService, QueueName queueName, QueuedMessageList marker, int? limit, bool echo, bool includeClaimed)
351+
public static QueuedMessageList ListMessages(this IQueueingService queueingService, QueueName queueName, QueuedMessageListId marker, int? limit, bool echo, bool includeClaimed)
352352
{
353353
if (queueingService == null)
354354
throw new ArgumentNullException("queueingService");

src/corelib/Providers/Rackspace/CloudQueuesProvider.cs

Lines changed: 46 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ public Task<QueueStatistics> GetQueueStatisticsAsync(QueueName queueName, Cancel
353353
}
354354

355355
/// <inheritdoc/>
356-
public Task<QueuedMessageList> ListMessagesAsync(QueueName queueName, QueuedMessageList marker, int? limit, bool echo, bool includeClaimed, CancellationToken cancellationToken)
356+
public Task<QueuedMessageList> ListMessagesAsync(QueueName queueName, QueuedMessageListId marker, int? limit, bool echo, bool includeClaimed, CancellationToken cancellationToken)
357357
{
358358
if (queueName == null)
359359
throw new ArgumentNullException("queueName");
@@ -369,49 +369,60 @@ public Task<QueuedMessageList> ListMessagesAsync(QueueName queueName, QueuedMess
369369
{ "echo", echo.ToString() },
370370
{ "include_claimed", includeClaimed.ToString() }
371371
};
372-
373372
if (marker != null)
374-
{
375-
Link nextLink = null;
376-
if (marker.Links != null)
373+
parameters["marker"] = marker.Value;
374+
if (limit != null)
375+
parameters["limit"] = limit.ToString();
376+
377+
Func<Task<Tuple<IdentityToken, Uri>>, HttpWebRequest> prepareRequest =
378+
PrepareRequestAsyncFunc(HttpMethod.GET, template, parameters);
379+
380+
Func<Task<HttpWebRequest>, Task<ListCloudQueueMessagesResponse>> requestResource =
381+
GetResponseAsyncFunc<ListCloudQueueMessagesResponse>(cancellationToken);
382+
383+
Func<Task<ListCloudQueueMessagesResponse>, QueuedMessageList> resultSelector =
384+
task =>
377385
{
378-
foreach (Link link in marker.Links)
386+
ReadOnlyCollection<QueuedMessage> messages = null;
387+
if (task.Result != null)
388+
messages = task.Result.Messages;
389+
390+
QueuedMessageListId nextMarker = null;
391+
if (task.Result != null && task.Result.Links != null)
379392
{
380-
if (link.Rel == "next")
393+
Link nextLink = task.Result.Links.FirstOrDefault(i => string.Equals(i.Rel, "next", StringComparison.OrdinalIgnoreCase));
394+
if (nextLink != null)
381395
{
382-
nextLink = link;
383-
break;
396+
Uri baseUri = new Uri("https://example.com");
397+
Uri absoluteUri;
398+
if (nextLink.Href.StartsWith("/v1"))
399+
absoluteUri = new Uri(baseUri, nextLink.Href.Substring("/v1".Length));
400+
else
401+
absoluteUri = new Uri(baseUri, nextLink.Href);
402+
403+
UriTemplateMatch match = template.Match(baseUri, absoluteUri);
404+
if (!string.IsNullOrEmpty(match.BoundVariables["marker"]))
405+
nextMarker = new QueuedMessageListId(match.BoundVariables["marker"]);
384406
}
385407
}
386-
}
387-
388-
if (nextLink == null)
389-
{
390-
return InternalTaskExtensions.CompletedTask(QueuedMessageList.Empty);
391-
}
392-
393-
Uri baseUri = new Uri("https://example.com");
394-
Uri absoluteUri;
395-
if (nextLink.Href.StartsWith("/v1"))
396-
absoluteUri = new Uri(baseUri, nextLink.Href.Substring("/v1".Length));
397-
else
398-
absoluteUri = new Uri(baseUri, nextLink.Href);
399-
400-
UriTemplateMatch match = template.Match(baseUri, absoluteUri);
401-
parameters.Add("marker", match.BoundVariables["marker"]);
402-
}
403-
404-
if (limit != null)
405-
parameters["limit"] = limit.ToString();
406408

407-
Func<Task<Tuple<IdentityToken, Uri>>, HttpWebRequest> prepareRequest =
408-
PrepareRequestAsyncFunc(HttpMethod.GET, template, parameters);
409+
if (messages == null || messages.Count == 0)
410+
{
411+
// use the same marker again
412+
messages = messages ?? new ReadOnlyCollection<QueuedMessage>(new QueuedMessage[0]);
413+
nextMarker = marker;
414+
}
409415

410-
Func<Task<HttpWebRequest>, Task<QueuedMessageList>> requestResource =
411-
GetResponseAsyncFunc<QueuedMessageList>(cancellationToken);
416+
Func<CancellationToken, Task<ReadOnlyCollectionPage<QueuedMessage>>> getNextPageAsync = null;
417+
if (nextMarker != null || messages.Count == 0)
418+
{
419+
getNextPageAsync =
420+
nextCancellationToken => ListMessagesAsync(queueName, nextMarker, limit, echo, includeClaimed, nextCancellationToken)
421+
.ContinueWith(t => (ReadOnlyCollectionPage<QueuedMessage>)t.Result, TaskContinuationOptions.ExecuteSynchronously);
422+
}
412423

413-
Func<Task<QueuedMessageList>, QueuedMessageList> resultSelector =
414-
task => (task.Result != null && task.Result.Messages != null ? task.Result : null) ?? QueuedMessageList.Empty;
424+
return new QueuedMessageList(messages, getNextPageAsync, nextMarker);
425+
};
415426

416427
return AuthenticateServiceAsync(cancellationToken)
417428
.ContinueWith(prepareRequest)

src/corelib/corelib.v3.5.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
<Compile Include="Core\Domain\Queues\NamespaceDoc.cs" />
104104
<Compile Include="Core\Domain\Queues\QueuedMessage.cs" />
105105
<Compile Include="Core\Domain\Queues\QueuedMessageList.cs" />
106+
<Compile Include="Core\Domain\Queues\QueuedMessageListId.cs" />
106107
<Compile Include="Core\Domain\Queues\QueueMessagesStatistics.cs" />
107108
<Compile Include="Core\Domain\Queues\QueueName.cs" />
108109
<Compile Include="Core\Domain\Queues\QueueStatistics.cs" />

src/corelib/corelib.v4.0.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@
9393
<Compile Include="Core\Domain\Queues\NamespaceDoc.cs" />
9494
<Compile Include="Core\Domain\Queues\QueuedMessage.cs" />
9595
<Compile Include="Core\Domain\Queues\QueuedMessageList.cs" />
96+
<Compile Include="Core\Domain\Queues\QueuedMessageListId.cs" />
9697
<Compile Include="Core\Domain\Queues\QueueMessagesStatistics.cs" />
9798
<Compile Include="Core\Domain\Queues\QueueName.cs" />
9899
<Compile Include="Core\Domain\Queues\QueueStatistics.cs" />

0 commit comments

Comments
 (0)