-
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathP2PMemoryBuffer.cs
More file actions
159 lines (145 loc) · 5.36 KB
/
Copy pathP2PMemoryBuffer.cs
File metadata and controls
159 lines (145 loc) · 5.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
using ILGPU;
using ILGPU.Runtime;
namespace SpawnDev.ILGPU.P2P;
/// <summary>
/// P2P memory buffer — represents data that may reside on a remote peer device.
/// Tracks data locality for intelligent kernel dispatch routing.
///
/// The buffer maintains a local shadow copy of the data. When the coordinator
/// copies data to this buffer (CopyTo), the shadow is updated and marked dirty.
/// When a kernel dispatch targets the buffer's resident peer, the dirty shadow
/// is sent via WebRTC. When the coordinator reads back (CopyFrom), the latest
/// data is retrieved from the shadow (updated after kernel execution).
///
/// This design avoids blocking the synchronous MemoryBuffer API on async WebRTC.
/// </summary>
public class P2PMemoryBuffer : MemoryBuffer
{
/// <summary>
/// Which peer this buffer's data currently resides on (null = local/host).
/// </summary>
public RemotePeer? ResidentPeer { get; set; }
/// <summary>
/// Whether the local copy is current (not stale from remote writes).
/// </summary>
public bool IsLocalCurrent { get; internal set; } = true;
/// <summary>
/// Local shadow copy of the buffer data (for send/receive staging).
/// Access must be synchronized via ShadowLock.
/// </summary>
public byte[] ShadowData { get; set; }
/// <summary>
/// Whether the shadow has been modified locally and needs to be sent to the peer.
/// </summary>
public bool IsDirty { get; set; }
/// <summary>Lock for thread-safe shadow data access.</summary>
internal readonly object ShadowLock = new();
/// <summary>
/// Unique buffer ID for transport-level tracking.
/// </summary>
public string BufferId { get; } = Guid.NewGuid().ToString("N");
public P2PMemoryBuffer(Accelerator accelerator, long length, int elementSize)
: base(accelerator, length, elementSize)
{
NativePtr = IntPtr.Zero;
ShadowData = new byte[length * elementSize];
}
/// <inheritdoc/>
protected override void MemSet(
AcceleratorStream stream, byte value, in ArrayView<byte> targetView)
{
lock (ShadowLock)
{
Array.Fill(ShadowData, value, 0, (int)Math.Min(targetView.Length, ShadowData.Length));
IsDirty = true;
IsLocalCurrent = true;
}
}
/// <inheritdoc/>
/// <remarks>
/// READ path. ILGPU calls this when a CPU-side caller invokes
/// <c>view.CopyToCPU(...)</c>: <paramref name="sourceView"/> points into THIS
/// buffer's storage (ShadowData) and <paramref name="targetView"/> is the CPU
/// destination. Bytes flow OUT of ShadowData into the target.
/// </remarks>
protected override void CopyTo(
AcceleratorStream stream,
in ArrayView<byte> sourceView,
in ArrayView<byte> targetView)
{
lock (ShadowLock)
{
unsafe
{
var dstPtr = targetView.LoadEffectiveAddressAsPtr();
if (targetView.Length > 0 && ShadowData.Length > 0 && dstPtr != IntPtr.Zero)
{
var copyLen = (int)Math.Min(targetView.Length, ShadowData.Length);
var targetSpan = new Span<byte>((void*)dstPtr, copyLen);
ShadowData.AsSpan(0, copyLen).CopyTo(targetSpan);
}
}
}
}
/// <inheritdoc/>
/// <remarks>
/// WRITE path. ILGPU calls this when a CPU-side caller invokes
/// <c>view.CopyFromCPU(...)</c>: <paramref name="sourceView"/> is the CPU
/// source and <paramref name="targetView"/> points into THIS buffer's
/// storage. Bytes flow IN from the source to ShadowData; mark the buffer
/// dirty so the next dispatch ships fresh data to the resident peer.
/// </remarks>
protected override void CopyFrom(
AcceleratorStream stream,
in ArrayView<byte> sourceView,
in ArrayView<byte> targetView)
{
lock (ShadowLock)
{
unsafe
{
var srcPtr = sourceView.LoadEffectiveAddressAsPtr();
if (sourceView.Length > 0 && ShadowData.Length > 0 && srcPtr != IntPtr.Zero)
{
var copyLen = (int)Math.Min(sourceView.Length, ShadowData.Length);
var sourceSpan = new Span<byte>((void*)srcPtr, copyLen);
sourceSpan.CopyTo(ShadowData.AsSpan(0, copyLen));
IsDirty = true;
IsLocalCurrent = true;
}
}
}
}
/// <summary>
/// Update the shadow data from bytes received from a remote peer (after kernel execution).
/// </summary>
public void UpdateFromRemote(byte[] data)
{
lock (ShadowLock)
{
Array.Copy(data, ShadowData, Math.Min(data.Length, ShadowData.Length));
IsDirty = false;
IsLocalCurrent = true;
}
}
/// <summary>
/// Get the current shadow data for transmission to a remote peer.
/// </summary>
public byte[] GetShadowForTransmission()
{
lock (ShadowLock)
{
IsDirty = false;
return ShadowData.ToArray();
}
}
/// <inheritdoc/>
protected override void DisposeAcceleratorObject(bool disposing)
{
if (disposing)
{
ResidentPeer = null;
ShadowData = Array.Empty<byte>();
}
}
}