Skip to content

Commit b0b2b65

Browse files
committed
Broadcast to all IPv4 subnets for UDP discovery
Replace single IPAddress.Broadcast send with per-interface IPv4 subnet broadcasts to avoid route-selection issues on multi-interface hosts. Added GetBroadcastEndpoints and ComputeBroadcastAddress to compute per-unicast broadcast addresses (skips loopback/tunnel interfaces and non-IPv4 addresses) and fall back to IPAddress.Broadcast if none found. Also handle socket errors for unreachable routes and added NetworkInterface usage.
1 parent f16dc02 commit b0b2b65

1 file changed

Lines changed: 124 additions & 46 deletions

File tree

src/EntglDb.Network/UdpDiscoveryService.cs

Lines changed: 124 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,41 @@
1-
using System;
2-
using System.Collections.Concurrent;
3-
using System.Collections.Generic;
4-
using System.Linq;
5-
using System.Net;
6-
using System.Net.Sockets;
7-
using System.Text;
8-
using System.Text.Json;
9-
using System.Threading;
10-
using System.Threading.Tasks;
11-
using Microsoft.Extensions.Logging;
12-
using EntglDb.Core.Network;
13-
14-
namespace EntglDb.Network;
15-
16-
/// <summary>
17-
/// Provides UDP-based peer discovery for the EntglDb network.
18-
/// Broadcasts presence beacons and listens for other nodes on the local network.
19-
/// </summary>
20-
internal class UdpDiscoveryService : IDiscoveryService
21-
{
22-
private const int DiscoveryPort = 25000;
23-
private readonly ILogger<UdpDiscoveryService> _logger;
24-
private readonly IPeerNodeConfigurationProvider _configProvider;
25-
private readonly ILocalInterestsProvider? _localInterests;
26-
private CancellationTokenSource? _cts;
27-
private readonly ConcurrentDictionary<string, PeerNode> _activePeers = new();
28-
private readonly object _startStopLock = new object();
29-
30-
public UdpDiscoveryService(
31-
IPeerNodeConfigurationProvider peerNodeConfigurationProvider,
32-
ILogger<UdpDiscoveryService> logger,
33-
ILocalInterestsProvider? localInterests = null)
34-
{
35-
_configProvider = peerNodeConfigurationProvider ?? throw new ArgumentNullException(nameof(peerNodeConfigurationProvider));
36-
_localInterests = localInterests;
37-
_logger = logger;
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Net;
6+
using System.Net.NetworkInformation;
7+
using System.Net.Sockets;
8+
using System.Text;
9+
using System.Text.Json;
10+
using System.Threading;
11+
using System.Threading.Tasks;
12+
using Microsoft.Extensions.Logging;
13+
using EntglDb.Core.Network;
14+
15+
namespace EntglDb.Network;
16+
17+
/// <summary>
18+
/// Provides UDP-based peer discovery for the EntglDb network.
19+
/// Broadcasts presence beacons and listens for other nodes on the local network.
20+
/// </summary>
21+
internal class UdpDiscoveryService : IDiscoveryService
22+
{
23+
private const int DiscoveryPort = 25000;
24+
private readonly ILogger<UdpDiscoveryService> _logger;
25+
private readonly IPeerNodeConfigurationProvider _configProvider;
26+
private readonly ILocalInterestsProvider? _localInterests;
27+
private CancellationTokenSource? _cts;
28+
private readonly ConcurrentDictionary<string, PeerNode> _activePeers = new();
29+
private readonly object _startStopLock = new object();
30+
31+
public UdpDiscoveryService(
32+
IPeerNodeConfigurationProvider peerNodeConfigurationProvider,
33+
ILogger<UdpDiscoveryService> logger,
34+
ILocalInterestsProvider? localInterests = null)
35+
{
36+
_configProvider = peerNodeConfigurationProvider ?? throw new ArgumentNullException(nameof(peerNodeConfigurationProvider));
37+
_localInterests = localInterests;
38+
_logger = logger;
3839
}
3940

4041
/// <summary>
@@ -229,27 +230,37 @@ private async Task BroadcastAsync(CancellationToken token)
229230
using var udp = new UdpClient();
230231
udp.EnableBroadcast = true;
231232

232-
var endpoint = new IPEndPoint(IPAddress.Broadcast, DiscoveryPort);
233-
234233
while (!token.IsCancellationRequested)
235234
{
236235
try
237236
{
238237
// Re-fetch config each time in case it changes (though usually static)
239238
var conf = await _configProvider.GetConfiguration();
240239

241-
var beacon = new DiscoveryBeacon
242-
{
243-
NodeId = conf.NodeId,
244-
TcpPort = conf.TcpPort,
245-
ClusterHash = ComputeClusterHash(conf.AuthToken),
246-
InterestingCollections = _localInterests?.InterestedCollection.ToList() ?? new List<string>()
240+
var beacon = new DiscoveryBeacon
241+
{
242+
NodeId = conf.NodeId,
243+
TcpPort = conf.TcpPort,
244+
ClusterHash = ComputeClusterHash(conf.AuthToken),
245+
InterestingCollections = _localInterests?.InterestedCollection.ToList() ?? new List<string>()
247246
};
248247

249248
var json = JsonSerializer.Serialize(beacon);
250249
var bytes = Encoding.UTF8.GetBytes(json);
251250

252-
await udp.SendAsync(bytes, bytes.Length, endpoint);
251+
// Broadcast on each active IPv4 subnet to avoid route selection issues on multi-interface hosts.
252+
var endpoints = GetBroadcastEndpoints(DiscoveryPort);
253+
foreach (var endpoint in endpoints)
254+
{
255+
try
256+
{
257+
await udp.SendAsync(bytes, bytes.Length, endpoint);
258+
}
259+
catch (SocketException sex) when (sex.SocketErrorCode is SocketError.HostUnreachable or SocketError.NetworkUnreachable)
260+
{
261+
_logger.LogWarning(sex, "UDP Broadcast route unavailable for {Endpoint}", endpoint);
262+
}
263+
}
253264
}
254265
catch (Exception ex)
255266
{
@@ -260,6 +271,73 @@ private async Task BroadcastAsync(CancellationToken token)
260271
}
261272
}
262273

274+
private static IEnumerable<IPEndPoint> GetBroadcastEndpoints(int port)
275+
{
276+
var endpoints = new HashSet<IPAddress>();
277+
278+
foreach (var nic in NetworkInterface.GetAllNetworkInterfaces())
279+
{
280+
if (nic.OperationalStatus != OperationalStatus.Up)
281+
{
282+
continue;
283+
}
284+
285+
if (nic.NetworkInterfaceType is NetworkInterfaceType.Loopback or NetworkInterfaceType.Tunnel)
286+
{
287+
continue;
288+
}
289+
290+
IPInterfaceProperties? properties;
291+
try
292+
{
293+
properties = nic.GetIPProperties();
294+
}
295+
catch
296+
{
297+
continue;
298+
}
299+
300+
foreach (var unicast in properties.UnicastAddresses)
301+
{
302+
var ip = unicast.Address;
303+
var mask = unicast.IPv4Mask;
304+
305+
if (ip.AddressFamily != AddressFamily.InterNetwork || mask == null)
306+
{
307+
continue;
308+
}
309+
310+
if (IPAddress.IsLoopback(ip) || ip.Equals(IPAddress.Any))
311+
{
312+
continue;
313+
}
314+
315+
endpoints.Add(ComputeBroadcastAddress(ip, mask));
316+
}
317+
}
318+
319+
if (endpoints.Count == 0)
320+
{
321+
endpoints.Add(IPAddress.Broadcast);
322+
}
323+
324+
return endpoints.Select(address => new IPEndPoint(address, port));
325+
}
326+
327+
private static IPAddress ComputeBroadcastAddress(IPAddress address, IPAddress mask)
328+
{
329+
var ipBytes = address.GetAddressBytes();
330+
var maskBytes = mask.GetAddressBytes();
331+
var broadcastBytes = new byte[ipBytes.Length];
332+
333+
for (var i = 0; i < ipBytes.Length; i++)
334+
{
335+
broadcastBytes[i] = (byte)(ipBytes[i] | (~maskBytes[i] & 0xFF));
336+
}
337+
338+
return new IPAddress(broadcastBytes);
339+
}
340+
263341
private string ComputeClusterHash(string authToken)
264342
{
265343
if (string.IsNullOrEmpty(authToken)) return "";

0 commit comments

Comments
 (0)