|
15 | 15 | using Vertex.Abstractions.Serialization; |
16 | 16 | using Vertex.Abstractions.Snapshot; |
17 | 17 | using Vertex.Abstractions.Storage; |
| 18 | +using Vertex.Protocol; |
| 19 | +using Vertex.Runtime.Event; |
18 | 20 | using Vertex.Runtime.Exceptions; |
19 | 21 | using Vertex.Runtime.Options; |
20 | 22 |
|
@@ -169,12 +171,53 @@ protected virtual ValueTask CreateSnapshot() |
169 | 171 |
|
170 | 172 | public Task OnNext(Immutable<byte[]> bytes) |
171 | 173 | { |
172 | | - throw new NotImplementedException(); |
| 174 | + return this.OnNext(bytes.Value); |
173 | 175 | } |
174 | 176 |
|
175 | | - public Task OnNext(Immutable<List<byte[]>> items) |
| 177 | + public async Task OnNext(Immutable<List<byte[]>> items) |
176 | 178 | { |
177 | | - throw new NotImplementedException(); |
| 179 | + foreach (var bytes in items.Value) |
| 180 | + { |
| 181 | + await this.OnNext(bytes); |
| 182 | + } |
| 183 | + } |
| 184 | + |
| 185 | + private async Task OnNext(byte[] bytes) |
| 186 | + { |
| 187 | + if (this.TryConvertToEventUnit(bytes, out var data)) |
| 188 | + { |
| 189 | + await this.Tell(data); |
| 190 | + if (this.Logger.IsEnabled(LogLevel.Trace)) |
| 191 | + { |
| 192 | + this.Logger.LogTrace("OnNext completed: {0}->{1}->{2}", this.ActorType.FullName, this.ActorId.ToString(), this.Serializer.Serialize(data)); |
| 193 | + } |
| 194 | + } |
| 195 | + else |
| 196 | + { |
| 197 | + this.Logger.LogError(new ArgumentException(nameof(bytes)), "Deserialization failed"); |
| 198 | + } |
| 199 | + } |
| 200 | + |
| 201 | + protected bool TryConvertToEventUnit(byte[] bytes, out EventUnit<TPrimaryKey> eventUnit) |
| 202 | + { |
| 203 | + if (EventConverter.TryParseWithNoId(bytes, out var transport) && |
| 204 | + this.EventTypeContainer.TryGet(transport.EventName, out var type)) |
| 205 | + { |
| 206 | + var data = this.Serializer.Deserialize(transport.EventBytes, type); |
| 207 | + if (data is IEvent @event) |
| 208 | + { |
| 209 | + var eventMeta = transport.MetaBytes.ParseToEventMeta(); |
| 210 | + eventUnit = new EventUnit<TPrimaryKey> |
| 211 | + { |
| 212 | + ActorId = this.ActorId, |
| 213 | + Meta = eventMeta, |
| 214 | + Event = @event |
| 215 | + }; |
| 216 | + return true; |
| 217 | + } |
| 218 | + } |
| 219 | + eventUnit = default; |
| 220 | + return false; |
178 | 221 | } |
179 | 222 |
|
180 | 223 | protected async ValueTask Tell(EventUnit<TPrimaryKey> eventUnit) |
|
0 commit comments