Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Modules/ArchiveModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public async Task ShowArchiveStatus()
if (stats.NewestMessage.HasValue)
embed.AddField("Newest Message", $"<t:{ToUnix(stats.NewestMessage.Value)}:R>", true);

embed.AddField("Status", stats.IsBackfilling ? "Backfilling in progress..." : stats.IsComplete ? "Complete" : "Waiting", true);
embed.AddField("Status", stats.IsBackfilling ? "Initial backfilling..." : "Ongoing archive", true);

await FollowupAsync(embed: embed.Build());
}
Expand Down
164 changes: 46 additions & 118 deletions src/Services/MessageArchiveService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ public class ArchiveStats
public DateTime? OldestMessage { get; set; }
public DateTime? NewestMessage { get; set; }
public bool IsBackfilling { get; set; }
public bool IsComplete { get; set; }
public long DbSizeBytes { get; set; }
public int TotalUsers { get; set; }
}
Expand All @@ -36,25 +35,19 @@ class RetryState
public ulong OldestMessageId;
public DateTime NextRetryAt;
public int DelaySec;
public const int MaxDelaySec = 300;
public const int MaxDelaySec = 1800;
}

[Service]
public class MessageArchiveService
{
const string DB_PATH = "ArchiveData/messages.db";

static readonly Dictionary<ulong, ulong> KnownFirstMessages = new()
{
[264800866169651203] = 264813684281311234
};

readonly DiscordSocketClient _discord;
readonly LoggingService _log;
readonly SemaphoreSlim _dbLock = new(1, 1);
readonly List<RetryState> _retryQueue = new();
volatile bool _backfillStarted;
volatile bool _backfillComplete;

public MessageArchiveService(DiscordSocketClient discord, LoggingService log)
{
Expand All @@ -73,32 +66,13 @@ void EnsureDatabase()
using var ctx = CreateContext();
ctx.Database.EnsureCreated();

var oldest = ctx.Messages.OrderBy(m => m.Timestamp).FirstOrDefault();
_backfillComplete = true;

foreach (var (channelId, targetId) in KnownFirstMessages)
{
var hasTarget = ctx.Messages.Any(m => m.MessageId == targetId);
if (!hasTarget)
{
_backfillComplete = false;
break;
}
}

_log.Info($"Archive database ready at {DB_PATH} (has data: {oldest != null}, oldest: {oldest?.Timestamp:O})");
_log.Info($"Archive database ready at {DB_PATH}");
}

public void Start()
{
_discord.MessageReceived += OnMessageReceived;
if (!_backfillComplete)
{
if (_discord.ConnectionState == ConnectionState.Connected)
_ = Task.Run(BackfillAllChannelsAsync);
else
_discord.Ready += OnReady;
}
_discord.Ready += OnReady;
}

Task OnReady()
Expand Down Expand Up @@ -164,10 +138,7 @@ async Task BackfillAllChannelsAsync()
{
try
{
if (await BackfillChannelAsync(channel) == BackfillResult.NeedRetry)
{
// retry handled by background loop
}
await BackfillChannelAsync(channel);
}
catch (Exception e)
{
Expand All @@ -178,67 +149,55 @@ async Task BackfillAllChannelsAsync()

if (_retryQueue.Count == 0)
{
_backfillComplete = true;
_log.Info("Full backfill complete. All channels archived.");
}
else
{
_log.Info($"Initial pass done. {_retryQueue.Count} channels need deep archive retry.");
_log.Info($"Initial pass done. {_retryQueue.Count} channels queued for ongoing deep archive.");
_ = RunRetryLoopAsync();
}
}

async Task RunRetryLoopAsync()
{
while (_retryQueue.Count > 0)
while (true)
{
var now = DateTime.UtcNow;
var due = new List<RetryState>();

List<RetryState> due;
lock (_retryQueue)
{
due.AddRange(_retryQueue.Where(r => now >= r.NextRetryAt));
due = _retryQueue.Where(r => DateTime.UtcNow >= r.NextRetryAt).ToList();
}

foreach (var state in due)
{
try
{
var result = await BackfillChannelAsync(state.Channel, state.OldestMessageId);
if (result == BackfillResult.Complete)
{
lock (_retryQueue) _retryQueue.Remove(state);
_log.Info($"Deep archive complete for #{state.Channel.Name}.");
}
else if (result == BackfillResult.NeedRetry)
if (result == BackfillResult.NeedRetry)
{
state.DelaySec = Math.Min(state.DelaySec * 2, RetryState.MaxDelaySec);
state.NextRetryAt = DateTime.UtcNow.AddSeconds(state.DelaySec);
}
else if (result == BackfillResult.Complete)
{
lock (_retryQueue) _retryQueue.Remove(state);
_log.Info($"Deep archive complete for #{state.Channel.Name}.");
}
}
catch (Exception e)
{
state.DelaySec = Math.Min(state.DelaySec * 2, RetryState.MaxDelaySec);
state.NextRetryAt = DateTime.UtcNow.AddSeconds(state.DelaySec);
_log.Error($"Retry failed for #{state.Channel.Name}: {e.Message}. Next retry in {state.DelaySec}s.");
_log.Warning($"Retry failed #{state.Channel.Name}: {e.Message}. Next in {state.DelaySec}s.");
}
}

if (_retryQueue.Count == 0)
{
_backfillComplete = true;
_log.Info("Deep archive retry complete. All channels fully archived.");
return;
}

await Task.Delay(5000);
}
}

async Task<BackfillResult> BackfillChannelAsync(SocketTextChannel channel, ulong? resumeFromId = null)
{
KnownFirstMessages.TryGetValue(channel.Id, out var targetMessageId);

if (resumeFromId == null)
{
var firstPage = await channel.GetMessagesAsync(1).FlattenAsync();
Expand All @@ -250,8 +209,7 @@ async Task<BackfillResult> BackfillChannelAsync(SocketTextChannel channel, ulong

var totalSaved = 0;
var oldestMessageId = resumeFromId;
var emptyRetries = 0;
var delaySec = 5;
var fetchDelay = 5;

while (true)
{
Expand All @@ -262,51 +220,19 @@ async Task<BackfillResult> BackfillChannelAsync(SocketTextChannel channel, ulong
}
catch (Exception e)
{
_log.Warning($"Backfill fetch error #{channel.Name}: {e.Message}. Retry in {delaySec}s.");
await Task.Delay(TimeSpan.FromSeconds(delaySec));
delaySec = Math.Min(delaySec * 2, 60);
_log.Warning($"Fetch error #{channel.Name}: {e.Message}. Retry in {fetchDelay}s.");
await Task.Delay(TimeSpan.FromSeconds(fetchDelay));
fetchDelay = Math.Min(fetchDelay * 2, 60);
continue;
}

if (messages.Count == 0)
{
if (targetMessageId > 0 && oldestMessageId > targetMessageId)
{
emptyRetries++;
_log.Info($"Deep archive: #{channel.Name} at ID {oldestMessageId}, waiting for Discord cold storage ({emptyRetries}).");

if (!_retryQueue.Any(r => r.Channel.Id == channel.Id))
{
lock (_retryQueue)
{
_retryQueue.Add(new RetryState
{
Channel = channel,
OldestMessageId = oldestMessageId.Value,
NextRetryAt = DateTime.UtcNow.AddSeconds(delaySec),
DelaySec = delaySec
});
}
}
else
{
lock (_retryQueue)
{
var existing = _retryQueue.First(r => r.Channel.Id == channel.Id);
existing.OldestMessageId = oldestMessageId.Value;
existing.NextRetryAt = DateTime.UtcNow.AddSeconds(delaySec);
existing.DelaySec = Math.Min(existing.DelaySec * 2, RetryState.MaxDelaySec);
}
}

return BackfillResult.NeedRetry;
}

break;
QueueRetry(channel, oldestMessageId.Value);
return BackfillResult.NeedRetry;
}

emptyRetries = 0;
delaySec = 5;
fetchDelay = 5;

var newRecords = new List<MessageRecord>();

Expand Down Expand Up @@ -363,31 +289,34 @@ async Task<BackfillResult> BackfillChannelAsync(SocketTextChannel channel, ulong

if (messages.Count < 100)
{
if (targetMessageId > 0 && oldestMessageId > targetMessageId)
{
if (!_retryQueue.Any(r => r.Channel.Id == channel.Id))
{
lock (_retryQueue)
{
_retryQueue.Add(new RetryState
{
Channel = channel,
OldestMessageId = oldestMessageId.Value,
NextRetryAt = DateTime.UtcNow.AddSeconds(delaySec),
DelaySec = delaySec
});
}
}
return BackfillResult.NeedRetry;
}
break;
QueueRetry(channel, oldestMessageId!.Value);
return BackfillResult.NeedRetry;
}

await Task.Delay(200);
}
}

_log.Info($"Backfill for #{channel.Name} complete. Saved {totalSaved} messages.");
return BackfillResult.Complete;
void QueueRetry(SocketTextChannel channel, ulong oldestMessageId)
{
lock (_retryQueue)
{
var existing = _retryQueue.FirstOrDefault(r => r.Channel.Id == channel.Id);
if (existing != null)
{
existing.OldestMessageId = oldestMessageId;
}
else
{
_retryQueue.Add(new RetryState
{
Channel = channel,
OldestMessageId = oldestMessageId,
NextRetryAt = DateTime.UtcNow.AddSeconds(60),
DelaySec = 60
});
}
}
}

ArchiveDbContext CreateContext()
Expand Down Expand Up @@ -424,8 +353,7 @@ public async Task<ArchiveStats> GetArchiveStatsAsync()
TotalUsers = totalUsers,
OldestMessage = oldest?.Timestamp,
NewestMessage = newest?.Timestamp,
IsBackfilling = _backfillStarted && !_backfillComplete,
IsComplete = _backfillComplete,
IsBackfilling = _backfillStarted,
DbSizeBytes = dbSize
};
}
Expand Down