forked from OrleansContrib/Orleankka
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathInfrastructure.cs
More file actions
55 lines (46 loc) · 1.54 KB
/
Infrastructure.cs
File metadata and controls
55 lines (46 loc) · 1.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Orleankka;
using Orleankka.Meta;
namespace Example
{
public abstract class CqsActor : Actor
{
public override Task<object> OnReceive(object message)
{
var cmd = message as Command;
if (cmd != null)
return HandleCommand(cmd);
var query = message as Query;
if (query != null)
return HandleQuery(query);
throw new InvalidOperationException("Unknown message type: " + message.GetType());
}
protected abstract Task<object> HandleCommand(Command cmd);
protected abstract Task<object> HandleQuery(Query query);
}
public abstract class EventSourcedActor : CqsActor
{
protected override async Task<object> HandleCommand(Command cmd)
{
var events = await Dispatch<IEnumerable<Event>>(cmd);
var stream = System.StreamOf("sms", $"{GetType().Name}-{Id}");
foreach (var @event in events)
{
await Dispatch(@event);
await stream.Push(Wrap(@event));
}
return events;
}
private object Wrap(Event @event)
{
var envelopeType = typeof(EventEnvelope<>).MakeGenericType(@event.GetType());
return Activator.CreateInstance(envelopeType, Id, @event);
}
protected override Task<object> HandleQuery(Query query)
{
return Dispatch(query);
}
}
}