Skip to content

Commit b9ebac5

Browse files
nattb8claude
authored andcommitted
feat(audience): add DiskStore and EventQueue for persistent event batching (SDK-127)
Implements file-per-event disk persistence under imtbl_audience/queue/ with atomic writes, stale-event pruning (30d), and crash recovery. EventQueue wraps a ConcurrentQueue with a background drain thread that flushes to DiskStore on a time interval or when FlushSize is reached; Shutdown/Dispose guarantee a final flush. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 76a0901 commit b9ebac5

9 files changed

Lines changed: 577 additions & 0 deletions

File tree

src/Packages/Audience/Runtime/Transport.meta

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.IO;
4+
using System.Linq;
5+
6+
namespace Immutable.Audience
7+
{
8+
/// <summary>
9+
/// File-per-event persistent store. Each event is written as an atomic
10+
/// <c>{ticks}_{uuid}.json</c> file inside <c>imtbl_audience/queue/</c>.
11+
/// </summary>
12+
internal sealed class DiskStore
13+
{
14+
private readonly string _queueDir;
15+
16+
internal DiskStore(string persistentDataPath)
17+
{
18+
_queueDir = Path.Combine(persistentDataPath, "imtbl_audience", "queue");
19+
Directory.CreateDirectory(_queueDir);
20+
}
21+
22+
/// <summary>Atomically writes <paramref name="json"/> as a new event file.</summary>
23+
internal void Write(string json)
24+
{
25+
var fileName = $"{DateTime.UtcNow.Ticks}_{Guid.NewGuid():N}.json";
26+
var finalPath = Path.Combine(_queueDir, fileName);
27+
var tmpPath = finalPath + ".tmp";
28+
29+
File.WriteAllText(tmpPath, json);
30+
31+
try
32+
{
33+
File.Move(tmpPath, finalPath);
34+
}
35+
catch (IOException)
36+
{
37+
// Destination already exists (unlikely but safe to handle)
38+
File.Delete(finalPath);
39+
File.Move(tmpPath, finalPath);
40+
}
41+
}
42+
43+
/// <summary>
44+
/// Returns up to <paramref name="maxSize"/> file paths, oldest first.
45+
/// Files older than <see cref="Constants.StaleEventDays"/> days are deleted and excluded.
46+
/// </summary>
47+
internal IReadOnlyList<string> ReadBatch(int maxSize)
48+
{
49+
if (maxSize <= 0)
50+
return Array.Empty<string>();
51+
52+
maxSize = Math.Min(maxSize, Constants.MaxBatchSize);
53+
54+
var cutoff = DateTime.UtcNow.AddDays(-Constants.StaleEventDays);
55+
56+
var result = new List<string>();
57+
58+
// Sort by filename (ticks prefix) → oldest first
59+
var files = Directory.GetFiles(_queueDir, "*.json")
60+
.OrderBy(f => Path.GetFileName(f), StringComparer.Ordinal);
61+
62+
foreach (var path in files)
63+
{
64+
if (result.Count >= maxSize)
65+
break;
66+
67+
// Stale check: parse ticks from filename prefix
68+
var name = Path.GetFileNameWithoutExtension(path);
69+
var underscoreIdx = name.IndexOf('_');
70+
if (underscoreIdx > 0 && long.TryParse(name.Substring(0, underscoreIdx), out var ticks))
71+
{
72+
var fileTime = new DateTime(ticks, DateTimeKind.Utc);
73+
if (fileTime < cutoff)
74+
{
75+
TryDelete(path);
76+
continue;
77+
}
78+
}
79+
80+
result.Add(path);
81+
}
82+
83+
return result;
84+
}
85+
86+
/// <summary>Deletes the event files at <paramref name="paths"/>.</summary>
87+
internal void Delete(IEnumerable<string> paths)
88+
{
89+
foreach (var path in paths)
90+
TryDelete(path);
91+
}
92+
93+
/// <summary>Returns the total number of event files currently on disk.</summary>
94+
internal int Count() => Directory.GetFiles(_queueDir, "*.json").Length;
95+
96+
private static void TryDelete(string path)
97+
{
98+
try { File.Delete(path); }
99+
catch (IOException) { }
100+
catch (UnauthorizedAccessException) { }
101+
}
102+
}
103+
}

src/Packages/Audience/Runtime/Transport/DiskStore.cs.meta

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Threading;
4+
5+
namespace Immutable.Audience
6+
{
7+
/// <summary>
8+
/// Thread-safe, disk-persistent batch event queue for the Audience SDK.
9+
///
10+
/// <para>Enqueue is lock-free and safe to call from any thread. A background
11+
/// drain thread moves events from the in-memory <see cref="ConcurrentQueue{T}"/>
12+
/// to <see cref="DiskStore"/>, flushing either on a time interval or when the
13+
/// in-memory batch reaches <see cref="AudienceConfig.FlushSize"/>.</para>
14+
///
15+
/// <para>Call <see cref="Shutdown"/> before process exit to flush remaining events
16+
/// and stop the drain thread cleanly.</para>
17+
/// </summary>
18+
internal sealed class EventQueue : IDisposable
19+
{
20+
private readonly DiskStore _store;
21+
private readonly int _flushIntervalMs;
22+
private readonly int _flushSize;
23+
24+
private readonly ConcurrentQueue<string> _memory = new ConcurrentQueue<string>();
25+
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
26+
private readonly Thread _drainThread;
27+
private readonly ManualResetEventSlim _flushGate = new ManualResetEventSlim(false);
28+
29+
private bool _disposed;
30+
31+
/// <param name="store">Pre-created <see cref="DiskStore"/> for this queue.</param>
32+
/// <param name="flushIntervalSeconds">How often to drain to disk regardless of batch size.</param>
33+
/// <param name="flushSize">Drain to disk immediately when this many events are queued.</param>
34+
internal EventQueue(DiskStore store, int flushIntervalSeconds, int flushSize)
35+
{
36+
_store = store ?? throw new ArgumentNullException(nameof(store));
37+
_flushIntervalMs = Math.Max(1, flushIntervalSeconds) * 1000;
38+
_flushSize = Math.Max(1, flushSize);
39+
40+
_drainThread = new Thread(DrainLoop)
41+
{
42+
IsBackground = true,
43+
Name = "imtbl-audience-drain"
44+
};
45+
_drainThread.Start();
46+
}
47+
48+
/// <summary>Enqueues a JSON-serialised event. Lock-free; safe from any thread.</summary>
49+
internal void Enqueue(string json)
50+
{
51+
if (_disposed) return;
52+
53+
_memory.Enqueue(json);
54+
55+
// Signal the drain thread early if we've hit the flush-size threshold
56+
if (_memory.Count >= _flushSize)
57+
_flushGate.Set();
58+
}
59+
60+
/// <summary>
61+
/// Drains the in-memory queue and persists all events to disk immediately.
62+
/// Blocks until the drain is complete.
63+
/// </summary>
64+
internal void FlushAsync()
65+
{
66+
DrainMemoryToDisk();
67+
}
68+
69+
/// <summary>
70+
/// Flushes all pending events to disk and stops the drain thread.
71+
/// Safe to call multiple times.
72+
/// </summary>
73+
internal void Shutdown()
74+
{
75+
if (_disposed) return;
76+
_cts.Cancel();
77+
_flushGate.Set(); // Wake drain thread so it exits promptly
78+
_drainThread.Join(TimeSpan.FromSeconds(5));
79+
DrainMemoryToDisk(); // Final drain after thread stops
80+
_disposed = true;
81+
}
82+
83+
// -----------------------------------------------------------------
84+
// Background drain loop
85+
// -----------------------------------------------------------------
86+
87+
private void DrainLoop()
88+
{
89+
while (!_cts.IsCancellationRequested)
90+
{
91+
// Wait for flush gate or interval timeout
92+
_flushGate.Wait(_flushIntervalMs);
93+
_flushGate.Reset();
94+
95+
if (_cts.IsCancellationRequested)
96+
break;
97+
98+
DrainMemoryToDisk();
99+
}
100+
}
101+
102+
private void DrainMemoryToDisk()
103+
{
104+
while (_memory.TryDequeue(out var json))
105+
{
106+
try
107+
{
108+
_store.Write(json);
109+
}
110+
catch (Exception)
111+
{
112+
// Best-effort: if we can't write, discard rather than block the drain
113+
}
114+
}
115+
}
116+
117+
// -----------------------------------------------------------------
118+
// IDisposable
119+
// -----------------------------------------------------------------
120+
121+
public void Dispose()
122+
{
123+
Shutdown();
124+
_cts.Dispose();
125+
_flushGate.Dispose();
126+
}
127+
}
128+
}

src/Packages/Audience/Runtime/Transport/EventQueue.cs.meta

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)