Skip to content
This repository was archived by the owner on Nov 8, 2020. It is now read-only.

Commit 941ee30

Browse files
author
uless
committed
使用Immutable<T>提高事件转发的性能
1 parent eef3067 commit 941ee30

6 files changed

Lines changed: 31 additions & 26 deletions

File tree

src/Ray.Core/Core/Abstractions/IConcurrentObserver.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@ namespace Ray.Core
66
public interface IConcurrentObserver : IObserver
77
{
88
[AlwaysInterleave]
9-
Task ConcurrentOnNext(byte[] bytes);
9+
Task ConcurrentOnNext(Immutable<byte[]> bytes);
1010
}
1111
}

src/Ray.Core/Core/Abstractions/IObserver.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
using System.Threading.Tasks;
2+
using Orleans.Concurrency;
23

34
namespace Ray.Core
45
{
56
public interface IObserver
67
{
7-
Task OnNext(byte[] bytes);
8+
Task OnNext(Immutable<byte[]> bytes);
89
Task<long> GetVersion();
910
Task<long> GetAndSaveVersion(long compareVersion);
1011
}

src/Ray.Core/Core/Grains/ConcurrentObserverGrain.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
1-
using System;
2-
using System.Collections.Generic;
3-
using System.Linq;
4-
using System.Threading.Tasks;
5-
using Microsoft.Extensions.DependencyInjection;
1+
using Microsoft.Extensions.DependencyInjection;
62
using Microsoft.Extensions.Logging;
3+
using Orleans.Concurrency;
74
using Ray.Core.Channels;
85
using Ray.Core.Event;
96
using Ray.Core.Exceptions;
107
using Ray.Core.Serialization;
8+
using System;
9+
using System.Collections.Generic;
10+
using System.Linq;
11+
using System.Threading.Tasks;
1112

1213
namespace Ray.Core
1314
{
@@ -32,9 +33,9 @@ public override Task OnDeactivateAsync()
3233
ConcurrentChannel.Complete();
3334
return base.OnDeactivateAsync();
3435
}
35-
public async Task ConcurrentOnNext(byte[] bytes)
36+
public async Task ConcurrentOnNext(Immutable<byte[]> bytes)
3637
{
37-
var (success, transport) = EventBytesTransport.FromBytesWithNoId(bytes);
38+
var (success, transport) = EventBytesTransport.FromBytesWithNoId(bytes.Value);
3839
if (success)
3940
{
4041
var data = Serializer.Deserialize(TypeContainer.GetType(transport.EventType), transport.EventBytes);

src/Ray.Core/Core/Grains/ObserverGrain.cs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
1-
using System;
2-
using System.Linq;
3-
using System.Runtime.CompilerServices;
4-
using System.Threading.Tasks;
5-
using Microsoft.Extensions.DependencyInjection;
1+
using Microsoft.Extensions.DependencyInjection;
62
using Microsoft.Extensions.Logging;
73
using Orleans;
4+
using Orleans.Concurrency;
85
using Ray.Core.Configuration;
96
using Ray.Core.Event;
107
using Ray.Core.Exceptions;
118
using Ray.Core.Serialization;
129
using Ray.Core.Snapshot;
1310
using Ray.Core.Storage;
11+
using System;
12+
using System.Linq;
13+
using System.Runtime.CompilerServices;
14+
using System.Threading.Tasks;
1415

1516
namespace Ray.Core
1617
{
@@ -76,7 +77,7 @@ protected async virtual ValueTask DependencyInjection()
7677
await eventStorageTask;
7778
EventStorage = eventStorageTask.Result;
7879
//创建状态存储器
79-
var followConfigTask = configureBuilder.GetObserverConfig(ServiceProvider,GrainType, GrainId);
80+
var followConfigTask = configureBuilder.GetObserverConfig(ServiceProvider, GrainType, GrainId);
8081
if (!followConfigTask.IsCompletedSuccessfully)
8182
await followConfigTask;
8283
var stateStorageTask = storageFactory.CreateObserverSnapshotStorage(followConfigTask.Result, GrainId);
@@ -188,9 +189,9 @@ protected virtual ValueTask InitFirstSnapshot()
188189
return Consts.ValueTaskDone;
189190
}
190191
#endregion
191-
public Task OnNext(byte[] bytes)
192+
public Task OnNext(Immutable<byte[]> bytes)
192193
{
193-
var (success, transport) = EventBytesTransport.FromBytesWithNoId(bytes);
194+
var (success, transport) = EventBytesTransport.FromBytesWithNoId(bytes.Value);
194195
if (success)
195196
{
196197
var data = Serializer.Deserialize(TypeContainer.GetType(transport.EventType), transport.EventBytes);

src/Ray.Core/Core/Grains/ShadowGrain.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
1-
using System;
2-
using System.Runtime.CompilerServices;
3-
using System.Threading.Tasks;
4-
using Microsoft.Extensions.DependencyInjection;
1+
using Microsoft.Extensions.DependencyInjection;
52
using Microsoft.Extensions.Logging;
63
using Orleans;
4+
using Orleans.Concurrency;
75
using Ray.Core.Configuration;
86
using Ray.Core.Event;
97
using Ray.Core.Exceptions;
108
using Ray.Core.Serialization;
119
using Ray.Core.Snapshot;
1210
using Ray.Core.Storage;
11+
using System;
12+
using System.Runtime.CompilerServices;
13+
using System.Threading.Tasks;
1314

1415
namespace Ray.Core
1516
{
@@ -174,9 +175,9 @@ protected virtual ValueTask CreateSnapshot()
174175
return Consts.ValueTaskDone;
175176
}
176177
#endregion
177-
public Task OnNext(byte[] bytes)
178+
public Task OnNext(Immutable<byte[]> bytes)
178179
{
179-
var (success, transport) = EventBytesTransport.FromBytesWithNoId(bytes);
180+
var (success, transport) = EventBytesTransport.FromBytesWithNoId(bytes.Value);
180181
if (success)
181182
{
182183
var eventType = TypeContainer.GetType(transport.EventType);

src/Ray.Core/Core/Observer/ObserverUnit.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
using Microsoft.Extensions.DependencyInjection;
22
using Orleans;
3+
using Orleans.Concurrency;
34
using Ray.Core.Event;
45
using Ray.Core.Serialization;
56
using System;
7+
using System.Collections.Concurrent;
68
using System.Collections.Generic;
79
using System.Linq;
810
using System.Linq.Expressions;
911
using System.Threading.Tasks;
10-
using System.Collections.Concurrent;
1112

1213
namespace Ray.Core
1314
{
@@ -91,8 +92,8 @@ Task func(byte[] bytes)
9192
{
9293
var observer = GetObserver(observerType, actorId);
9394
if (observer is IConcurrentObserver concurrentObserver)
94-
return concurrentObserver.ConcurrentOnNext(bytes);
95-
return observer.OnNext(bytes);
95+
return concurrentObserver.ConcurrentOnNext(new Immutable<byte[]>(bytes));
96+
return observer.OnNext(new Immutable<byte[]>(bytes));
9697
}
9798
return Task.CompletedTask;
9899
}

0 commit comments

Comments
 (0)