-
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathP2PBufferTransfer.cs
More file actions
208 lines (186 loc) · 7.06 KB
/
Copy pathP2PBufferTransfer.cs
File metadata and controls
208 lines (186 loc) · 7.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
namespace SpawnDev.ILGPU.P2P;
/// <summary>
/// Handles tensor/buffer data transfer between coordinator and workers.
/// Large buffers are chunked for WebRTC data channel compatibility
/// (max ~256KB per message on most browsers).
///
/// Transfer flow:
///   Coordinator: BufferSend header → chunk 0 → chunk 1 → ... → chunk N
///   Worker: receives all chunks → reassembles → stores in local buffer
///   Worker (after kernel): BufferData header → chunk 0 → ... → chunk N
///   Coordinator: receives all chunks → updates local buffer
/// </summary>
public class P2PBufferTransfer
{
    /// <summary>
    /// Maximum chunk size in bytes. Now that BufferSend/BufferData go through the
    /// binary wire frame (<see cref="P2PBinaryFrame"/>) instead of a JSON envelope
    /// that would base64-expand the payload, we can push right up to SIPSorcery's
    /// hard 256 KiB SCTP max-message cap minus a 512-byte safety margin for the
    /// ~15-byte binary frame header plus bufferId utf-8 bytes. 4x fewer wire
    /// messages per multi-megabyte buffer vs the prior 64 KiB ceiling. Callers
    /// who need smaller chunks (unit tests with tiny payloads etc.) override
    /// per-instance. Must be positive; it is also the largest chunk payload
    /// <see cref="ReceiveChunk"/> will accept.
    /// </summary>
    public int MaxChunkSize { get; set; } = 256 * 1024 - 512;

    /// <summary>
    /// Maximum allowed total chunks per transfer (prevents OOM from malicious headers).
    /// Default: 16384, which is 1 GiB at a 64 KiB chunk size. Chunks whose header
    /// declares more than this are rejected by <see cref="ReceiveChunk"/>.
    /// </summary>
    public int MaxTotalChunks { get; set; } = 16384;

    /// <summary>
    /// Maximum allowed total bytes per transfer (prevents OOM).
    /// Default: 1GB.
    /// </summary>
    public long MaxTotalBytes { get; set; } = 1L * 1024 * 1024 * 1024;

    /// <summary>
    /// Maximum time (seconds) a transfer can be in progress before cleanup. Default: 120s.
    /// </summary>
    public int TransferTimeoutSeconds { get; set; } = 120;

    // Partially received transfers, keyed by buffer id.
    private readonly System.Collections.Concurrent.ConcurrentDictionary<string, BufferTransferState> _inProgress = new();

    /// <summary>
    /// Fired when a complete buffer has been received.
    /// Raised at most once per completed transfer, outside of any internal lock.
    /// </summary>
    public event Action<string, byte[]>? OnBufferReceived; // bufferId, data

    /// <summary>
    /// Number of in-progress transfers.
    /// </summary>
    public int ActiveTransfers => _inProgress.Count;

    /// <summary>
    /// Split a buffer into chunks for transmission.
    /// </summary>
    /// <param name="bufferId">Identifier stamped on every produced chunk.</param>
    /// <param name="data">Buffer to split. An empty buffer yields an empty array.</param>
    /// <returns>Chunks in index order, each carrying at most <see cref="MaxChunkSize"/> bytes.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
    /// <exception cref="InvalidOperationException"><see cref="MaxChunkSize"/> is not positive.</exception>
    public BufferChunk[] CreateChunks(string bufferId, byte[] data)
    {
        if (data is null)
            throw new ArgumentNullException(nameof(data));

        // Snapshot the setting so a concurrent change cannot produce inconsistent chunks.
        int chunkSize = MaxChunkSize;
        if (chunkSize <= 0)
            throw new InvalidOperationException(
                $"{nameof(MaxChunkSize)} must be positive, but was {chunkSize}.");

        // Ceiling division in 64-bit: data.Length + chunkSize - 1 can exceed int.MaxValue.
        int totalChunks = (int)(((long)data.Length + chunkSize - 1) / chunkSize);
        var chunks = new BufferChunk[totalChunks];
        for (int i = 0; i < totalChunks; i++)
        {
            int offset = i * chunkSize;
            int length = Math.Min(chunkSize, data.Length - offset);
            var chunkData = new byte[length];
            Array.Copy(data, offset, chunkData, 0, length);
            chunks[i] = new BufferChunk
            {
                BufferId = bufferId,
                ChunkIndex = i,
                TotalChunks = totalChunks,
                TotalBytes = data.Length,
                Data = chunkData,
            };
        }
        return chunks;
    }

    /// <summary>
    /// Process a received chunk. Returns true if the buffer is now complete.
    /// </summary>
    /// <remarks>
    /// Chunks are treated as untrusted input. A chunk is ignored (returns false) when
    /// its header violates the configured limits, when its header disagrees with the
    /// transfer already in progress for the same buffer id, or when the transfer was
    /// cancelled/cleaned up before it completed. If all chunks arrive but their
    /// payload lengths do not add up to the declared total, the transfer is dropped.
    /// </remarks>
    /// <param name="chunk">The chunk to store.</param>
    /// <returns>
    /// True only for the call that completes the transfer and raises
    /// <see cref="OnBufferReceived"/>; false otherwise.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="chunk"/> is null.</exception>
    public bool ReceiveChunk(BufferChunk chunk)
    {
        if (chunk is null)
            throw new ArgumentNullException(nameof(chunk));

        // SECURITY: Bounds check to prevent OOM from malicious chunk headers
        if (chunk.BufferId is null)
            return false;
        if (chunk.TotalChunks <= 0 || chunk.TotalChunks > MaxTotalChunks)
            return false;
        if (chunk.TotalBytes <= 0 || chunk.TotalBytes > MaxTotalBytes)
            return false;
        if (chunk.ChunkIndex < 0 || chunk.ChunkIndex >= chunk.TotalChunks)
            return false;
        if (chunk.Data is null || chunk.Data.Length == 0 || chunk.Data.Length > MaxChunkSize)
            return false;
        // Empty chunks are rejected above, so a valid transfer has at least one byte
        // per chunk and no single chunk can be larger than the whole buffer.
        if (chunk.TotalChunks > chunk.TotalBytes || chunk.Data.Length > chunk.TotalBytes)
            return false;

        var state = _inProgress.GetOrAdd(chunk.BufferId, _ => new BufferTransferState
        {
            BufferId = chunk.BufferId,
            TotalChunks = chunk.TotalChunks,
            TotalBytes = chunk.TotalBytes,
            ReceivedChunks = new byte[chunk.TotalChunks][],
            ReceivedCount = 0,
        });

        byte[]? assembled;

        // Lock per-state for chunk assembly.
        lock (state)
        {
            // The index was validated against the chunk's own header only. Require the
            // header to match the stored transfer so the index is valid for its slot array.
            if (chunk.TotalChunks != state.TotalChunks || chunk.TotalBytes != state.TotalBytes)
                return false;

            // Store chunk (idempotent — handles retransmits).
            if (state.ReceivedChunks[chunk.ChunkIndex] is null)
            {
                state.ReceivedChunks[chunk.ChunkIndex] = chunk.Data;
                state.ReceivedCount++;
            }

            if (state.ReceivedCount < state.TotalChunks)
                return false;

            // Remove this exact state instance. If that fails, the transfer was already
            // completed by another caller, cancelled, or cleaned up, so do not deliver.
            var entry = new System.Collections.Generic.KeyValuePair<string, BufferTransferState>(
                state.BufferId, state);
            if (!_inProgress.TryRemove(entry))
                return false;

            assembled = Assemble(state);
        }

        // Payload sizes did not match the declared total: transfer dropped.
        if (assembled is null)
            return false;

        // Raised outside the lock so a slow or re-entrant handler cannot block
        // other callers that still hold a reference to this transfer's state.
        OnBufferReceived?.Invoke(chunk.BufferId, assembled);
        return true;
    }

    /// <summary>
    /// Get transfer progress for a buffer (0.0 to 1.0).
    /// </summary>
    /// <param name="bufferId">Buffer id to look up.</param>
    /// <returns>Fraction of chunks received, or 0 when no transfer is in progress for the id.</returns>
    public double GetProgress(string bufferId)
    {
        if (_inProgress.TryGetValue(bufferId, out var state))
            return (double)state.ReceivedCount / state.TotalChunks;
        return 0;
    }

    /// <summary>
    /// Cancel an in-progress transfer.
    /// </summary>
    /// <param name="bufferId">Buffer id whose partial data should be discarded.</param>
    public void CancelTransfer(string bufferId)
    {
        _inProgress.TryRemove(bufferId, out _);
    }

    /// <summary>
    /// Clean up stale transfers that have exceeded the timeout.
    /// Returns the number of transfers cleaned up.
    /// </summary>
    public int CleanupStaleTransfers()
    {
        var now = DateTime.UtcNow;
        int cleaned = 0;
        foreach (var kvp in _inProgress)
        {
            if ((now - kvp.Value.StartTime).TotalSeconds > TransferTimeoutSeconds)
            {
                // Remove only the stale instance that was inspected, never a newer
                // transfer that reused the same buffer id in the meantime.
                if (_inProgress.TryRemove(kvp))
                    cleaned++;
            }
        }
        return cleaned;
    }

    /// <summary>
    /// Concatenates the received chunks in index order.
    /// Returns null when the combined payload length differs from the declared total.
    /// </summary>
    /// <exception cref="InvalidOperationException">A chunk slot is still empty.</exception>
    private static byte[]? Assemble(BufferTransferState state)
    {
        // 64-bit sum: chunk count times chunk size can exceed int.MaxValue.
        long actualBytes = 0;
        for (int i = 0; i < state.TotalChunks; i++)
        {
            var part = state.ReceivedChunks[i];
            if (part is null)
                throw new InvalidOperationException(
                    $"Buffer {state.BufferId} missing chunk {i}/{state.TotalChunks}");
            actualBytes += part.Length;
        }

        if (actualBytes != state.TotalBytes)
            return null;

        var result = new byte[state.TotalBytes];
        int pos = 0;
        foreach (var part in state.ReceivedChunks)
        {
            Array.Copy(part, 0, result, pos, part.Length);
            pos += part.Length;
        }
        return result;
    }
}
/// <summary>
/// One slice of a larger buffer, together with the header fields needed to
/// reassemble the full buffer on the receiving side.
/// </summary>
public class BufferChunk
{
    /// <summary>Identifier of the buffer this slice belongs to.</summary>
    public string BufferId { get; set; } = string.Empty;

    /// <summary>Zero-based position of this slice within the buffer.</summary>
    public int ChunkIndex { get; set; }

    /// <summary>Number of slices the buffer was split into.</summary>
    public int TotalChunks { get; set; }

    /// <summary>Length in bytes of the complete buffer.</summary>
    public int TotalBytes { get; set; }

    /// <summary>Payload bytes carried by this slice.</summary>
    public byte[] Data { get; set; } = Array.Empty<byte>();
}
/// <summary>
/// Bookkeeping for one buffer whose chunks are still arriving.
/// </summary>
internal class BufferTransferState
{
    /// <summary>Identifier of the buffer being received.</summary>
    public string BufferId { get; set; } = string.Empty;

    /// <summary>Number of chunks expected for the buffer.</summary>
    public int TotalChunks { get; set; }

    /// <summary>Expected length in bytes of the reassembled buffer.</summary>
    public int TotalBytes { get; set; }

    /// <summary>
    /// Chunk payloads indexed by chunk index; a slot stays null until its chunk arrives.
    /// </summary>
    public byte[][] ReceivedChunks { get; set; } = Array.Empty<byte[]>();

    /// <summary>Number of slots in <see cref="ReceivedChunks"/> that have been filled.</summary>
    public int ReceivedCount { get; set; }

    /// <summary>UTC time at which this state object was created.</summary>
    public DateTime StartTime { get; set; } = DateTime.UtcNow;
}