-
Notifications
You must be signed in to change notification settings - Fork 43
Expand file tree
/
Copy pathSharedPointInTimeManager.cs
More file actions
102 lines (87 loc) · 3 KB
/
SharedPointInTimeManager.cs
File metadata and controls
102 lines (87 loc) · 3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using Elastic.Documentation.Search.Common;
using Microsoft.Extensions.Logging;
namespace Elastic.Documentation.Search;
/// <summary>Singleton manager for a shared Elasticsearch Point In Time (PIT).</summary>
/// <remarks>
/// Caches a single PIT ID together with a locally tracked expiry. <see cref="GetPitIdAsync"/> serializes
/// access to the cached ID through a <see cref="SemaphoreSlim"/>; <see cref="RefreshKeepAlive"/> updates the
/// local expiry without taking it. Disposing the manager closes the cached PIT on a best-effort basis.
/// </remarks>
public sealed partial class SharedPointInTimeManager(
    ElasticsearchClientAccessor clientAccessor,
    ILogger<SharedPointInTimeManager> logger
) : IAsyncDisposable
{
    /// <summary>Local lifetime assumed for a PIT; keep in sync with <see cref="PitKeepAlive"/>.</summary>
    private static readonly TimeSpan KeepAliveDuration = TimeSpan.FromMinutes(5);

    /// <summary>Keep-alive value sent to Elasticsearch when the PIT is opened.</summary>
    public const string PitKeepAlive = "5m";

    private readonly SemaphoreSlim _semaphore = new(1, 1);
    private string? _pitId;
    private DateTimeOffset _expiresAt;

    // 0 = live, 1 = DisposeAsync has started; guards against closing the PIT twice.
    private int _disposed;

    /// <summary>
    /// Returns a valid PIT ID, opening a new one if needed.
    /// When <paramref name="expiredPitId"/> is supplied and matches the current PIT, it is invalidated first.
    /// </summary>
    /// <param name="ctx">Cancels waiting for the internal semaphore and the request that opens a new PIT.</param>
    /// <param name="expiredPitId">
    /// A PIT ID the caller found to be unusable. It is ignored when it no longer matches the cached ID,
    /// so a caller holding a stale ID does not discard a PIT that has already been replaced.
    /// </param>
    /// <returns>The cached PIT ID while it is locally unexpired; otherwise the ID of a newly opened PIT.</returns>
    /// <exception cref="InvalidOperationException">Opening a new PIT returned an invalid response.</exception>
    public async Task<string> GetPitIdAsync(Cancel ctx, string? expiredPitId = null)
    {
        await _semaphore.WaitAsync(ctx);
        try
        {
            if (expiredPitId is not null && _pitId == expiredPitId)
            {
                LogPitExpired(logger);
                _pitId = null;
            }

            if (_pitId is not null && DateTimeOffset.UtcNow < _expiresAt)
                return _pitId;

            // _pitId is only overwritten once OpenPit succeeded; on failure the previous state is kept.
            _pitId = await OpenPit(ctx);
            _expiresAt = DateTimeOffset.UtcNow.Add(KeepAliveDuration);
            return _pitId;
        }
        finally
        {
            _ = _semaphore.Release();
        }
    }

    /// <summary>Bumps the local expiry after a successful search (ES extends the PIT server-side via KeepAlive).</summary>
    public void RefreshKeepAlive() => _expiresAt = DateTimeOffset.UtcNow.Add(KeepAliveDuration);

    /// <summary>Opens a new PIT on the search index with <see cref="PitKeepAlive"/> and returns its ID.</summary>
    /// <exception cref="InvalidOperationException">Elasticsearch returned an invalid response.</exception>
    private async Task<string> OpenPit(Cancel ctx)
    {
        var response = await clientAccessor.Client.OpenPointInTimeAsync(
            clientAccessor.SearchIndex,
            r => r.KeepAlive(PitKeepAlive),
            ctx
        );
        if (!response.IsValidResponse)
        {
            throw new InvalidOperationException(
                $"Failed to open PIT: {response.ElasticsearchServerError?.Error?.Reason ?? "Unknown"}"
            );
        }

        LogPitOpened(logger, response.Id);
        return response.Id;
    }

    /// <summary>
    /// Closes the cached PIT, if any, and releases the semaphore.
    /// Failures while closing are logged rather than thrown, and repeated calls are no-ops.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        // Only the first caller performs the shutdown work; a second call must not send another
        // close request for the same PIT.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
            return;

        // Work on a snapshot and clear the field so the ID is not handed to anything afterwards.
        var pitId = _pitId;
        _pitId = null;

        try
        {
            if (pitId is not null)
                await ClosePitBestEffort(pitId);
        }
        finally
        {
            _semaphore.Dispose();
        }
    }

    /// <summary>Sends the close request for <paramref name="pitId"/> and logs the outcome; never throws.</summary>
    private async Task ClosePitBestEffort(string pitId)
    {
        try
        {
            var response = await clientAccessor.Client.ClosePointInTimeAsync(r => r.Id(pitId));
            if (response.IsValidResponse)
                LogPitClosed(logger, pitId);
            else
                LogPitCloseRejected(logger, pitId, response.ElasticsearchServerError?.Error?.Reason ?? "Unknown");
        }
        catch (OperationCanceledException ex)
        {
            logger.LogWarning(ex, "PIT close operation was canceled during shutdown for {PitId}", pitId);
        }
        catch (Exception ex)
        {
            // Disposal must not throw: an unclosed PIT is left for its keep-alive to lapse, so a
            // failed close request is logged instead of propagated.
            LogPitCloseFailed(logger, ex, pitId);
        }
    }

    [LoggerMessage(Level = LogLevel.Debug, Message = "Opened new shared PIT: {PitId}")]
    private static partial void LogPitOpened(ILogger logger, string pitId);

    [LoggerMessage(Level = LogLevel.Warning, Message = "Shared PIT expired or not found, will open a new one")]
    private static partial void LogPitExpired(ILogger logger);

    [LoggerMessage(Level = LogLevel.Debug, Message = "Closed shared PIT: {PitId}")]
    private static partial void LogPitClosed(ILogger logger, string pitId);

    [LoggerMessage(Level = LogLevel.Debug, Message = "Shared PIT {PitId} was not closed (it may already have expired): {Reason}")]
    private static partial void LogPitCloseRejected(ILogger logger, string pitId, string reason);

    [LoggerMessage(Level = LogLevel.Warning, Message = "Failed to close shared PIT {PitId} during shutdown")]
    private static partial void LogPitCloseFailed(ILogger logger, Exception exception, string pitId);
}