Skip to content

Commit d345572

Browse files
authored
Merge pull request #8 from NullComma/feature/message-archive
feat: add message archive system
2 parents c55e845 + 4d3cfad commit d345572

2 files changed

Lines changed: 47 additions & 119 deletions

File tree

src/Modules/ArchiveModule.cs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@ public async Task ShowArchiveStatus()
3636
if (stats.NewestMessage.HasValue)
3737
embed.AddField("Newest Message", $"<t:{ToUnix(stats.NewestMessage.Value)}:R>", true);
3838

39-
embed.AddField("Status", stats.IsBackfilling ? "Backfilling in progress..." : stats.IsComplete ? "Complete" : "Waiting", true);
39+
embed.AddField("Status", stats.IsBackfilling ? "Initial backfilling..." : "Ongoing archive", true);
4040

4141
await FollowupAsync(embed: embed.Build());
4242
}

src/Services/MessageArchiveService.cs

Lines changed: 46 additions & 118 deletions
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,6 @@ public class ArchiveStats
1313
public DateTime? OldestMessage { get; set; }
1414
public DateTime? NewestMessage { get; set; }
1515
public bool IsBackfilling { get; set; }
16-
public bool IsComplete { get; set; }
1716
public long DbSizeBytes { get; set; }
1817
public int TotalUsers { get; set; }
1918
}
@@ -36,25 +35,19 @@ class RetryState
3635
public ulong OldestMessageId;
3736
public DateTime NextRetryAt;
3837
public int DelaySec;
39-
public const int MaxDelaySec = 300;
38+
public const int MaxDelaySec = 1800;
4039
}
4140

4241
[Service]
4342
public class MessageArchiveService
4443
{
4544
const string DB_PATH = "ArchiveData/messages.db";
4645

47-
static readonly Dictionary<ulong, ulong> KnownFirstMessages = new()
48-
{
49-
[264800866169651203] = 264813684281311234
50-
};
51-
5246
readonly DiscordSocketClient _discord;
5347
readonly LoggingService _log;
5448
readonly SemaphoreSlim _dbLock = new(1, 1);
5549
readonly List<RetryState> _retryQueue = new();
5650
volatile bool _backfillStarted;
57-
volatile bool _backfillComplete;
5851

5952
public MessageArchiveService(DiscordSocketClient discord, LoggingService log)
6053
{
@@ -73,32 +66,13 @@ void EnsureDatabase()
7366
using var ctx = CreateContext();
7467
ctx.Database.EnsureCreated();
7568

76-
var oldest = ctx.Messages.OrderBy(m => m.Timestamp).FirstOrDefault();
77-
_backfillComplete = true;
78-
79-
foreach (var (channelId, targetId) in KnownFirstMessages)
80-
{
81-
var hasTarget = ctx.Messages.Any(m => m.MessageId == targetId);
82-
if (!hasTarget)
83-
{
84-
_backfillComplete = false;
85-
break;
86-
}
87-
}
88-
89-
_log.Info($"Archive database ready at {DB_PATH} (has data: {oldest != null}, oldest: {oldest?.Timestamp:O})");
69+
_log.Info($"Archive database ready at {DB_PATH}");
9070
}
9171

9272
public void Start()
9373
{
9474
_discord.MessageReceived += OnMessageReceived;
95-
if (!_backfillComplete)
96-
{
97-
if (_discord.ConnectionState == ConnectionState.Connected)
98-
_ = Task.Run(BackfillAllChannelsAsync);
99-
else
100-
_discord.Ready += OnReady;
101-
}
75+
_discord.Ready += OnReady;
10276
}
10377

10478
Task OnReady()
@@ -164,10 +138,7 @@ async Task BackfillAllChannelsAsync()
164138
{
165139
try
166140
{
167-
if (await BackfillChannelAsync(channel) == BackfillResult.NeedRetry)
168-
{
169-
// retry handled by background loop
170-
}
141+
await BackfillChannelAsync(channel);
171142
}
172143
catch (Exception e)
173144
{
@@ -178,67 +149,55 @@ async Task BackfillAllChannelsAsync()
178149

179150
if (_retryQueue.Count == 0)
180151
{
181-
_backfillComplete = true;
182152
_log.Info("Full backfill complete. All channels archived.");
183153
}
184154
else
185155
{
186-
_log.Info($"Initial pass done. {_retryQueue.Count} channels need deep archive retry.");
156+
_log.Info($"Initial pass done. {_retryQueue.Count} channels queued for ongoing deep archive.");
187157
_ = RunRetryLoopAsync();
188158
}
189159
}
190160

191161
async Task RunRetryLoopAsync()
192162
{
193-
while (_retryQueue.Count > 0)
163+
while (true)
194164
{
195-
var now = DateTime.UtcNow;
196-
var due = new List<RetryState>();
197-
165+
List<RetryState> due;
198166
lock (_retryQueue)
199167
{
200-
due.AddRange(_retryQueue.Where(r => now >= r.NextRetryAt));
168+
due = _retryQueue.Where(r => DateTime.UtcNow >= r.NextRetryAt).ToList();
201169
}
202170

203171
foreach (var state in due)
204172
{
205173
try
206174
{
207175
var result = await BackfillChannelAsync(state.Channel, state.OldestMessageId);
208-
if (result == BackfillResult.Complete)
209-
{
210-
lock (_retryQueue) _retryQueue.Remove(state);
211-
_log.Info($"Deep archive complete for #{state.Channel.Name}.");
212-
}
213-
else if (result == BackfillResult.NeedRetry)
176+
if (result == BackfillResult.NeedRetry)
214177
{
215178
state.DelaySec = Math.Min(state.DelaySec * 2, RetryState.MaxDelaySec);
216179
state.NextRetryAt = DateTime.UtcNow.AddSeconds(state.DelaySec);
217180
}
181+
else if (result == BackfillResult.Complete)
182+
{
183+
lock (_retryQueue) _retryQueue.Remove(state);
184+
_log.Info($"Deep archive complete for #{state.Channel.Name}.");
185+
}
218186
}
219187
catch (Exception e)
220188
{
221189
state.DelaySec = Math.Min(state.DelaySec * 2, RetryState.MaxDelaySec);
222190
state.NextRetryAt = DateTime.UtcNow.AddSeconds(state.DelaySec);
223-
_log.Error($"Retry failed for #{state.Channel.Name}: {e.Message}. Next retry in {state.DelaySec}s.");
191+
_log.Warning($"Retry failed #{state.Channel.Name}: {e.Message}. Next in {state.DelaySec}s.");
224192
}
225193
}
226194

227-
if (_retryQueue.Count == 0)
228-
{
229-
_backfillComplete = true;
230-
_log.Info("Deep archive retry complete. All channels fully archived.");
231-
return;
232-
}
233-
234195
await Task.Delay(5000);
235196
}
236197
}
237198

238199
async Task<BackfillResult> BackfillChannelAsync(SocketTextChannel channel, ulong? resumeFromId = null)
239200
{
240-
KnownFirstMessages.TryGetValue(channel.Id, out var targetMessageId);
241-
242201
if (resumeFromId == null)
243202
{
244203
var firstPage = await channel.GetMessagesAsync(1).FlattenAsync();
@@ -250,8 +209,7 @@ async Task<BackfillResult> BackfillChannelAsync(SocketTextChannel channel, ulong
250209

251210
var totalSaved = 0;
252211
var oldestMessageId = resumeFromId;
253-
var emptyRetries = 0;
254-
var delaySec = 5;
212+
var fetchDelay = 5;
255213

256214
while (true)
257215
{
@@ -262,51 +220,19 @@ async Task<BackfillResult> BackfillChannelAsync(SocketTextChannel channel, ulong
262220
}
263221
catch (Exception e)
264222
{
265-
_log.Warning($"Backfill fetch error #{channel.Name}: {e.Message}. Retry in {delaySec}s.");
266-
await Task.Delay(TimeSpan.FromSeconds(delaySec));
267-
delaySec = Math.Min(delaySec * 2, 60);
223+
_log.Warning($"Fetch error #{channel.Name}: {e.Message}. Retry in {fetchDelay}s.");
224+
await Task.Delay(TimeSpan.FromSeconds(fetchDelay));
225+
fetchDelay = Math.Min(fetchDelay * 2, 60);
268226
continue;
269227
}
270228

271229
if (messages.Count == 0)
272230
{
273-
if (targetMessageId > 0 && oldestMessageId > targetMessageId)
274-
{
275-
emptyRetries++;
276-
_log.Info($"Deep archive: #{channel.Name} at ID {oldestMessageId}, waiting for Discord cold storage ({emptyRetries}).");
277-
278-
if (!_retryQueue.Any(r => r.Channel.Id == channel.Id))
279-
{
280-
lock (_retryQueue)
281-
{
282-
_retryQueue.Add(new RetryState
283-
{
284-
Channel = channel,
285-
OldestMessageId = oldestMessageId.Value,
286-
NextRetryAt = DateTime.UtcNow.AddSeconds(delaySec),
287-
DelaySec = delaySec
288-
});
289-
}
290-
}
291-
else
292-
{
293-
lock (_retryQueue)
294-
{
295-
var existing = _retryQueue.First(r => r.Channel.Id == channel.Id);
296-
existing.OldestMessageId = oldestMessageId.Value;
297-
existing.NextRetryAt = DateTime.UtcNow.AddSeconds(delaySec);
298-
existing.DelaySec = Math.Min(existing.DelaySec * 2, RetryState.MaxDelaySec);
299-
}
300-
}
301-
302-
return BackfillResult.NeedRetry;
303-
}
304-
305-
break;
231+
QueueRetry(channel, oldestMessageId.Value);
232+
return BackfillResult.NeedRetry;
306233
}
307234

308-
emptyRetries = 0;
309-
delaySec = 5;
235+
fetchDelay = 5;
310236

311237
var newRecords = new List<MessageRecord>();
312238

@@ -363,31 +289,34 @@ async Task<BackfillResult> BackfillChannelAsync(SocketTextChannel channel, ulong
363289

364290
if (messages.Count < 100)
365291
{
366-
if (targetMessageId > 0 && oldestMessageId > targetMessageId)
367-
{
368-
if (!_retryQueue.Any(r => r.Channel.Id == channel.Id))
369-
{
370-
lock (_retryQueue)
371-
{
372-
_retryQueue.Add(new RetryState
373-
{
374-
Channel = channel,
375-
OldestMessageId = oldestMessageId.Value,
376-
NextRetryAt = DateTime.UtcNow.AddSeconds(delaySec),
377-
DelaySec = delaySec
378-
});
379-
}
380-
}
381-
return BackfillResult.NeedRetry;
382-
}
383-
break;
292+
QueueRetry(channel, oldestMessageId!.Value);
293+
return BackfillResult.NeedRetry;
384294
}
385295

386296
await Task.Delay(200);
387297
}
298+
}
388299

389-
_log.Info($"Backfill for #{channel.Name} complete. Saved {totalSaved} messages.");
390-
return BackfillResult.Complete;
300+
void QueueRetry(SocketTextChannel channel, ulong oldestMessageId)
301+
{
302+
lock (_retryQueue)
303+
{
304+
var existing = _retryQueue.FirstOrDefault(r => r.Channel.Id == channel.Id);
305+
if (existing != null)
306+
{
307+
existing.OldestMessageId = oldestMessageId;
308+
}
309+
else
310+
{
311+
_retryQueue.Add(new RetryState
312+
{
313+
Channel = channel,
314+
OldestMessageId = oldestMessageId,
315+
NextRetryAt = DateTime.UtcNow.AddSeconds(60),
316+
DelaySec = 60
317+
});
318+
}
319+
}
391320
}
392321

393322
ArchiveDbContext CreateContext()
@@ -424,8 +353,7 @@ public async Task<ArchiveStats> GetArchiveStatsAsync()
424353
TotalUsers = totalUsers,
425354
OldestMessage = oldest?.Timestamp,
426355
NewestMessage = newest?.Timestamp,
427-
IsBackfilling = _backfillStarted && !_backfillComplete,
428-
IsComplete = _backfillComplete,
356+
IsBackfilling = _backfillStarted,
429357
DbSizeBytes = dbSize
430358
};
431359
}

0 commit comments

Comments
 (0)