forked from SciSharp/BotSharp
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMembaseGraphDb.cs
More file actions
91 lines (77 loc) · 3.04 KB
/
MembaseGraphDb.cs
File metadata and controls
91 lines (77 loc) · 3.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
using Polly;
using Polly.Timeout;
using Refit;
namespace BotSharp.Plugin.Membase.GraphDb;
public partial class MembaseGraphDb : IGraphDb
{
private readonly IServiceProvider _services;
private readonly ILogger<MembaseGraphDb> _logger;
private readonly IMembaseApi _membaseApi;
public MembaseGraphDb(
IServiceProvider services,
ILogger<MembaseGraphDb> logger,
IMembaseApi membaseApi)
{
_services = services;
_logger = logger;
_membaseApi = membaseApi;
}
public string Provider => "membase";
private const int RetryCount = 3;
public async Task<GraphQueryResult> ExecuteQueryAsync(string query, GraphQueryExecuteOptions? options = null)
{
if (string.IsNullOrEmpty(options?.GraphId))
{
throw new ArgumentException($"Please provide a valid {Provider} graph id.");
}
var args = options?.Arguments ?? new();
var argLogs = JsonSerializer.Serialize(args, BotSharpOptions.defaultJsonOptions);
try
{
var retryPolicy = BuildRetryPolicy();
var response = await retryPolicy.ExecuteAsync(() =>
_membaseApi.CypherQueryAsync(options!.GraphId, new CypherQueryRequest
{
Query = query,
Parameters = args
}));
return new GraphQueryResult
{
Keys = response.Columns,
Values = response.Data,
Result = JsonSerializer.Serialize(response.Data)
};
}
catch (ApiException ex)
{
_logger.LogError($"Error when executing query in {Provider} graph db:\r\n{ex.Content}\r\n{query}\r\n{argLogs}");
throw;
}
catch (Exception ex)
{
_logger.LogError(ex, $"Error when executing query in {Provider} graph db. (Query: {query}), (Argments: \r\n{argLogs})");
throw;
}
}
private AsyncPolicy BuildRetryPolicy()
{
var settings = _services.GetRequiredService<MembaseSettings>();
var timeoutSeconds = (double)settings.TimeoutSecond / RetryCount;
var timeoutPolicy = Policy.TimeoutAsync(TimeSpan.FromSeconds(timeoutSeconds));
var retryPolicy = Policy
.Handle<HttpRequestException>()
.Or<TaskCanceledException>()
.Or<TimeoutRejectedException>()
.Or<ApiException>(ex => ex.StatusCode == HttpStatusCode.ServiceUnavailable)
.WaitAndRetryAsync(
retryCount: RetryCount,
sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
onRetry: (ex, timespan, retryAttempt, _) =>
{
_logger.LogWarning(ex,
"CypherQueryAsync retry {RetryAttempt}/{MaxRetries} after {Delay}s. Exception: {Message}",
retryAttempt, RetryCount, timespan.TotalSeconds, ex.Message);
});
return Policy.WrapAsync(retryPolicy, timeoutPolicy);
}
}