Skip to content

Commit a5cc8dd

Browse files
authored
Merge pull request #3430 from AElfProject/release/1.4.1
Release v1.4.1
2 parents ff547f2 + 2077337 commit a5cc8dd

4 files changed

Lines changed: 32 additions & 8 deletions

File tree

src/AElf.OS.Network.Grpc/Connection/PeerDialer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,8 @@ private async Task<GrpcStreamPeer> DailStreamPeerAsync(GrpcClient client, DnsEnd
240240
try
241241
{
242242
var nodePubkey = (await _accountService.GetPublicKeyAsync()).ToHex();
243-
var call = client.Client.RequestByStream(new CallOptions().WithDeadline(DateTime.MaxValue));
243+
var headers = new Metadata { new(GrpcConstants.GrpcRequestCompressKey, GrpcConstants.GrpcGzipConst) };
244+
var call = client.Client.RequestByStream(new CallOptions().WithHeaders(headers).WithDeadline(DateTime.MaxValue));
244245
var streamPeer = new GrpcStreamPeer(client, remoteEndpoint, connectionInfo, call, null, _streamTaskResourcePool,
245246
new Dictionary<string, string>()
246247
{

src/AElf.OS.Network.Grpc/Peer/GrpcStreamBackPeer.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public override async Task DisconnectAsync(bool gracefulDisconnect)
6060
{
6161
if (!IsConnected) return;
6262
IsConnected = false;
63+
IsClosed = true;
6364
_sendStreamJobs.Complete();
6465
// send disconnect message if the peer is still connected and the connection
6566
// is stable.
@@ -78,14 +79,14 @@ await RequestAsync(() => StreamRequestAsync(MessageType.Disconnect,
7879

7980
public override Task<bool> TryRecoverAsync()
8081
{
81-
return Task.FromResult(true);
82+
return Task.FromResult(false);
8283
}
8384

8485

8586
public override NetworkException HandleRpcException(RpcException exception, string errorMessage)
8687
{
8788
var message = $"Failed request to {this}: {errorMessage}";
88-
var type = NetworkExceptionType.Rpc;
89+
var type = NetworkExceptionType.Rpc;
8990
if (exception.StatusCode ==
9091
// there was an exception, not related to connectivity.
9192
StatusCode.Cancelled)
@@ -101,4 +102,9 @@ public override NetworkException HandleRpcException(RpcException exception, stri
101102

102103
return new NetworkException(message, exception, type);
103104
}
105+
106+
public override string ToString()
107+
{
108+
return $"{{ streamBackPeer listening-port: {RemoteEndpoint}, key: {Info.Pubkey.Substring(0, 45)}... }}";
109+
}
104110
}

src/AElf.OS.Network.Grpc/Peer/GrpcStreamPeer.cs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class GrpcStreamPeer : GrpcPeer
3333
private StreamSendCallBack _streamSendCallBack;
3434

3535
protected readonly ActionBlock<StreamJob> _sendStreamJobs;
36+
protected bool IsClosed = false;
3637
public ILogger<GrpcStreamPeer> Logger { get; set; }
3738

3839
public GrpcStreamPeer(GrpcClient client, DnsEndPoint remoteEndpoint, PeerConnectionInfo peerConnectionInfo,
@@ -54,8 +55,11 @@ public GrpcStreamPeer(GrpcClient client, DnsEndPoint remoteEndpoint, PeerConnect
5455

5556
public AsyncDuplexStreamingCall<StreamMessage, StreamMessage> BuildCall()
5657
{
57-
_duplexStreamingCall = _client.RequestByStream(new CallOptions().WithDeadline(DateTime.MaxValue));
58+
if (_client == null) return null;
59+
var headers = new Metadata { new(GrpcConstants.GrpcRequestCompressKey, GrpcConstants.GrpcGzipConst) };
60+
_duplexStreamingCall = _client.RequestByStream(new CallOptions().WithHeaders(headers).WithDeadline(DateTime.MaxValue));
5861
_clientStreamWriter = _duplexStreamingCall.RequestStream;
62+
IsClosed = false;
5963
return _duplexStreamingCall;
6064
}
6165

@@ -75,6 +79,7 @@ public async Task DisposeAsync()
7579
await _duplexStreamingCall?.RequestStream?.CompleteAsync();
7680
_duplexStreamingCall?.Dispose();
7781
_streamListenTaskTokenSource?.Cancel();
82+
IsClosed = true;
7883
}
7984

8085
public override async Task DisconnectAsync(bool gracefulDisconnect)
@@ -104,7 +109,7 @@ public async Task<HandshakeReply> HandShakeAsync(HandshakeRequest request)
104109
var requestId = CommonHelper.GenerateRequestId();
105110
var grpcRequest = new GrpcRequest { ErrorMessage = $"handshake failed.requestId={requestId}" };
106111
var reply = await RequestAsync(() => StreamRequestAsync(MessageType.HandShake, request, metadata, requestId, true), grpcRequest);
107-
return reply != null ? HandshakeReply.Parser.ParseFrom(reply.Message) : new HandshakeReply();
112+
return reply != null ? HandshakeReply.Parser.ParseFrom(reply.Message) : new HandshakeReply() { Error = HandshakeError.InvalidConnection };
108113
}
109114

110115
public override async Task ConfirmHandshakeAsync()
@@ -205,7 +210,8 @@ private async Task WriteStreamJobAsync(StreamJob job)
205210
{
206211
if (job.StreamMessage == null) return;
207212
Logger.LogDebug("write request={requestId} {streamType}-{messageType}", job.StreamMessage.RequestId, job.StreamMessage.StreamType, job.StreamMessage.MessageType);
208-
213+
if (!(job.StreamMessage.StreamType == StreamType.Reply && job.StreamMessage.MessageType == MessageType.RequestBlocks))
214+
_clientStreamWriter.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
209215
await _clientStreamWriter.WriteAsync(job.StreamMessage);
210216
}
211217
catch (RpcException ex)
@@ -215,7 +221,12 @@ private async Task WriteStreamJobAsync(StreamJob job)
215221
}
216222
catch (Exception e)
217223
{
218-
var type = e is InvalidOperationException or TimeoutException ? NetworkExceptionType.PeerUnstable : NetworkExceptionType.HandlerException;
224+
var type = e switch
225+
{
226+
InvalidOperationException => NetworkExceptionType.Unrecoverable,
227+
TimeoutException => NetworkExceptionType.PeerUnstable,
228+
_ => NetworkExceptionType.HandlerException
229+
};
219230
job.SendCallback?.Invoke(
220231
new NetworkException($"{job.StreamMessage?.RequestId}{job.StreamMessage?.StreamType}-{job.StreamMessage?.MessageType} size={job.StreamMessage.ToByteArray().Length} queueCount={_sendStreamJobs.InputCount}", e, type));
221232
await Task.Delay(StreamRecoveryWaitTime);
@@ -237,6 +248,7 @@ protected async Task<TResp> RequestAsync<TResp>(Func<Task<TResp>> func, GrpcRequ
237248

238249
protected async Task<StreamMessage> StreamRequestAsync(MessageType messageType, IMessage message, Metadata header = null, string requestId = null, bool graceful = false)
239250
{
251+
if (IsClosed) return null;
240252
for (var i = 0; i < TimeOutRetryTimes; i++)
241253
{
242254
requestId = requestId == null || i > 0 ? CommonHelper.GenerateRequestId() : requestId;
@@ -352,4 +364,9 @@ public override NetworkException HandleRpcException(RpcException exception, stri
352364

353365
return new NetworkException(message, exception, type);
354366
}
367+
368+
public override string ToString()
369+
{
370+
return $"{{ streamPeer listening-port: {RemoteEndpoint}, key: {Info.Pubkey.Substring(0, 45)}... }}";
371+
}
355372
}

test/AElf.OS.Network.Grpc.Tests/Service/GrpcStreamServiceTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public async Task GrpcStreamPeerTest()
116116
new StreamTaskResourcePool(), new Dictionary<string, string>() { { GrpcConstants.PubkeyMetadataKey, nodePubkey } });
117117
peerback.ConnectionStatus.ShouldBe("Stream Closed");
118118
var res = await peerback.TryRecoverAsync();
119-
res.ShouldBe(true);
119+
res.ShouldBe(false);
120120
var re = peerback.HandleRpcException(new RpcException(new Status(StatusCode.Cancelled, "")), "");
121121
re.ExceptionType.ShouldBe(NetworkExceptionType.Unrecoverable);
122122
re = peerback.HandleRpcException(new RpcException(new Status(StatusCode.Unknown, "")), "");

0 commit comments

Comments
 (0)