Skip to content

Commit 9c039ff

Browse files
CopilotmrdevrobotCopilot
authored
[Network] Handler registry: replace hardcoded switch/case in TcpSyncServer with fully DI-driven handler registry (#19)
* Initial plan * Replace hardcoded switch/case in TcpSyncServer with dictionary-based handler registry Co-authored-by: mrdevrobot <12503462+mrdevrobot@users.noreply.github.com> * Allow DI injection of user-defined INetworkMessageHandler instances into TcpSyncServer Co-authored-by: mrdevrobot <12503462+mrdevrobot@users.noreply.github.com> * Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> * Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> * Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> * Extract all built-in handlers into standalone INetworkMessageHandler classes Co-authored-by: mrdevrobot <12503462+mrdevrobot@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: mrdevrobot <12503462+mrdevrobot@users.noreply.github.com> Co-authored-by: MrDevRobot <mrdevrobot@entgldb.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
1 parent a1e0086 commit 9c039ff

12 files changed

Lines changed: 803 additions & 472 deletions
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
using EntglDb.Core;
2+
using EntglDb.Core.Storage;
3+
using EntglDb.Network.Proto;
4+
using Google.Protobuf;
5+
using System.Linq;
6+
using System.Threading.Tasks;
7+
8+
namespace EntglDb.Network.Handlers;
9+
10+
/// <summary>
11+
/// Handles <see cref="MessageType.GetChainRangeReq"/> by returning oplog entries between two chain hashes.
12+
/// Returns a <see cref="ChainRangeResponse"/> with <see cref="ChainRangeResponse.SnapshotRequired"/> set
13+
/// to <c>true</c> when the requested range cannot be filled (e.g. pruned chain).
14+
/// </summary>
15+
internal sealed class GetChainRangeHandler : INetworkMessageHandler
16+
{
17+
private readonly IOplogStore _oplogStore;
18+
19+
public GetChainRangeHandler(IOplogStore oplogStore)
20+
{
21+
_oplogStore = oplogStore;
22+
}
23+
24+
public MessageType MessageType => MessageType.GetChainRangeReq;
25+
26+
public async Task<(IMessage? Response, MessageType ResponseType)> HandleAsync(IMessageHandlerContext context)
27+
{
28+
var rangeReq = GetChainRangeRequest.Parser.ParseFrom(context.Payload);
29+
var rangeEntries = await _oplogStore.GetChainRangeAsync(rangeReq.StartHash, rangeReq.EndHash, context.CancellationToken);
30+
var rangeRes = new ChainRangeResponse();
31+
32+
if (!rangeEntries.Any() && rangeReq.StartHash != rangeReq.EndHash)
33+
{
34+
// Gap cannot be filled (likely pruned or unknown branch)
35+
rangeRes.SnapshotRequired = true;
36+
}
37+
else
38+
{
39+
foreach (var e in rangeEntries)
40+
{
41+
rangeRes.Entries.Add(new ProtoOplogEntry
42+
{
43+
Collection = e.Collection,
44+
Key = e.Key,
45+
Operation = e.Operation.ToString(),
46+
JsonData = e.Payload?.GetRawText() ?? "",
47+
HlcWall = e.Timestamp.PhysicalTime,
48+
HlcLogic = e.Timestamp.LogicalCounter,
49+
HlcNode = e.Timestamp.NodeId,
50+
Hash = e.Hash,
51+
PreviousHash = e.PreviousHash
52+
});
53+
}
54+
}
55+
return (rangeRes, MessageType.ChainRangeRes);
56+
}
57+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
using EntglDb.Core.Storage;
2+
using EntglDb.Network.Proto;
3+
using Google.Protobuf;
4+
using System.Threading.Tasks;
5+
6+
namespace EntglDb.Network.Handlers;
7+
8+
/// <summary>
9+
/// Handles <see cref="MessageType.GetClockReq"/> by returning the latest HLC timestamp from the oplog.
10+
/// </summary>
11+
internal sealed class GetClockHandler : INetworkMessageHandler
12+
{
13+
private readonly IOplogStore _oplogStore;
14+
15+
public GetClockHandler(IOplogStore oplogStore)
16+
{
17+
_oplogStore = oplogStore;
18+
}
19+
20+
public MessageType MessageType => MessageType.GetClockReq;
21+
22+
public async Task<(IMessage? Response, MessageType ResponseType)> HandleAsync(IMessageHandlerContext context)
23+
{
24+
var clock = await _oplogStore.GetLatestTimestampAsync(context.CancellationToken);
25+
return (new ClockResponse
26+
{
27+
HlcWall = clock.PhysicalTime,
28+
HlcLogic = clock.LogicalCounter,
29+
HlcNode = clock.NodeId
30+
}, MessageType.ClockRes);
31+
}
32+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
using EntglDb.Core.Storage;
2+
using EntglDb.Network.Proto;
3+
using Google.Protobuf;
4+
using Microsoft.Extensions.Logging;
5+
using System.IO;
6+
using System.Threading.Tasks;
7+
8+
namespace EntglDb.Network.Handlers;
9+
10+
/// <summary>
11+
/// Handles <see cref="MessageType.GetSnapshotReq"/> by streaming the full database snapshot
12+
/// to the requesting peer in <see cref="MessageType.SnapshotChunkMsg"/> chunks.
13+
/// Returns <c>(null, MessageType.Unknown)</c> because the response is sent directly via
14+
/// <see cref="IMessageHandlerContext.SendMessageAsync"/>.
15+
/// </summary>
16+
internal sealed class GetSnapshotHandler : INetworkMessageHandler
17+
{
18+
private const int ChunkSizeBytes = 80 * 1024; // 80 KB
19+
20+
private readonly ISnapshotService _snapshotService;
21+
private readonly ILogger<GetSnapshotHandler> _logger;
22+
23+
public GetSnapshotHandler(ISnapshotService snapshotService, ILogger<GetSnapshotHandler> logger)
24+
{
25+
_snapshotService = snapshotService;
26+
_logger = logger;
27+
}
28+
29+
public MessageType MessageType => MessageType.GetSnapshotReq;
30+
31+
public async Task<(IMessage? Response, MessageType ResponseType)> HandleAsync(IMessageHandlerContext context)
32+
{
33+
_logger.LogInformation("Processing GetSnapshotReq from {Endpoint}", context.RemoteEndPoint);
34+
var tempFile = Path.GetTempFileName();
35+
try
36+
{
37+
using (var fs = File.Create(tempFile))
38+
{
39+
await _snapshotService.CreateSnapshotAsync(fs, context.CancellationToken);
40+
}
41+
42+
using (var fs = File.OpenRead(tempFile))
43+
{
44+
byte[] buffer = new byte[ChunkSizeBytes];
45+
int bytesRead;
46+
while ((bytesRead = await fs.ReadAsync(buffer, 0, buffer.Length, context.CancellationToken)) > 0)
47+
{
48+
var chunk = new SnapshotChunk
49+
{
50+
Data = ByteString.CopyFrom(buffer, 0, bytesRead),
51+
IsLast = false
52+
};
53+
await context.SendMessageAsync(MessageType.SnapshotChunkMsg, chunk);
54+
}
55+
56+
// Signal end of snapshot
57+
await context.SendMessageAsync(MessageType.SnapshotChunkMsg, new SnapshotChunk { IsLast = true });
58+
}
59+
}
60+
finally
61+
{
62+
if (File.Exists(tempFile)) File.Delete(tempFile);
63+
}
64+
return (null, MessageType.Unknown);
65+
}
66+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
using EntglDb.Core.Storage;
2+
using EntglDb.Network.Proto;
3+
using Google.Protobuf;
4+
using System.Threading.Tasks;
5+
6+
namespace EntglDb.Network.Handlers;
7+
8+
/// <summary>
9+
/// Handles <see cref="MessageType.GetVectorClockReq"/> by returning the full vector clock from the oplog.
10+
/// </summary>
11+
internal sealed class GetVectorClockHandler : INetworkMessageHandler
12+
{
13+
private readonly IOplogStore _oplogStore;
14+
15+
public GetVectorClockHandler(IOplogStore oplogStore)
16+
{
17+
_oplogStore = oplogStore;
18+
}
19+
20+
public MessageType MessageType => MessageType.GetVectorClockReq;
21+
22+
public async Task<(IMessage? Response, MessageType ResponseType)> HandleAsync(IMessageHandlerContext context)
23+
{
24+
var vectorClock = await _oplogStore.GetVectorClockAsync(context.CancellationToken);
25+
var vcRes = new VectorClockResponse();
26+
foreach (var nodeId in vectorClock.NodeIds)
27+
{
28+
var ts = vectorClock.GetTimestamp(nodeId);
29+
vcRes.Entries.Add(new VectorClockEntry
30+
{
31+
NodeId = nodeId,
32+
HlcWall = ts.PhysicalTime,
33+
HlcLogic = ts.LogicalCounter
34+
});
35+
}
36+
return (vcRes, MessageType.VectorClockRes);
37+
}
38+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
using EntglDb.Core;
2+
using EntglDb.Core.Storage;
3+
using EntglDb.Network.Proto;
4+
using Google.Protobuf;
5+
using System.Linq;
6+
using System.Threading.Tasks;
7+
8+
namespace EntglDb.Network.Handlers;
9+
10+
/// <summary>
11+
/// Handles <see cref="MessageType.PullChangesReq"/> by returning oplog entries since the requested timestamp.
12+
/// </summary>
13+
internal sealed class PullChangesHandler : INetworkMessageHandler
14+
{
15+
private readonly IOplogStore _oplogStore;
16+
17+
public PullChangesHandler(IOplogStore oplogStore)
18+
{
19+
_oplogStore = oplogStore;
20+
}
21+
22+
public MessageType MessageType => MessageType.PullChangesReq;
23+
24+
public async Task<(IMessage? Response, MessageType ResponseType)> HandleAsync(IMessageHandlerContext context)
25+
{
26+
var pReq = PullChangesRequest.Parser.ParseFrom(context.Payload);
27+
var since = new HlcTimestamp(pReq.SinceWall, pReq.SinceLogic, pReq.SinceNode);
28+
29+
// Use collection filter from request
30+
var filter = pReq.Collections.Any() ? pReq.Collections : null;
31+
var oplog = await _oplogStore.GetOplogAfterAsync(since, filter, context.CancellationToken);
32+
33+
var csRes = new ChangeSetResponse();
34+
foreach (var e in oplog)
35+
{
36+
csRes.Entries.Add(new ProtoOplogEntry
37+
{
38+
Collection = e.Collection,
39+
Key = e.Key,
40+
Operation = e.Operation.ToString(),
41+
JsonData = e.Payload?.GetRawText() ?? "",
42+
HlcWall = e.Timestamp.PhysicalTime,
43+
HlcLogic = e.Timestamp.LogicalCounter,
44+
HlcNode = e.Timestamp.NodeId,
45+
Hash = e.Hash,
46+
PreviousHash = e.PreviousHash
47+
});
48+
}
49+
return (csRes, MessageType.ChangeSetRes);
50+
}
51+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
using EntglDb.Core;
2+
using EntglDb.Core.Storage;
3+
using EntglDb.Network.Proto;
4+
using Google.Protobuf;
5+
using Microsoft.Extensions.Logging;
6+
using System;
7+
using System.Collections.Generic;
8+
using System.Text.Json;
9+
using System.Threading.Tasks;
10+
11+
namespace EntglDb.Network.Handlers;
12+
13+
/// <summary>
14+
/// Handles <see cref="MessageType.PushChangesReq"/> by applying a batch of oplog entries from a remote peer.
15+
/// </summary>
16+
internal sealed class PushChangesHandler : INetworkMessageHandler
17+
{
18+
private readonly IOplogStore _oplogStore;
19+
private readonly ILogger<PushChangesHandler> _logger;
20+
21+
public PushChangesHandler(IOplogStore oplogStore, ILogger<PushChangesHandler> logger)
22+
{
23+
_oplogStore = oplogStore;
24+
_logger = logger;
25+
}
26+
27+
public MessageType MessageType => MessageType.PushChangesReq;
28+
29+
public async Task<(IMessage? Response, MessageType ResponseType)> HandleAsync(IMessageHandlerContext context)
30+
{
31+
var pushReq = PushChangesRequest.Parser.ParseFrom(context.Payload);
32+
var entries = new List<OplogEntry>();
33+
34+
foreach (var e in pushReq.Entries)
35+
{
36+
if (!Enum.TryParse<OperationType>(e.Operation, ignoreCase: true, out var operation))
37+
{
38+
_logger.LogWarning("Failed to parse OperationType from value '{Operation}' in PushChangesReq.", e.Operation);
39+
return (new AckResponse { Success = false }, MessageType.AckRes);
40+
}
41+
42+
entries.Add(new OplogEntry(
43+
e.Collection,
44+
e.Key,
45+
operation,
46+
string.IsNullOrEmpty(e.JsonData) ? (JsonElement?)null : JsonSerializer.Deserialize<JsonElement>(e.JsonData),
47+
new HlcTimestamp(e.HlcWall, e.HlcLogic, e.HlcNode),
48+
e.PreviousHash,
49+
e.Hash
50+
));
51+
}
52+
53+
await _oplogStore.ApplyBatchAsync(entries, context.CancellationToken);
54+
55+
return (new AckResponse { Success = true }, MessageType.AckRes);
56+
}
57+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
using System.Net;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using EntglDb.Network.Proto;
5+
using Google.Protobuf;
6+
7+
namespace EntglDb.Network;
8+
9+
/// <summary>
10+
/// Context passed to a <see cref="INetworkMessageHandler"/> when a message is received from a remote peer.
11+
/// </summary>
12+
public interface IMessageHandlerContext
13+
{
14+
/// <summary>
15+
/// The raw protobuf payload bytes of the incoming message.
16+
/// Use the corresponding protobuf parser (e.g. <c>MyRequest.Parser.ParseFrom(Payload)</c>) to deserialize.
17+
/// </summary>
18+
byte[] Payload { get; }
19+
20+
/// <summary>
21+
/// The remote endpoint of the connected client.
22+
/// </summary>
23+
EndPoint? RemoteEndPoint { get; }
24+
25+
/// <summary>
26+
/// Cancellation token for the operation.
27+
/// </summary>
28+
CancellationToken CancellationToken { get; }
29+
30+
/// <summary>
31+
/// Sends a message directly to the connected client on the current stream.
32+
/// Use this for streaming responses (e.g. multi-chunk transfers) where the handler
33+
/// writes its own response rather than returning a single <see cref="IMessage"/>.
34+
/// </summary>
35+
/// <param name="type">The <see cref="MessageType"/> of the outgoing message.</param>
36+
/// <param name="message">The protobuf message to send.</param>
37+
/// <param name="useCompression">Whether to compress the message payload. Defaults to <c>false</c>.</param>
38+
Task SendMessageAsync(MessageType type, IMessage message, bool useCompression = false);
39+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
using System.Threading.Tasks;
2+
using EntglDb.Network.Proto;
3+
using Google.Protobuf;
4+
5+
namespace EntglDb.Network;
6+
7+
/// <summary>
8+
/// Defines a network message handler for the EntglDb sync server.
9+
/// Both built-in core handlers and user-supplied custom handlers implement this interface.
10+
/// All implementations are registered in the DI container and are collected by
11+
/// <see cref="TcpSyncServer"/> at construction to build the handler dispatch registry.
12+
/// </summary>
13+
/// <remarks>
14+
/// <para>
15+
/// Register user-defined handlers <em>after</em> calling <c>AddEntglDbNetwork</c> so that
16+
/// they appear last in the DI collection and can override core handlers for the same
17+
/// <see cref="MessageType"/>:
18+
/// </para>
19+
/// <code>
20+
/// services.AddEntglDbNetwork&lt;MyConfigProvider&gt;();
21+
/// services.AddSingleton&lt;INetworkMessageHandler, MyCustomHandler&gt;(); // added after → takes precedence
22+
/// </code>
23+
/// <para>
24+
/// Return <c>(null, MessageType.Unknown)</c> when the handler streams its response directly
25+
/// via <see cref="IMessageHandlerContext.SendMessageAsync"/> (i.e. no further response needs
26+
/// to be sent by the dispatcher).
27+
/// </para>
28+
/// </remarks>
29+
public interface INetworkMessageHandler
30+
{
31+
/// <summary>
32+
/// The <see cref="MessageType"/> this handler is responsible for processing.
33+
/// </summary>
34+
MessageType MessageType { get; }
35+
36+
/// <summary>
37+
/// Handles an incoming message and returns an optional response.
38+
/// </summary>
39+
/// <param name="context">Context containing the raw payload, remote endpoint, and cancellation token.</param>
40+
/// <returns>
41+
/// A tuple of the response message and its <see cref="MessageType"/>.
42+
/// Return <c>(null, MessageType.Unknown)</c> if the handler sends the response itself (streaming).
43+
/// </returns>
44+
Task<(IMessage? Response, MessageType ResponseType)> HandleAsync(IMessageHandlerContext context);
45+
}

0 commit comments

Comments
 (0)