-
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathP2PDispatcher.cs
More file actions
613 lines (539 loc) · 22.4 KB
/
Copy pathP2PDispatcher.cs
File metadata and controls
613 lines (539 loc) · 22.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
namespace SpawnDev.ILGPU.P2P;
/// <summary>
/// Manages kernel dispatch across peers with fault tolerance.
/// Handles peer loss mid-computation, automatic retry, and rebalancing.
///
/// Resilience model:
/// - Every dispatch is tracked with a timeout
/// - If a peer disconnects or times out, work is re-dispatched to another peer
/// - Buffer data has local shadow copies for recovery
/// - New peers are picked up on next dispatch (no mid-kernel rebalancing)
/// - Heartbeat monitoring detects stale peers before they fail
/// </summary>
public class P2PDispatcher : IDisposable
{
// Set by Dispose(); makes repeated Dispose() calls a no-op.
private bool _disposed;
// Owner of the peer list; this class reads it through _accelerator.Peers for
// peer selection, heartbeat bookkeeping and capability updates.
private readonly P2PAccelerator _accelerator;
// In-flight dispatches keyed by DispatchId. Every access in this class is made
// while holding _lock.
private readonly Dictionary<string, PendingDispatch> _pending = new();
// Guards _pending.
private readonly object _lock = new();
// Periodic timer that invokes CheckHeartbeats; null while monitoring is stopped.
private Timer? _heartbeatTimer;
/// <summary>
/// Per-attempt budget for kernel compute + result push-back from the worker
/// (ms). While monitoring is running, the heartbeat check retries any tracked
/// dispatch whose current attempt is older than this. The cumulative budget
/// awaited by <see cref="DispatchAsync"/> is <see cref="DispatchTimeoutMs"/> *
/// <see cref="MaxRetries"/>; after that it throws <see cref="TimeoutException"/>.
/// 60s per attempt covers realistic compute + multi-MB result push-back over
/// LAN-loopback SCTP at ~1.5 MB/s; previously 30s, which was too tight for >1MB
/// result buffers and 10-way concurrent dispatch (workers serialize per-peer
/// message processing, so 5+ concurrent dispatches per peer accumulate latency).
/// </summary>
public int DispatchTimeoutMs { get; set; } = 60_000;
/// <summary>
/// Maximum number of attempts per dispatch, and the multiplier applied to
/// <see cref="DispatchTimeoutMs"/> for the cumulative wait in
/// <see cref="DispatchAsync"/>. The first send counts as attempt 1; a retry is
/// refused (and the dispatch fails permanently) once the attempt count has
/// reached this value. Retries are requested when a peer reports a failed
/// result, when a peer is lost (see <see cref="HandlePeerLost"/>), when an
/// attempt exceeds <see cref="DispatchTimeoutMs"/>, and on proactive eviction.
/// 3 -> total per-dispatch budget = 180s with default
/// <see cref="DispatchTimeoutMs"/>.
/// </summary>
public int MaxRetries { get; set; } = 3;
/// <summary>
/// Coordinator's public key (base64 SPKI) for dispatch authentication.
/// Set from the coordinator's SwarmIdentity. When non-null it is copied onto
/// every outgoing request just before it is serialized, so workers can verify
/// the sender has Coordinator+ authority.
/// </summary>
public string? CoordinatorPublicKey { get; set; }
/// <summary>
/// Heartbeat interval (ms). Used as both the initial delay and the period of the
/// monitoring timer. A peer whose last heartbeat is older than 3 intervals is
/// treated as stale; a peer that has never sent a heartbeat is not.
/// </summary>
public int HeartbeatIntervalMs { get; set; } = 5_000;
/// <summary>
/// Fired when a dispatch fails permanently: either the attempt limit was
/// reached or no healthy peer was available for a retry.
/// Arguments: dispatch ID, error description.
/// </summary>
public event Action<string, string>? OnDispatchFailed; // dispatchId, error
/// <summary>
/// Fired when a dispatch is retried on a different peer, before the request is
/// re-sent. Arguments: dispatch ID, ID of the peer it was moved away from, ID of
/// the replacement peer.
/// </summary>
public event Action<string, string, string>? OnDispatchRetried; // dispatchId, failedPeerId, newPeerId
/// <summary>
/// Fired when a peer is proactively evicted (thermal/battery critical), before
/// its dispatches are handed to the retry path. Arguments: peer ID, number of
/// dispatches that were assigned to it at that moment.
/// </summary>
public event Action<string, int>? OnPeerEvicted; // peerId, dispatchesMoved
/// <summary>
/// Creates a dispatcher bound to the given accelerator. Monitoring is not
/// started here; call <see cref="StartMonitoring"/> to enable it.
/// </summary>
/// <param name="accelerator">Accelerator whose <c>Peers</c> are used for peer lookup and selection.</param>
public P2PDispatcher(P2PAccelerator accelerator) => _accelerator = accelerator;
/// <summary>
/// Start heartbeat monitoring. A timer invokes the heartbeat check every
/// <see cref="HeartbeatIntervalMs"/> ms, with the first tick after one interval.
/// Calling this again restarts monitoring with the current interval.
/// </summary>
public void StartMonitoring()
{
    // Release any timer from an earlier call; overwriting the field without
    // disposing would leave the old timer ticking with no way to stop it.
    _heartbeatTimer?.Dispose();
    _heartbeatTimer = new Timer(CheckHeartbeats, null, HeartbeatIntervalMs, HeartbeatIntervalMs);
}
/// <summary>
/// Stop heartbeat monitoring by disposing the timer, if one exists.
/// Safe to call when monitoring was never started.
/// </summary>
public void StopMonitoring()
{
    if (_heartbeatTimer != null)
    {
        _heartbeatTimer.Dispose();
    }

    _heartbeatTimer = null;
}
/// <summary>
/// Dispatch a kernel to the best available peer without waiting for the result.
/// The dispatch is tracked so that a later result, peer loss, timeout or eviction
/// can be matched to it.
/// </summary>
/// <param name="request">Request to send; its <c>DispatchId</c> is the tracking key.</param>
/// <param name="inputBuffers">Optional input payloads (bufferId -> bytes) shipped to the peer and kept for retries.</param>
/// <param name="preferredBufferIds">Forwarded to peer selection.</param>
/// <returns>The dispatch ID (<c>request.DispatchId</c>) for tracking.</returns>
/// <exception cref="InvalidOperationException">No healthy peer is available.</exception>
public string Dispatch(KernelDispatchRequest request,
    IReadOnlyDictionary<string, byte[]>? inputBuffers = null,
    string[]? preferredBufferIds = null)
{
    var peer = SelectHealthyPeer(preferredBufferIds);
    if (peer == null)
        throw new InvalidOperationException("No healthy peers available for dispatch");

    var pending = new PendingDispatch
    {
        DispatchId = request.DispatchId,
        Request = request,
        InputBuffers = inputBuffers,
        AssignedPeer = peer,
        StartTime = DateTime.UtcNow,
        Attempts = 1,
    };
    lock (_lock)
    {
        _pending[request.DispatchId] = pending;
    }
    peer.IncrementPending();

    try
    {
        // Fire-and-forget path can't await the buffer sends synchronously - start them
        // now, let them overlap with caller's post-return work. The receive-side per-peer
        // ordered queue (rc.12) guarantees chunks drain before the KernelDispatch message.
        _ = SendInputBuffersAsync(peer, inputBuffers);
        SendDispatchToPeer(peer, request);
    }
    catch
    {
        // The caller sees this exception and will treat the dispatch as never sent,
        // so undo the tracking instead of leaving an entry (and an inflated pending
        // count on the peer) behind. Only undo it if the entry is still ours.
        bool removed;
        lock (_lock)
        {
            removed = _pending.TryGetValue(request.DispatchId, out var tracked)
                && ReferenceEquals(tracked, pending)
                && _pending.Remove(request.DispatchId);
        }
        if (removed)
            pending.AssignedPeer.DecrementPending();
        throw;
    }

    return request.DispatchId;
}
/// <summary>
/// Dispatch a kernel and await the result.
/// Returns the dispatch result when a peer reports successful execution.
///
/// When <paramref name="inputBuffers"/> is provided, each buffer is transmitted
/// to the selected peer BEFORE the KernelDispatch message fires. The buffers are
/// also stored on the <see cref="PendingDispatch"/> so a retry can re-ship the
/// inputs to the replacement peer.
///
/// If sending fails or the wait times out, the dispatch is removed from tracking
/// and the assigned peer's pending count is released, so an abandoned dispatch
/// does not keep penalizing that peer's load score.
/// </summary>
/// <param name="request">Request to send; its <c>DispatchId</c> is the tracking key.</param>
/// <param name="inputBuffers">Optional input payloads (bufferId -> bytes).</param>
/// <param name="preferredBufferIds">Forwarded to peer selection.</param>
/// <returns>The successful result reported for this dispatch.</returns>
/// <exception cref="InvalidOperationException">No healthy peer is available.</exception>
/// <exception cref="TimeoutException">
/// No result arrived within <see cref="DispatchTimeoutMs"/> * <see cref="MaxRetries"/> ms
/// (measured from after the request was sent).
/// </exception>
/// <exception cref="Exception">The retry path gave up (attempt limit reached or no peer left).</exception>
public async Task<KernelDispatchResult> DispatchAsync(
    KernelDispatchRequest request,
    IReadOnlyDictionary<string, byte[]>? inputBuffers = null,
    string[]? preferredBufferIds = null)
{
    var peer = SelectHealthyPeer(preferredBufferIds);
    if (peer == null)
        throw new InvalidOperationException("No healthy peers available for dispatch");

    // Run continuations asynchronously so the awaiting caller's code does not
    // execute inline on whichever thread completes the source.
    var tcs = new TaskCompletionSource<KernelDispatchResult>(
        TaskCreationOptions.RunContinuationsAsynchronously);
    var pending = new PendingDispatch
    {
        DispatchId = request.DispatchId,
        Request = request,
        InputBuffers = inputBuffers,
        AssignedPeer = peer,
        StartTime = DateTime.UtcNow,
        Attempts = 1,
        CompletionSource = tcs,
    };
    lock (_lock)
    {
        _pending[request.DispatchId] = pending;
    }
    peer.IncrementPending();

    try
    {
        await SendInputBuffersAsync(peer, inputBuffers).ConfigureAwait(false);
        SendDispatchToPeer(peer, request);
    }
    catch
    {
        // Nothing will ever complete this dispatch for the caller; don't leave it tracked.
        StopTracking();
        throw;
    }

    // Await with timeout
    int budgetMs = DispatchTimeoutMs * MaxRetries;
    using var cts = new CancellationTokenSource(budgetMs);
    try
    {
        return await tcs.Task.WaitAsync(cts.Token).ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        var message = $"P2P dispatch {request.DispatchId} timed out after {budgetMs}ms";
        // Report the failure only if this call actually ended the tracking; if the
        // retry path already removed the entry it has raised the event itself.
        if (StopTracking())
            OnDispatchFailed?.Invoke(request.DispatchId, message);
        throw new TimeoutException(message);
    }

    // Removes this call's entry (if it is still the tracked one) and releases the
    // pending count of whichever peer it is currently assigned to.
    bool StopTracking()
    {
        bool removed;
        lock (_lock)
        {
            removed = _pending.TryGetValue(request.DispatchId, out var tracked)
                && ReferenceEquals(tracked, pending)
                && _pending.Remove(request.DispatchId);
        }
        if (removed)
            pending.AssignedPeer.DecrementPending();
        return removed;
    }
}
/// <summary>
/// Hands every non-null input buffer to the <see cref="OnSendBuffer"/> handler for
/// the given peer and awaits all returned tasks together. Does nothing when there
/// are no buffers or no handler is attached. All sends are started before any of
/// them is awaited.
/// </summary>
/// <param name="peer">Destination peer; only its <c>PeerId</c> is passed to the handler.</param>
/// <param name="inputBuffers">Buffers keyed by buffer ID, or null.</param>
private async Task SendInputBuffersAsync(RemotePeer peer, IReadOnlyDictionary<string, byte[]>? inputBuffers)
{
    if (inputBuffers is null || inputBuffers.Count == 0)
    {
        return;
    }

    var send = OnSendBuffer;
    if (send is null)
    {
        return;
    }

    var inFlight = new List<Task>(inputBuffers.Count);
    foreach (var entry in inputBuffers)
    {
        if (entry.Value == null)
        {
            continue;
        }

        inFlight.Add(send(peer.PeerId, entry.Key, entry.Value));
    }

    if (inFlight.Count > 0)
    {
        await Task.WhenAll(inFlight).ConfigureAwait(false);
    }
}
/// <summary>
/// Handle a peer reporting kernel completion. Results for unknown dispatch IDs
/// are ignored. A successful result completes the dispatch; a failed result is
/// recorded against the peer and the dispatch goes to the retry path.
/// </summary>
/// <param name="dispatchId">ID of the dispatch the result belongs to.</param>
/// <param name="result">Result reported by the peer.</param>
public void HandleResult(string dispatchId, KernelDispatchResult result)
{
    PendingDispatch? pending;
    bool succeeded;
    lock (_lock)
    {
        if (!_pending.TryGetValue(dispatchId, out pending))
            return;
        succeeded = result.Success;
        // Remove in the same critical section as the lookup so a duplicate or
        // concurrent success result cannot find the entry and be processed twice.
        if (succeeded)
            _pending.Remove(dispatchId);
    }

    // Capture once: a concurrent retry may reassign the dispatch, and the
    // bookkeeping below should all apply to the same peer.
    var peer = pending.AssignedPeer;
    peer.DecrementPending();
    if (succeeded)
    {
        peer.RecordSuccess(result.DurationMs);
        pending.CompletionSource?.TrySetResult(result);
    }
    else
    {
        peer.RecordFailure();
        // Execution failed on peer — retry on another peer
        RetryDispatch(pending, $"Peer {peer.PeerId} reported error: {result.Error}");
    }
}
/// <summary>
/// Handle a peer disconnecting: every dispatch currently assigned to that peer
/// has the peer's pending count released and is passed to the retry path.
/// </summary>
/// <param name="peerId">ID of the peer that was lost.</param>
public void HandlePeerLost(string peerId)
{
    // Snapshot under the lock; the retries below run outside it.
    var orphaned = new List<PendingDispatch>();
    lock (_lock)
    {
        foreach (var entry in _pending.Values)
        {
            if (entry.AssignedPeer.PeerId == peerId)
            {
                orphaned.Add(entry);
            }
        }
    }

    foreach (var entry in orphaned)
    {
        entry.AssignedPeer.DecrementPending();
        RetryDispatch(entry, $"Peer {peerId} disconnected");
    }
}
/// <summary>
/// Handle a new peer joining. Intentionally a no-op: this method keeps no state
/// about the peer and does not touch in-flight dispatches.
/// Peer selection reads the accelerator's peer list on every dispatch, so the
/// peer becomes a candidate once it appears there (NOTE(review): presumably the
/// accelerator adds it before or around this call; confirm against the caller).
/// Active dispatches are NOT rebalanced mid-execution.
/// </summary>
/// <param name="peer">The peer that joined; currently unused.</param>
public void HandlePeerJoined(RemotePeer peer)
{
// New peer is immediately available for future dispatches.
// No rebalancing of in-flight work — that would cause more harm than good.
}
/// <summary>
/// Record a heartbeat: stamps the matching peer's <c>LastHeartbeat</c> with the
/// current UTC time. Heartbeats from unknown peer IDs are ignored.
/// </summary>
/// <param name="peerId">ID of the peer that sent the heartbeat.</param>
public void HandleHeartbeat(string peerId)
{
    var sender = _accelerator.Peers.FirstOrDefault(candidate => candidate.PeerId == peerId);
    if (sender is null)
    {
        return;
    }

    sender.LastHeartbeat = DateTime.UtcNow;
}
/// <summary>
/// Move a tracked dispatch to another peer, or fail it permanently.
/// The caller is expected to have already released the pending count of the
/// peer the dispatch is leaving; this method takes a count on the new peer.
/// Fails the dispatch (removes it, raises <see cref="OnDispatchFailed"/>, faults
/// its completion source) when the attempt limit is reached or when no healthy
/// peer other than the current one is available.
/// </summary>
/// <param name="dispatch">The tracked dispatch to retry.</param>
/// <param name="reason">Why the previous attempt is being abandoned; included in failure messages.</param>
private void RetryDispatch(PendingDispatch dispatch, string reason)
{
    // Callers work from snapshots taken earlier under the lock. If the dispatch
    // completed (or was replaced) in the meantime, re-sending it would run the
    // kernel again and leave a pending count on the new peer that no result
    // would ever release.
    lock (_lock)
    {
        if (!_pending.TryGetValue(dispatch.DispatchId, out var tracked)
            || !ReferenceEquals(tracked, dispatch))
        {
            return;
        }
    }

    if (dispatch.Attempts >= MaxRetries)
    {
        FailPermanently(
            $"Max retries exceeded. Last: {reason}",
            $"P2P dispatch failed after {MaxRetries} attempts: {reason}");
        return;
    }

    var newPeer = SelectHealthyPeer(exclude: dispatch.AssignedPeer.PeerId);
    if (newPeer == null)
    {
        FailPermanently(
            $"No healthy peers available for retry. {reason}",
            $"P2P dispatch failed, no peers for retry: {reason}");
        return;
    }

    var failedPeerId = dispatch.AssignedPeer.PeerId;
    dispatch.Attempts++;
    dispatch.AssignedPeer = newPeer;
    // Restart the per-attempt clock used by the heartbeat timeout check.
    dispatch.StartTime = DateTime.UtcNow;
    newPeer.IncrementPending();
    OnDispatchRetried?.Invoke(dispatch.DispatchId, failedPeerId, newPeer.PeerId);
    // Re-ship input buffers to the replacement peer before re-dispatching so the
    // new worker has the data the original one was holding. Fire-and-forget on the
    // buffer task is acceptable here; the per-peer ordered queue at the receiver
    // guarantees the buffer chunks arrive ahead of the re-sent KernelDispatch.
    _ = SendInputBuffersAsync(newPeer, dispatch.InputBuffers);
    SendDispatchToPeer(newPeer, dispatch.Request);

    // Shared tail of both give-up paths. The exception type stays System.Exception
    // because awaiting callers may already match on it.
    void FailPermanently(string eventMessage, string exceptionMessage)
    {
        lock (_lock) { _pending.Remove(dispatch.DispatchId); }
        OnDispatchFailed?.Invoke(dispatch.DispatchId, eventMessage);
        dispatch.CompletionSource?.TrySetException(new Exception(exceptionMessage));
    }
}
/// <summary>
/// Pick the highest-scoring peer that is connected, not stale, not excluded, and
/// has a score above zero. Ties go to the peer that appears first in the
/// accelerator's peer list.
/// </summary>
/// <param name="preferredBufferIds">Accepted for callers, but not read by the current selection logic.</param>
/// <param name="exclude">Peer ID to skip (used by retries to avoid the peer being left), or null.</param>
/// <returns>The selected peer, or null when no peer qualifies.</returns>
private RemotePeer? SelectHealthyPeer(string[]? preferredBufferIds = null, string? exclude = null)
{
    RemotePeer? best = null;
    // Starting at zero makes the strict comparison below also reject peers whose
    // score is not positive (thermally critical / dead battery peers).
    double bestScore = 0.0;

    foreach (var candidate in _accelerator.Peers)
    {
        if (!candidate.IsConnected || candidate.PeerId == exclude || IsStale(candidate))
        {
            continue;
        }

        double score = ScorePeer(candidate);
        if (score > bestScore)
        {
            best = candidate;
            bestScore = score;
        }
    }

    return best;
}
/// <summary>
/// Score a peer for dispatch selection. Higher = better. Public so the scoring
/// can be inspected by diagnostics and tests.
/// Balances compute power, load, heartbeat freshness, memory and reputation
/// (weights 0.35 / 0.25 / 0.15 / 0.10 / 0.15) and multiplies the sum by a
/// thermal/battery health factor. A peer that overheats and drops is worse than
/// a slower peer that stays up, so a peer reporting thermal state 3 gets a
/// health factor of 0.
/// </summary>
/// <param name="peer">Peer to score.</param>
/// <returns>The weighted base score multiplied by the health factor.</returns>
public double ScorePeer(RemotePeer peer)
{
    var caps = peer.Capabilities;
    double tflops = caps?.EstimatedTflops ?? 1.0;
    double memory = caps?.AvailableMemory ?? 0;
    // Clamp at zero: if the counter ever dips negative, 1/(1+pending) would become
    // +Infinity (pending == -1) or negative, letting that peer win or lose every
    // selection for the wrong reason.
    int pending = Math.Max(peer.PendingOperations, 0);
    // Ability: TFLOPS normalized (assume max ~20 TFLOPS for a desktop GPU)
    double abilityScore = Math.Min(tflops / 20.0, 1.0);
    // Load: penalize peers with many pending operations
    double loadScore = 1.0 / (1.0 + pending);
    // Reliability: peers with recent heartbeats score higher.
    // Decays linearly over 3 heartbeat intervals, floored at 0.1; a peer that has
    // never sent a heartbeat keeps the full 1.0.
    double reliabilityScore = 1.0;
    if (peer.LastHeartbeat != DateTime.MinValue)
    {
        double secsSinceHeartbeat = (DateTime.UtcNow - peer.LastHeartbeat).TotalSeconds;
        reliabilityScore = Math.Max(0.1, 1.0 - (secsSinceHeartbeat / (HeartbeatIntervalMs / 1000.0 * 3)));
    }
    // Memory: bonus for peers with more VRAM (can handle larger tensors); saturates at 8 GiB
    double memoryScore = Math.Min(memory / (8.0 * 1024 * 1024 * 1024), 1.0);
    // Thermal/Battery: penalize peers at risk of dropping off
    double healthScore = 1.0;
    if (caps != null)
    {
        // Thermal throttling: 0=nominal(1.0), 1=fair(0.7), 2=serious(0.3), 3=critical(0.0)
        healthScore *= caps.ThermalState switch
        {
            0 => 1.0,
            1 => 0.7,
            2 => 0.3,
            3 => 0.0, // critical — don't send work, it'll crash
            _ => 0.5,
        };
        // Battery: penalize discharging devices with low battery.
        // A negative level skips the penalty (presumably "unknown / no battery" — confirm).
        if (!caps.IsCharging && caps.BatteryLevel >= 0)
        {
            if (caps.BatteryLevel < 0.1)
                healthScore *= 0.1; // <10% battery, about to die
            else if (caps.BatteryLevel < 0.2)
                healthScore *= 0.4; // low battery
            else if (caps.BatteryLevel < 0.5)
                healthScore *= 0.7; // moderate battery
            // >50% on battery is fine, no penalty
        }
        // Charging devices get no battery penalty
    }
    // Reputation: dispatch history — success rate + identity strength
    double reputationScore = peer.Reputation;
    // Weighted combination: health acts as a multiplier, not an additive factor.
    // A thermally critical peer gets zero score regardless of TFLOPS.
    double baseScore = (abilityScore * 0.35) + (loadScore * 0.25) +
        (reliabilityScore * 0.15) + (memoryScore * 0.10) +
        (reputationScore * 0.15);
    return baseScore * healthScore;
}
/// <summary>
/// A peer is stale when it has sent at least one heartbeat and the most recent
/// one is older than three heartbeat intervals. Peers that have never sent a
/// heartbeat are not considered stale.
/// </summary>
private bool IsStale(RemotePeer peer) =>
    peer.LastHeartbeat != DateTime.MinValue
    && (DateTime.UtcNow - peer.LastHeartbeat).TotalMilliseconds > HeartbeatIntervalMs * 3;
/// <summary>
/// Timer callback. On each tick it (1) retries dispatches whose current attempt
/// has exceeded <see cref="DispatchTimeoutMs"/>, (2) treats connected-but-stale
/// peers as lost, and (3) hands off work from peers that should be evicted.
/// Never throws; a failure aborts the rest of the tick and is written to the
/// trace output.
/// </summary>
/// <param name="state">Unused timer state (always null).</param>
private void CheckHeartbeats(object? state)
{
    // Disposing a Timer does not wait for callbacks that are already queued, so a
    // tick can still arrive after Dispose(); do no work in that case.
    if (_disposed) return;

    try
    {
        var now = DateTime.UtcNow;
        // Check for timed-out dispatches
        List<PendingDispatch> timedOut;
        lock (_lock)
        {
            timedOut = _pending.Values
                .Where(p => (now - p.StartTime).TotalMilliseconds > DispatchTimeoutMs)
                .ToList();
        }
        foreach (var dispatch in timedOut)
        {
            dispatch.AssignedPeer.DecrementPending();
            RetryDispatch(dispatch, "Dispatch timed out");
        }
        // Check for stale peers
        foreach (var peer in _accelerator.Peers.Where(p => p.IsConnected && IsStale(p)).ToList())
        {
            HandlePeerLost(peer.PeerId);
        }
        // Proactive thermal eviction: if a peer's capabilities degrade,
        // move its heavy work to healthier peers before it crashes.
        foreach (var peer in _accelerator.Peers.Where(p => p.IsConnected).ToList())
        {
            if (ShouldEvict(peer))
            {
                InitiateGracefulHandoff(peer);
            }
        }
    }
    catch (Exception ex)
    {
        // Timer callback must never throw — a crash here silently kills heartbeat monitoring.
        // Record the failure so it can be diagnosed, then continue; the next tick will retry.
        System.Diagnostics.Trace.TraceError($"P2PDispatcher heartbeat check failed: {ex}");
    }
}
/// <summary>
/// Decides whether a peer's work should be moved away proactively.
/// True when the peer reports thermal state 3 or higher, or when it is
/// discharging with a known (non-negative) battery level below 5%.
/// Peers without capability data are never evicted.
/// </summary>
private bool ShouldEvict(RemotePeer peer) =>
    peer.Capabilities is { } caps
    && (caps.ThermalState >= 3
        || (!caps.IsCharging && caps.BatteryLevel >= 0 && caps.BatteryLevel < 0.05));
/// <summary>
/// Graceful handoff: collects the dispatches currently assigned to the evicted
/// peer, raises <see cref="OnPeerEvicted"/> with their count, then releases the
/// peer's pending count for each one and sends it through the retry path.
/// Does nothing (and raises no event) when the peer has no tracked dispatches.
/// </summary>
/// <param name="evictedPeer">Peer whose work is being moved.</param>
private void InitiateGracefulHandoff(RemotePeer evictedPeer)
{
    // Snapshot under the lock; the retries below run outside it.
    var toMove = new List<PendingDispatch>();
    lock (_lock)
    {
        foreach (var entry in _pending.Values)
        {
            if (entry.AssignedPeer.PeerId == evictedPeer.PeerId)
            {
                toMove.Add(entry);
            }
        }
    }

    if (toMove.Count == 0)
    {
        return;
    }

    OnPeerEvicted?.Invoke(evictedPeer.PeerId, toMove.Count);

    foreach (var entry in toMove)
    {
        entry.AssignedPeer.DecrementPending();
        RetryDispatch(entry, $"Graceful handoff from {evictedPeer.PeerId} (thermal/battery)");
    }
}
/// <summary>
/// Replace a peer's capabilities with a fresh report and count the report as a
/// heartbeat (its <c>LastHeartbeat</c> is set to the current UTC time).
/// Reports for unknown peer IDs are ignored.
/// </summary>
/// <param name="peerId">ID of the reporting peer.</param>
/// <param name="capabilities">The peer's updated capabilities.</param>
public void UpdatePeerCapabilities(string peerId, PeerCapabilities capabilities)
{
    var target = _accelerator.Peers.FirstOrDefault(candidate => candidate.PeerId == peerId);
    if (target is null)
    {
        return;
    }

    target.Capabilities = capabilities;
    target.LastHeartbeat = DateTime.UtcNow;
}
/// <summary>
/// Wrap the request in a KernelDispatch message and hand it to
/// <see cref="OnSendMessage"/> for the given peer. When a coordinator public key
/// is configured it is written onto the request first (the request object itself
/// is modified). If no handler is attached the message is built and dropped.
/// </summary>
/// <param name="peer">Destination peer; only its <c>PeerId</c> is used.</param>
/// <param name="request">Request to serialize as the message payload.</param>
private void SendDispatchToPeer(RemotePeer peer, KernelDispatchRequest request)
{
    // Stamp the coordinator's key so the worker can verify the sender's authority.
    var coordinatorKey = CoordinatorPublicKey;
    if (coordinatorKey != null)
    {
        request.CoordinatorPublicKey = coordinatorKey;
    }

    var payload = System.Text.Json.JsonSerializer.SerializeToElement(request);
    var message = new P2PMessage
    {
        Type = P2PMessageType.KernelDispatch,
        Payload = payload,
    };

    // The transport layer subscribes to this event and performs the actual send.
    OnSendMessage?.Invoke(peer.PeerId, message);
}
/// <summary>
/// Fired when the dispatcher needs to send a message to a peer.
/// Arguments: destination peer ID, message to send.
/// Wire this to P2PTransport.SendMessageAsync. The handler is invoked
/// synchronously and its completion is not awaited; when nothing is subscribed
/// the message is silently dropped.
/// </summary>
public event Action<string, P2PMessage>? OnSendMessage;
/// <summary>
/// Fired when the dispatcher needs to ship an input buffer to a peer.
/// Arguments: destination peer ID, buffer ID, raw bytes; the returned task
/// completes when the send is done.
/// Wire this to P2PTransport.SendBufferAsync so DispatchAsync's
/// (bufferId, data, elementSize) tuples actually transmit the data.
/// Attach a single handler: invoking a multicast delegate yields only the last
/// subscriber's task, so earlier subscribers' sends would not be awaited.
/// </summary>
public event Func<string, string, byte[], Task>? OnSendBuffer;
/// <summary>
/// Stops heartbeat monitoring. Subsequent calls do nothing.
/// Tracked dispatches are left as they are.
/// </summary>
public void Dispose()
{
    if (!_disposed)
    {
        _disposed = true;
        StopMonitoring();
    }
}
/// <summary>
/// Get a snapshot of all pending dispatches (for coordinator transfer).
/// Each entry carries the dispatch ID, the request, the ID of the currently
/// assigned peer and the attempt count; input buffers and completion sources are
/// not included.
/// </summary>
/// <returns>A new array with one entry per tracked dispatch.</returns>
public PendingDispatchInfo[] GetPendingSnapshot()
{
    lock (_lock)
    {
        var snapshot = new PendingDispatchInfo[_pending.Count];
        int index = 0;
        foreach (var entry in _pending.Values)
        {
            snapshot[index++] = new PendingDispatchInfo
            {
                DispatchId = entry.DispatchId,
                Request = entry.Request,
                AssignedPeerId = entry.AssignedPeer.PeerId,
                Attempts = entry.Attempts,
            };
        }

        return snapshot;
    }
}
/// <summary>
/// Accept pending dispatch state from a coordinator transfer.
/// The new coordinator takes over tracking the dispatch against the same peer,
/// with a fresh per-attempt clock and the transferred attempt count. The request
/// is not re-sent. The transferred info carries no input buffers and no
/// completion source, so a later retry re-sends the request without inputs.
/// If the assigned peer is not known to this coordinator the info is dropped.
/// </summary>
/// <param name="info">Snapshot entry produced by the previous coordinator.</param>
public void HandlePendingTransfer(PendingDispatchInfo info)
{
    var peer = _accelerator.Peers.FirstOrDefault(p => p.PeerId == info.AssignedPeerId);
    // Peer not connected to us: nothing is tracked, and nothing in this class
    // re-queues the dispatch later.
    if (peer == null) return;

    var pending = new PendingDispatch
    {
        DispatchId = info.DispatchId,
        Request = info.Request,
        AssignedPeer = peer,
        StartTime = DateTime.UtcNow,
        Attempts = info.Attempts,
    };

    PendingDispatch? replaced;
    lock (_lock)
    {
        _pending.TryGetValue(info.DispatchId, out replaced);
        _pending[info.DispatchId] = pending;
    }

    // Every path that completes or moves a tracked dispatch releases one pending
    // count on its assigned peer, so taking over a dispatch must add one.
    peer.IncrementPending();
    // If an entry with the same ID was already tracked it is no longer reachable;
    // release the count it was holding.
    replaced?.AssignedPeer.DecrementPending();
}
/// <summary>
/// Number of dispatches currently tracked, read under the dispatcher's lock.
/// </summary>
public int PendingCount
{
    get
    {
        lock (_lock)
        {
            return _pending.Count;
        }
    }
}
}
/// <summary>
/// Mutable record of one in-flight kernel dispatch, as tracked by the dispatcher.
/// </summary>
internal class PendingDispatch
{
    /// <summary>Dispatch identifier; the key under which this entry is tracked.</summary>
    public string DispatchId { get; set; } = string.Empty;

    /// <summary>The request that was sent and is re-sent on retry.</summary>
    public KernelDispatchRequest Request { get; set; } = new KernelDispatchRequest();

    /// <summary>
    /// Input buffer payloads captured at dispatch time so a retry to a different
    /// peer can re-ship them. Key = bufferId, value = raw bytes. Null when no
    /// inputs were provided.
    /// </summary>
    public IReadOnlyDictionary<string, byte[]>? InputBuffers { get; set; }

    /// <summary>Peer currently responsible for the dispatch; replaced on retry.</summary>
    public RemotePeer AssignedPeer { get; set; } = new RemotePeer();

    /// <summary>UTC time at which the current attempt started; reset on retry.</summary>
    public DateTime StartTime { get; set; }

    /// <summary>Number of attempts made so far, counting the initial send.</summary>
    public int Attempts { get; set; }

    /// <summary>
    /// Completed with the result (or an exception) for awaited dispatches; null
    /// when nothing is awaiting this dispatch.
    /// </summary>
    public TaskCompletionSource<KernelDispatchResult>? CompletionSource { get; set; }
}