|
| 1 | +// Licensed to Elasticsearch B.V under one or more agreements. |
| 2 | +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. |
| 3 | +// See the LICENSE file in the project root for more information |
| 4 | + |
| 5 | +using System.Net; |
| 6 | +using System.Text.Json; |
| 7 | +using Microsoft.Extensions.Logging; |
| 8 | + |
| 9 | +namespace Elastic.Documentation.Configuration.ReleaseNotes; |
| 10 | + |
/// <summary>
/// A single changelog entry downloaded from the CDN.
/// </summary>
/// <param name="FileName">The entry's file name exactly as listed in the product's registry.</param>
/// <param name="Content">The raw, unparsed YAML text of the entry.</param>
public readonly record struct CdnChangelogEntry(
    string FileName,
    string Content);
| 15 | + |
/// <summary>
/// Fetches the individual (scrubbed) changelog entries for a single product from the public CDN, for
/// the <c>changelog bundle</c> command when sourcing entries from S3 rather than a local folder. It
/// reads <c>{base}/{product}/changelog/registry.json</c> to enumerate entries and downloads each
/// <c>{base}/{product}/changelog/{file}</c> as raw YAML; the bundle command then applies its usual
/// filter (products / prs / issues) to the downloaded set.
/// </summary>
/// <remarks>
/// <para>
/// A registry that cannot be fetched or parsed is a hard error (the caller gets an empty list and an
/// emitted error). An individual entry that the registry lists but the CDN does not yet serve is
/// retried a few times with short backoff (and cache-busting, to defeat any CloudFront negative-cache)
/// to ride out the brief upload→scrub→propagate window. If it still cannot be fetched after the retry
/// budget it is escalated to an error, not skipped: the registry asserts the entry exists (uploads
/// never prune) and scrubbing is sub-second, so a persistent miss is a real pipeline problem and
/// silently shipping an incomplete release bundle is worse than failing the run.
/// </para>
/// <para>
/// A request that exceeds the per-request timeout is treated like any other fetch failure (emitted as
/// an error for the registry, retried for an entry). Only a cancellation requested through the
/// caller's token propagates as <see cref="OperationCanceledException"/>.
/// </para>
/// </remarks>
public sealed class CdnChangelogEntryFetcher : IDisposable
{
    private const int SupportedSchemaVersion = 1;

    /// <summary>Total GET attempts per entry (1 initial + retries). ~3.5s of backoff at the default settings.</summary>
    private const int DefaultMaxAttempts = 4;
    private const int BaseRetryDelayMs = 500;
    private const int MaxRetryDelayMs = 2000;

    /// <summary>
    /// Upper bound on the doubling exponent used by <see cref="RetryDelay"/>. The delay hits
    /// <see cref="MaxRetryDelayMs"/> long before this, so clamping only prevents the shift and the
    /// multiplication from overflowing when a caller configures a very large attempt count.
    /// </summary>
    private const int MaxBackoffExponent = 16;

    /// <summary>
    /// Bounds an individual registry/entry HTTP request so a stalled CDN connection cannot hang a bundle run.
    /// </summary>
    private static readonly TimeSpan FetchTimeout = TimeSpan.FromSeconds(30);

    /// <summary>
    /// Process-wide client shared by every fetcher built for the production (no injected handler) path.
    /// <see cref="HttpClient"/> is thread-safe and intended to be long-lived; a single static instance avoids
    /// leaking a socket handle per fetch, and <see cref="SocketsHttpHandler.PooledConnectionLifetime"/>
    /// bounds DNS staleness. It is intentionally never disposed — it lives for the lifetime of the process.
    /// </summary>
    private static readonly HttpClient SharedHttpClient = new(
        new SocketsHttpHandler
        {
            AutomaticDecompression = DecompressionMethods.All,
            PooledConnectionLifetime = TimeSpan.FromMinutes(5)
        })
    { Timeout = FetchTimeout };

    private readonly ILogger _logger;
    private readonly HttpClient _httpClient;
    private readonly int _maxAttempts;
    private readonly Func<TimeSpan, Cancel, Task> _sleep;

    /// <summary>
    /// Non-null only when a caller injects its own <see cref="HttpMessageHandler"/> (tests): in that case we
    /// own a per-instance client and must dispose it. On the production path <see cref="_httpClient"/> points
    /// at <see cref="SharedHttpClient"/>, which is never disposed.
    /// </summary>
    private readonly HttpClient? _ownedHttpClient;

    /// <summary>
    /// Creates a fetcher.
    /// </summary>
    /// <param name="logFactory">Factory used to create this type's logger.</param>
    /// <param name="handler">
    /// Optional message handler (tests). When supplied, a private <see cref="HttpClient"/> is created over it
    /// and disposed by <see cref="Dispose"/>; the handler itself stays owned by the caller. When
    /// <see langword="null"/>, the process-wide shared client is used.
    /// </param>
    /// <param name="maxAttempts">
    /// Total GET attempts per entry. Values below 1 fall back to <see cref="DefaultMaxAttempts"/>.
    /// </param>
    /// <param name="sleep">Optional delay function used between retries (tests); defaults to a real delay.</param>
    /// <exception cref="ArgumentNullException"><paramref name="logFactory"/> is <see langword="null"/>.</exception>
    public CdnChangelogEntryFetcher(
        ILoggerFactory logFactory,
        HttpMessageHandler? handler = null,
        int maxAttempts = DefaultMaxAttempts,
        Func<TimeSpan, Cancel, Task>? sleep = null)
    {
        ArgumentNullException.ThrowIfNull(logFactory);

        _logger = logFactory.CreateLogger<CdnChangelogEntryFetcher>();
        _maxAttempts = maxAttempts < 1 ? DefaultMaxAttempts : maxAttempts;
        _sleep = sleep ?? DefaultSleepAsync;

        if (handler is null)
            _httpClient = SharedHttpClient;
        else
        {
            // disposeHandler: false — the injected handler is owned by the caller (tests), not by us.
            _ownedHttpClient = new HttpClient(handler, disposeHandler: false) { Timeout = FetchTimeout };
            _httpClient = _ownedHttpClient;
        }
    }

    /// <summary>
    /// Downloads the changelog entries for <paramref name="product"/> from the CDN at
    /// <paramref name="baseUri"/>. Returns an empty list after emitting an error when the registry cannot
    /// be read or when a registry-listed entry cannot be fetched within the retry budget. Entries are
    /// returned in registry order; the caller owns filtering and de-duplication.
    /// </summary>
    /// <param name="baseUri">Absolute base URI of the CDN; product and file segments are appended to it.</param>
    /// <param name="product">Product identifier used as the first path segment under <paramref name="baseUri"/>.</param>
    /// <param name="emitError">Receives a message for every failure that makes the result unusable.</param>
    /// <param name="emitWarning">Receives a message for every registry row skipped because its file name is invalid.</param>
    /// <param name="ctx">Cancellation token; cancelling it aborts the fetch with <see cref="OperationCanceledException"/>.</param>
    /// <returns>The downloaded entries, or an empty list when an error was emitted.</returns>
    /// <exception cref="ArgumentNullException">A required argument is <see langword="null"/>.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="ctx"/> was cancelled.</exception>
    public async Task<IReadOnlyList<CdnChangelogEntry>> FetchAsync(
        Uri baseUri,
        string product,
        Action<string> emitError,
        Action<string> emitWarning,
        Cancel ctx)
    {
        ArgumentNullException.ThrowIfNull(baseUri);
        ArgumentNullException.ThrowIfNull(product);
        ArgumentNullException.ThrowIfNull(emitError);
        ArgumentNullException.ThrowIfNull(emitWarning);

        var registryUri = Combine(baseUri, product, "changelog", "registry.json");

        ChangelogRegistry? registry;
        try
        {
            registry = await FetchRegistryAsync(registryUri, ctx).ConfigureAwait(false);
        }
        // A request timeout also surfaces as an OperationCanceledException; only let it propagate
        // when the caller actually cancelled, otherwise report it like any other fetch failure.
        catch (Exception ex) when (!IsCallerCancellation(ex, ctx))
        {
            emitError($"Could not fetch changelog entry registry for product '{product}' from {registryUri}: {ex.Message}");
            return [];
        }

        if (registry is null)
        {
            emitError($"Changelog entry registry for product '{product}' at {registryUri} was empty or unparseable.");
            return [];
        }

        if (registry.SchemaVersion > SupportedSchemaVersion)
        {
            emitError(
                $"Changelog entry registry for product '{product}' uses schema version {registry.SchemaVersion}, but this build only understands version {SupportedSchemaVersion}. Update docs-builder.");
            return [];
        }

        var entries = new List<CdnChangelogEntry>(registry.Bundles.Count);
        foreach (var entry in registry.Bundles)
        {
            ctx.ThrowIfCancellationRequested();

            var fileName = entry.File;
            if (string.IsNullOrWhiteSpace(fileName) || !IsSafeFileName(fileName))
            {
                emitWarning($"Changelog entry registry for '{product}' lists an invalid file name '{fileName}'; skipping.");
                continue;
            }

            var entryUri = Combine(baseUri, product, "changelog", fileName);
            var (fetched, content, lastError) = await TryFetchEntryAsync(entryUri, fileName, product, ctx).ConfigureAwait(false);
            if (fetched)
            {
                entries.Add(new CdnChangelogEntry(fileName, content));
                continue;
            }

            // The registry lists this entry, so it exists in the private bucket and should have been
            // scrubbed to the public one within milliseconds. Still missing after the retry budget means
            // a genuine propagation/scrub failure — fail rather than ship a bundle missing this entry.
            emitError(
                $"Changelog entry '{fileName}' for product '{product}' is listed in the registry but could not be fetched from {entryUri} after {_maxAttempts} attempt(s): {lastError}. " +
                "The scrubbed copy may not have propagated to the CDN yet; retry shortly, and if it persists check the changelog scrubber pipeline.");
            return [];
        }

        _logger.LogInformation("Fetched {Count} changelog entry(ies) for {Product} from {BaseUri}", entries.Count, product, baseUri);
        return entries;
    }

    /// <summary>
    /// Fetches a single entry, retrying failures (most importantly a not-yet-propagated 404, but also
    /// request timeouts) up to <see cref="_maxAttempts"/> times with exponential backoff. Retry requests
    /// are cache-busted so a CloudFront-cached 404 cannot pin the result for the whole window.
    /// </summary>
    /// <returns>
    /// <c>Fetched</c> and the entry text on success; otherwise <c>Fetched = false</c>, an empty
    /// content string and the message of the last failure.
    /// </returns>
    private async Task<(bool Fetched, string Content, string? LastError)> TryFetchEntryAsync(Uri uri, string fileName, string product, Cancel ctx)
    {
        string? lastError = null;

        for (var attempt = 1; attempt <= _maxAttempts; attempt++)
        {
            ctx.ThrowIfCancellationRequested();
            try
            {
                var content = await FetchTextAsync(uri, attempt, ctx).ConfigureAwait(false);
                if (attempt > 1)
                    _logger.LogInformation("Fetched changelog entry '{File}' for {Product} on attempt {Attempt}/{Max}", fileName, product, attempt, _maxAttempts);
                return (true, content, null);
            }
            // Timeouts are OperationCanceledExceptions too; retry them unless the caller cancelled.
            catch (Exception ex) when (!IsCallerCancellation(ex, ctx))
            {
                lastError = ex.Message;
                if (attempt >= _maxAttempts)
                    break;

                var delay = RetryDelay(attempt);
                _logger.LogDebug(
                    "Changelog entry '{File}' for {Product} not yet available (attempt {Attempt}/{Max}: {Error}); retrying in {Delay}",
                    fileName, product, attempt, _maxAttempts, ex.Message, delay);
                await _sleep(delay, ctx).ConfigureAwait(false);
            }
        }

        return (false, string.Empty, lastError);
    }

    /// <summary>
    /// Downloads and deserializes the registry document. Throws on a non-success status code, a
    /// transport failure or malformed JSON; returns <see langword="null"/> when the payload
    /// deserializes to null.
    /// </summary>
    private async Task<ChangelogRegistry?> FetchRegistryAsync(Uri registryUri, Cancel ctx)
    {
        _logger.LogInformation("Fetching changelog entry registry {RegistryUri}", registryUri);
        using var request = new HttpRequestMessage(HttpMethod.Get, registryUri);
        using var response = await _httpClient.SendAsync(request, ctx).ConfigureAwait(false);
        _ = response.EnsureSuccessStatusCode();
        await using var stream = await response.Content.ReadAsStreamAsync(ctx).ConfigureAwait(false);
        return await JsonSerializer.DeserializeAsync(stream, ChangelogRegistryJsonContext.Default.ChangelogRegistry, ctx).ConfigureAwait(false);
    }

    /// <summary>
    /// Issues one GET for <paramref name="uri"/> and returns the response body as text, throwing on a
    /// non-success status code. <paramref name="attempt"/> is 1-based.
    /// </summary>
    private async Task<string> FetchTextAsync(Uri uri, int attempt, Cancel ctx)
    {
        // Only bust the cache on retries: the first hit should use the CDN cache normally (the common,
        // already-propagated case); retries explicitly want to bypass any cached 404.
        var isRetry = attempt > 1;
        var requestUri = isRetry ? WithCacheBuster(uri) : uri;
        using var request = new HttpRequestMessage(HttpMethod.Get, requestUri);
        if (isRetry)
            _ = request.Headers.TryAddWithoutValidation("Cache-Control", "no-cache");
        using var response = await _httpClient.SendAsync(request, ctx).ConfigureAwait(false);
        _ = response.EnsureSuccessStatusCode();
        return await response.Content.ReadAsStringAsync(ctx).ConfigureAwait(false);
    }

    /// <summary>
    /// True when <paramref name="ex"/> represents a cancellation requested through <paramref name="ctx"/>.
    /// An <see cref="OperationCanceledException"/> raised while the caller's token is not cancelled
    /// (for example an <see cref="HttpClient.Timeout"/> expiry) is an ordinary failure, not a cancellation.
    /// </summary>
    private static bool IsCallerCancellation(Exception ex, Cancel ctx) =>
        ex is OperationCanceledException && ctx.IsCancellationRequested;

    /// <summary>
    /// Backoff before the retry that follows the given 1-based <paramref name="attempt"/>:
    /// <see cref="BaseRetryDelayMs"/> doubled per attempt and capped at <see cref="MaxRetryDelayMs"/>.
    /// </summary>
    private static TimeSpan RetryDelay(int attempt)
    {
        // Clamp the exponent so neither the shift (whose count is masked to 6 bits for a long) nor the
        // multiplication can wrap for large attempt numbers; the cap below makes the clamp invisible.
        var exponent = Math.Clamp(attempt - 1, 0, MaxBackoffExponent);
        var ms = Math.Min(BaseRetryDelayMs * (1L << exponent), MaxRetryDelayMs);
        return TimeSpan.FromMilliseconds(ms);
    }

    /// <summary>Default retry sleep: a cancellable delay, skipped entirely for non-positive durations.</summary>
    private static async Task DefaultSleepAsync(TimeSpan delay, Cancel ctx)
    {
        if (delay > TimeSpan.Zero)
            await Task.Delay(delay, ctx).ConfigureAwait(false);
    }

    /// <summary>
    /// Returns <paramref name="uri"/> with a unique <c>_</c> query parameter (current UTC ticks in hex)
    /// appended, so the request does not match a previously cached response for the same URL.
    /// </summary>
    private static Uri WithCacheBuster(Uri uri)
    {
        var separator = string.IsNullOrEmpty(uri.Query) ? "?" : "&";
        return new Uri($"{uri.AbsoluteUri}{separator}_={DateTimeOffset.UtcNow.Ticks:x}");
    }

    /// <summary>
    /// Appends <paramref name="segments"/> to <paramref name="baseUri"/> as path segments, percent-encoding
    /// each one. <paramref name="baseUri"/> must be absolute; a trailing slash on it is ignored.
    /// </summary>
    private static Uri Combine(Uri baseUri, params string[] segments)
    {
        var basePath = baseUri.AbsoluteUri.TrimEnd('/');
        var suffix = string.Join('/', segments.Select(Uri.EscapeDataString));
        return new Uri($"{basePath}/{suffix}");
    }

    /// <summary>
    /// Guards against path traversal or nested keys sneaking in via the registry: an entry file name
    /// must be a single path segment (the producer always writes <c>{product}/changelog/{file}</c>).
    /// </summary>
    private static bool IsSafeFileName(string fileName) =>
        !fileName.Contains('/', StringComparison.Ordinal)
        && !fileName.Contains('\\', StringComparison.Ordinal)
        && fileName is not ("." or "..");

    /// <summary>
    /// Disposes the per-instance <see cref="HttpClient"/> created for an injected handler. The shared
    /// production client (<see cref="SharedHttpClient"/>) is process-lived and intentionally not disposed.
    /// </summary>
    public void Dispose()
    {
        _ownedHttpClient?.Dispose();
        GC.SuppressFinalize(this);
    }
}
0 commit comments