-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathPostgresMissingEventRepository.cs
More file actions
107 lines (95 loc) · 4.42 KB
/
PostgresMissingEventRepository.cs
File metadata and controls
107 lines (95 loc) · 4.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
using Dapper;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using PollingOutboxPublisher.ConfigOptions;
using PollingOutboxPublisher.Database.Providers.Interfaces;
using PollingOutboxPublisher.Database.Repositories.Interfaces;
using PollingOutboxPublisher.Exceptions;
using PollingOutboxPublisher.Extensions;
using PollingOutboxPublisher.Models;
namespace PollingOutboxPublisher.Database.Repositories.Postgres
{
/// <summary>
/// Dapper-based <see cref="IMissingEventRepository"/> that reads and writes rows of the
/// configured missing-events table through connections obtained from
/// <see cref="IPostgresConnectionProvider"/>.
/// </summary>
/// <remarks>
/// The table name is interpolated into every statement because SQL identifiers cannot be
/// bound as parameters; it comes from <see cref="DataStoreSettings"/> and must therefore be a
/// trusted identifier. All row values are passed as bound parameters.
/// </remarks>
[ExcludeFromCodeCoverage]
public class PostgresMissingEventRepository : IMissingEventRepository
{
    private readonly string _tableName;
    private readonly ILogger<PostgresMissingEventRepository> _logger;
    private readonly IPostgresConnectionProvider _postgresConnectionProvider;

    /// <summary>
    /// Creates the repository and captures the missing-events table name from configuration.
    /// </summary>
    /// <param name="logger">Logger handed to the retrying delete call.</param>
    /// <param name="postgresConnectionProvider">Factory for the connections used by each operation.</param>
    /// <param name="tableNameSettings">Options whose <c>MissingEvents</c> value is the table name.</param>
    /// <exception cref="MissingConfigurationException">
    /// Thrown when <c>MissingEvents</c> is <c>null</c>.
    /// </exception>
    public PostgresMissingEventRepository(ILogger<PostgresMissingEventRepository> logger, IPostgresConnectionProvider postgresConnectionProvider, IOptions<DataStoreSettings> tableNameSettings)
    {
        _logger = logger;
        _postgresConnectionProvider = postgresConnectionProvider;
        _tableName = tableNameSettings.Value.MissingEvents ??
                     throw new MissingConfigurationException(nameof(tableNameSettings.Value.MissingEvents));
    }

    /// <summary>
    /// Inserts one row built from the Id, MissedDate, RetryCount and ExceptionThrown
    /// members of <paramref name="missingEvent"/>.
    /// </summary>
    /// <param name="missingEvent">Source of the bound parameter values.</param>
    public async Task InsertAsync(MissingEvent missingEvent)
    {
        using var connection = _postgresConnectionProvider.CreateConnection();
        await connection.ExecuteAsync(InsertStatement(), missingEvent);
    }

    /// <summary>
    /// Overwrites <c>retry_count</c> and <c>exception_thrown</c> of the row whose id equals
    /// the Id of <paramref name="missingEvent"/>.
    /// </summary>
    /// <param name="missingEvent">Supplies Id, RetryCount and ExceptionThrown.</param>
    public async Task UpdateRetryCountAndExceptionThrownAsync(MissingEvent missingEvent)
    {
        using var connection = _postgresConnectionProvider.CreateConnection();
        await connection.ExecuteAsync(UpdateRetryCountAndExceptionThrownAsyncStatement(), missingEvent);
    }

    /// <summary>
    /// Reads at most <paramref name="batchCount"/> rows from the missing-events table.
    /// </summary>
    /// <param name="batchCount">Value bound to the statement's <c>LIMIT</c>.</param>
    /// <returns>The rows returned by the query, as a list.</returns>
    public async Task<List<MissingEvent>> GetMissingEventsAsync(int batchCount)
    {
        using var connection = _postgresConnectionProvider.CreateConnection();
        var rows = await connection.QueryAsync<MissingEvent>(
            GetMissingEventsStatement(), new { BatchCount = batchCount });

        // QueryAsync is typed as IEnumerable<T>; a hard cast to List<T> only works while the
        // result happens to be a List<T>. Dapper's AsList() hands back the same instance in
        // that case and copies otherwise, so no InvalidCastException is possible.
        return rows.AsList();
    }

    /// <summary>
    /// Adds one to <c>retry_count</c> for every row whose id is in <paramref name="idList"/>.
    /// </summary>
    /// <param name="idList">Ids bound to <c>ANY(@IdList)</c>.</param>
    public async Task IncrementRetryCountAsync(IEnumerable<long> idList)
    {
        using var connection = _postgresConnectionProvider.CreateConnection();
        // NOTE(review): the sequence is forwarded as-is; presumably callers pass an array or
        // List<long> that the driver can send as a Postgres array — confirm against callers.
        await connection.ExecuteAsync(IncrementRetryCountOfListStatement(), new { IdList = idList });
    }

    /// <summary>
    /// Deletes every row whose id is in <paramref name="idList"/>.
    /// </summary>
    /// <param name="idList">Ids bound to <c>ANY(@IdList)</c>.</param>
    public async Task DeleteMissingEventsAsync(IReadOnlyCollection<long> idList)
    {
        using var connection = _postgresConnectionProvider.CreateConnection();
        // Unlike the other operations, this one goes through the ExecuteAsyncWithRetry extension;
        // its retry policy is defined outside this file.
        await connection.ExecuteAsyncWithRetry(DeleteMissingEventsStatement(), _logger, new { IdList = idList });
    }

    /// <summary>Builds the DELETE that removes rows by id list.</summary>
    private string DeleteMissingEventsStatement()
    {
        return $"""
            DELETE FROM {_tableName}
            WHERE id = ANY(@IdList)
            """;
    }

    /// <summary>Builds the UPDATE that increments <c>retry_count</c> for an id list.</summary>
    private string IncrementRetryCountOfListStatement()
    {
        return $"""
            UPDATE {_tableName}
            SET retry_count=retry_count+1
            WHERE id = ANY(@IdList)
            """;
    }

    /// <summary>
    /// Builds the SELECT that reads a batch of rows, aliasing the snake_case columns to
    /// MissedDate, RetryCount and ExceptionThrown.
    /// </summary>
    private string GetMissingEventsStatement()
    {
        // NOTE(review): there is no ORDER BY, so which rows fall inside the LIMIT is up to the
        // database — confirm that callers do not depend on a particular order.
        return $"""
            SELECT id,
            missed_date AS MissedDate,
            retry_count AS RetryCount,
            exception_thrown AS ExceptionThrown FROM {_tableName} LIMIT (@BatchCount)
            """;
    }

    /// <summary>Builds the UPDATE that sets <c>retry_count</c> and <c>exception_thrown</c> for one id.</summary>
    private string UpdateRetryCountAndExceptionThrownAsyncStatement()
    {
        return $"""
            UPDATE {_tableName}
            SET retry_count= @RetryCount, exception_thrown = @ExceptionThrown
            WHERE id = @Id
            """;
    }

    /// <summary>Builds the INSERT that writes one missing-event row.</summary>
    private string InsertStatement()
    {
        return $"""
            INSERT INTO {_tableName}
            (id, missed_date, retry_count, exception_thrown)
            VALUES (@Id, @MissedDate, @RetryCount, @ExceptionThrown)
            """;
    }
}
}