-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathCouchbaseMissingEventRepository.cs
More file actions
123 lines (103 loc) · 4.02 KB
/
CouchbaseMissingEventRepository.cs
File metadata and controls
123 lines (103 loc) · 4.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading.Tasks;
using Couchbase;
using Couchbase.KeyValue;
using Couchbase.Query;
using Microsoft.Extensions.Options;
using PollingOutboxPublisher.ConfigOptions;
using PollingOutboxPublisher.Database.Providers.Interfaces;
using PollingOutboxPublisher.Database.Repositories.Interfaces;
using PollingOutboxPublisher.Exceptions;
using PollingOutboxPublisher.Models;
namespace PollingOutboxPublisher.Database.Repositories.Couchbase;
[ExcludeFromCodeCoverage]
public class CouchbaseMissingEventRepository : IMissingEventRepository
{
private readonly string _tableName;
private IScope _scope;
private ICouchbaseCollection _collection;
private readonly ICouchbaseScopeProvider _couchbaseScopeProvider;
public CouchbaseMissingEventRepository(ICouchbaseScopeProvider couchbaseScopeProvider,
IOptions<DataStoreSettings> dataStoreSettings)
{
_couchbaseScopeProvider = couchbaseScopeProvider;
_tableName = dataStoreSettings.Value.MissingEvents ??
throw new MissingConfigurationException(nameof(dataStoreSettings.Value.MissingEvents));
}
private async Task InitializeScopeAsync()
{
_scope ??= await _couchbaseScopeProvider.GetScopeAsync();
}
private async Task InitializeCollectionAsync()
{
await InitializeScopeAsync();
_collection ??= await _scope.CollectionAsync(_tableName);
}
public async Task InsertAsync(MissingEvent missingEvent)
{
await InitializeCollectionAsync();
await _collection.UpsertAsync(missingEvent.Id.ToString(), missingEvent);
}
public async Task UpdateRetryCountAndExceptionThrownAsync(MissingEvent missingEvent)
{
await InitializeCollectionAsync();
await _collection.UpsertAsync(missingEvent.Id.ToString(), missingEvent);
}
public async Task<List<MissingEvent>> GetMissingEventsAsync(int batchCount)
{
await InitializeScopeAsync();
var queryResult = await _scope.QueryAsync<MissingEvent>(GetMissingEventsStatement(), options => options
.Parameter("batchCount", batchCount)
.Readonly(true));
if (queryResult.MetaData!.Status != QueryStatus.Success)
{
throw new CouchbaseQueryFailedException(queryResult.Errors.ToString());
}
var missingEvents = queryResult.Rows;
return await missingEvents.ToListAsync();
}
public async Task IncrementRetryCountAsync(IEnumerable<long> ids)
{
await InitializeScopeAsync();
var tasks = new List<Task<IQueryResult<dynamic>>>();
foreach (var id in ids)
{
var task = _scope.QueryAsync<dynamic>(IncrementRetryCount(), options => options
.Parameter("id", id));
tasks.Add(task);
}
await Task.WhenAll(tasks);
ThrowExceptionsForFailedQueries(tasks);
}
private static void ThrowExceptionsForFailedQueries(List<Task<IQueryResult<dynamic>>> tasks)
{
var failedTasks = tasks.Where(task => task.Result.MetaData!.Status != QueryStatus.Success).ToList();
if (failedTasks.Count > 0)
{
var errors = failedTasks.Select(task => task.Result.Errors.ToString()).ToList();
var joinedErrors = string.Join(" | ", errors);
throw new CouchbaseQueryFailedException(joinedErrors);
}
}
public async Task DeleteMissingEventsAsync(IReadOnlyCollection<long> ids)
{
await InitializeCollectionAsync();
var tasks = new List<Task>();
foreach (var id in ids)
{
var task = _collection.RemoveAsync(id.ToString());
tasks.Add(task);
}
await Task.WhenAll(tasks);
}
private string IncrementRetryCount()
{
return $"update `{_tableName}` set retryCount = retryCount + 1 where meta().id = $id";
}
private string GetMissingEventsStatement()
{
return $"select t.* from `{_tableName}` t limit $batchCount";
}
}