Skip to content

Commit 841eedc

Browse files
authored
Merge pull request #3399 from AElfProject/feature/grpc-bidirectional-streaming
Feature/grpc bidirectional streaming
2 parents a0804d7 + 9c17a41 commit 841eedc

46 files changed

Lines changed: 1754 additions & 366 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

azure-pipelines.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ jobs:
99
parameters:
1010
parts: 3
1111
n: 2
12-
codecoverage: true
12+
codecoverage: false
1313
- template: templates/build-template-window.yml
1414
parameters:
1515
parts: 3

protobuf/peer_service.proto

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,27 @@ import "aelf/core.proto";
77

88
service PeerService {
99

10-
rpc Ping (PingRequest) returns (PongReply) {}
11-
rpc CheckHealth (HealthCheckRequest) returns (HealthCheckReply) {}
12-
13-
rpc RequestBlock (BlockRequest) returns (BlockReply) {}
14-
rpc RequestBlocks (BlocksRequest) returns (BlockList) {}
10+
rpc Ping (PingRequest) returns (PongReply) {}
11+
rpc CheckHealth (HealthCheckRequest) returns (HealthCheckReply) {}
1512

16-
rpc BlockBroadcastStream (stream BlockWithTransactions) returns (VoidReply) {}
17-
18-
rpc TransactionBroadcastStream (stream aelf.Transaction) returns (VoidReply) {}
19-
rpc AnnouncementBroadcastStream (stream BlockAnnouncement) returns (VoidReply) {}
13+
rpc RequestBlock (BlockRequest) returns (BlockReply) {}
14+
rpc RequestBlocks (BlocksRequest) returns (BlockList) {}
2015

21-
rpc LibAnnouncementBroadcastStream (stream LibAnnouncement) returns (VoidReply) {}
16+
rpc BlockBroadcastStream (stream BlockWithTransactions) returns (VoidReply) {}
2217

23-
rpc GetNodes (NodesRequest) returns (NodeList) {}
18+
rpc TransactionBroadcastStream (stream aelf.Transaction) returns (VoidReply) {}
19+
rpc AnnouncementBroadcastStream (stream BlockAnnouncement) returns (VoidReply) {}
2420

25-
rpc DoHandshake (HandshakeRequest) returns (HandshakeReply) {}
26-
rpc ConfirmHandshake (ConfirmHandshakeRequest) returns (VoidReply) {}
21+
rpc LibAnnouncementBroadcastStream (stream LibAnnouncement) returns (VoidReply) {}
2722

28-
rpc Disconnect (DisconnectReason) returns (VoidReply) {}
23+
rpc RequestByStream (stream StreamMessage) returns (stream StreamMessage) {}
24+
25+
rpc GetNodes (NodesRequest) returns (NodeList) {}
26+
27+
rpc DoHandshake (HandshakeRequest) returns (HandshakeReply) {}
28+
rpc ConfirmHandshake (ConfirmHandshakeRequest) returns (VoidReply) {}
29+
30+
rpc Disconnect (DisconnectReason) returns (VoidReply) {}
2931
}
3032

3133
// **** No reply *****
@@ -45,3 +47,34 @@ message HealthCheckRequest {
4547
message HealthCheckReply {
4648
}
4749

50+
message StreamMessage {
51+
StreamType stream_type = 1;
52+
MessageType message_type = 2;
53+
string request_id = 3;
54+
bytes message = 4;
55+
map<string, string> meta = 5;
56+
}
57+
58+
enum StreamType {
59+
UNKNOWN = 0;
60+
REQUEST = 1;
61+
REPLY = 2;
62+
}
63+
64+
enum MessageType {
65+
ANY = 0;
66+
67+
HAND_SHAKE = 1;
68+
PING = 2;
69+
CONFIRM_HAND_SHAKE = 3;
70+
HEALTH_CHECK = 4;
71+
REQUEST_BLOCK = 5;
72+
REQUEST_BLOCKS = 6;
73+
GET_NODES = 7;
74+
75+
BLOCK_BROADCAST = 8;
76+
TRANSACTION_BROADCAST = 9;
77+
ANNOUNCEMENT_BROADCAST = 10;
78+
LIB_ANNOUNCEMENT_BROADCAST = 11;
79+
DISCONNECT = 12;
80+
}

src/AElf.Kernel.Core/Blockchain/Application/IBlockValidationProvider.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ await _transactionBlockIndexService.ValidateTransactionBlockIndexExistsInBranchA
157157
block.Header.PreviousBlockHash);
158158
if (!blockIndexExists)
159159
continue;
160-
Logger.LogDebug("Transaction: {TransactionId} repackaged", transactionId);
160+
Logger.LogDebug("Transaction: {TransactionId} repackaged", transactionId.ToHex());
161161
return false;
162162
}
163163

src/AElf.Kernel.Core/SmartContract/Application/IBlockchainStateService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ public async Task MergeBlockStateAsync(long lastIrreversibleBlockHeight, Hash la
158158

159159
Logger.LogDebug(
160160
"Start merge lib height: {LastIrreversibleBlockHeight}, lib block hash: {LastIrreversibleBlockHash}, merge count: {BlockIndexesCount}",
161-
lastIrreversibleBlockHeight, lastIrreversibleBlockHash, blockIndexes.Count);
161+
lastIrreversibleBlockHeight, lastIrreversibleBlockHash.ToHex(), blockIndexes.Count);
162162

163163
foreach (var blockIndex in blockIndexes)
164164
try

src/AElf.Kernel.SmartContractExecution/Application/BlockchainExecutingService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ await _transactionResultService.GetTransactionResultAsync(transactionId, blockHa
112112
== null)
113113
{
114114
Logger.LogWarning(
115-
$"Fail to load transaction result. block hash : {blockHash}, tx id: {transactionId}");
115+
"Fail to load transaction result. block hash : {blockHash}, tx id: {transactionId}", blockHash.ToHex(), transactionId.ToHex());
116116

117117
return null;
118118
}

src/AElf.Kernel.Types/Block/BlockHeader.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public byte[] GetHashBytes()
3535
private byte[] GetSignatureData()
3636
{
3737
if (!VerifyFields())
38-
throw new InvalidOperationException($"Invalid block header: {this}.");
38+
throw new InvalidOperationException($"Invalid block header: PreviousBlockHash={PreviousBlockHash?.ToHex()}, mtr={MerkleTreeRootOfTransactions?.ToHex()}, ChainId={ChainId}, Height={Height}, Time={Time}.");
3939

4040
if (Signature.IsEmpty)
4141
return this.ToByteArray();

src/AElf.Kernel.Types/KernelConstants.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using Google.Protobuf.WellKnownTypes;
23

34
namespace AElf.Kernel;
@@ -16,4 +17,5 @@ public static class KernelConstants
1617
public const string SignaturePlaceholder = "SignaturePlaceholder";
1718
public const string BlockExecutedDataKey = "BlockExecutedData";
1819
public static Duration AllowedFutureBlockTimeSpan = new() { Seconds = 4 };
20+
public static string SupportStreamMinVersion = "1.4.0.0";
1921
}

src/AElf.Launcher/AElf.Launcher.csproj

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
<ServerGarbageCollection>true</ServerGarbageCollection>
66
</PropertyGroup>
77
<ItemGroup>
8-
<ProjectReference Include="..\AElf.Blockchains.MainChain\AElf.Blockchains.MainChain.csproj"/>
9-
<ProjectReference Include="..\AElf.Blockchains.SideChain\AElf.Blockchains.SideChain.csproj"/>
8+
<ProjectReference Include="..\AElf.Blockchains.MainChain\AElf.Blockchains.MainChain.csproj" />
9+
<ProjectReference Include="..\AElf.Blockchains.SideChain\AElf.Blockchains.SideChain.csproj" />
1010
<None Update="Dockerfile">
1111
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
1212
</None>
@@ -15,9 +15,9 @@
1515
</None>
1616
</ItemGroup>
1717
<ItemGroup>
18-
<PackageReference Include="Volo.Abp.AspNetCore.Mvc" Version="5.2.2"/>
19-
<PackageReference Include="Volo.Abp.Autofac" Version="5.2.2"/>
20-
<FrameworkReference Include="Microsoft.AspNetCore.App"/>
18+
<PackageReference Include="Volo.Abp.AspNetCore.Mvc" Version="5.2.2" />
19+
<PackageReference Include="Volo.Abp.Autofac" Version="5.2.2" />
20+
<FrameworkReference Include="Microsoft.AspNetCore.App" />
2121
</ItemGroup>
2222
<ItemGroup>
2323
<Content Include="appsettings.Development.json">

src/AElf.OS.Core/Network/Application/INetworkService.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System.Collections.Generic;
22
using System.Threading.Tasks;
33
using AElf.Kernel;
4+
using AElf.OS.Network.Infrastructure;
45
using AElf.OS.Network.Types;
56
using AElf.Types;
67

@@ -28,4 +29,5 @@ Task<bool> RemovePeerByPubkeyAsync(string peerPubkey,
2829
Task CheckPeersHealthAsync();
2930
void CheckNtpDrift();
3031
bool IsPeerPoolFull();
32+
Task<List<NodeInfo>> GetNodesAsync(IPeer peer);
3133
}

src/AElf.OS.Core/Network/Application/NetworkService.cs

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public Task BroadcastAnnounceAsync(BlockHeader blockHeader)
142142
Logger.LogInformation(ex, $"Could not broadcast announcement to {peer} " +
143143
$"- status {peer.ConnectionStatus}.");
144144

145-
await HandleNetworkException(peer, ex);
145+
await HandleNetworkExceptionAsync(peer, ex);
146146
}
147147
});
148148
}
@@ -171,7 +171,7 @@ public Task BroadcastTransactionAsync(Transaction transaction)
171171
Logger.LogWarning(ex, $"Could not broadcast transaction to {peer} " +
172172
$"- status {peer.ConnectionStatus}.");
173173

174-
await HandleNetworkException(peer, ex);
174+
await HandleNetworkExceptionAsync(peer, ex);
175175
}
176176
});
177177
}
@@ -201,7 +201,7 @@ public Task BroadcastLibAnnounceAsync(Hash libHash, long libHeight)
201201
{
202202
Logger.LogWarning(ex, $"Could not broadcast lib announcement to {peer} " +
203203
$"- status {peer.ConnectionStatus}.");
204-
await HandleNetworkException(peer, ex);
204+
await HandleNetworkExceptionAsync(peer, ex);
205205
}
206206
});
207207
}
@@ -256,7 +256,6 @@ public async Task<Response<List<BlockWithTransactions>>> GetBlocksAsync(Hash pre
256256

257257
if (peer == null)
258258
throw new InvalidOperationException($"Could not find peer {peerPubkey}.");
259-
260259
var response = await Request(peer, p => p.GetBlocksAsync(previousBlock, count));
261260

262261
if (response.Success && response.Payload != null
@@ -343,7 +342,7 @@ private void EnqueueBlock(IPeer peer, BlockWithTransactions blockWithTransaction
343342
if (ex != null)
344343
{
345344
Logger.LogWarning(ex, $"Could not broadcast block to {peer} - status {peer.ConnectionStatus}.");
346-
await HandleNetworkException(peer, ex);
345+
await HandleNetworkExceptionAsync(peer, ex);
347346
}
348347
});
349348
}
@@ -372,13 +371,33 @@ private async Task<Response<T>> Request<T>(IPeer peer, Func<IPeer, Task<T>> func
372371
if (ex.ExceptionType == NetworkExceptionType.HandlerException)
373372
return new Response<T>(default);
374373

375-
await HandleNetworkException(peer, ex);
374+
await HandleNetworkExceptionAsync(peer, ex);
376375
}
377376

378377
return new Response<T>();
379378
}
380379

381-
private async Task HandleNetworkException(IPeer peer, NetworkException exception)
380+
public async Task<List<NodeInfo>> GetNodesAsync(IPeer peer)
381+
{
382+
try
383+
{
384+
var nodeList = await peer.GetNodesAsync();
385+
386+
if (nodeList?.Nodes == null)
387+
return new List<NodeInfo>();
388+
389+
Logger.LogDebug("get nodes: {nodeList} from peer: {peer}.", nodeList, peer);
390+
return nodeList.Nodes.ToList();
391+
}
392+
catch (Exception e)
393+
{
394+
if (e is NetworkException exception) await HandleNetworkExceptionAsync(peer, exception);
395+
Logger.LogWarning(e, "get nodes failed. peer={peer}", peer);
396+
return new List<NodeInfo>();
397+
}
398+
}
399+
400+
private async Task HandleNetworkExceptionAsync(IPeer peer, NetworkException exception)
382401
{
383402
if (exception.ExceptionType == NetworkExceptionType.Unrecoverable)
384403
{
@@ -398,8 +417,9 @@ private async Task RecoverPeerAsync(IPeer peer)
398417
return;
399418

400419
var success = await peer.TryRecoverAsync();
401-
402-
if (!success)
420+
if (success)
421+
await _networkServer.BuildStreamForPeerAsync(peer);
422+
else
403423
await _networkServer.TrySchedulePeerReconnectionAsync(peer);
404424
}
405425

0 commit comments

Comments
 (0)