Skip to content

Commit f9164ae

Browse files
committed
Improve API for registering streams
1 parent f2e5ffb commit f9164ae

5 files changed

Lines changed: 102 additions & 97 deletions

File tree

PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs

Lines changed: 9 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class PowerSyncDatabaseOptions() : BasePowerSyncDatabaseOptions()
5353
public Func<IPowerSyncBackendConnector, Remote>? RemoteFactory { get; set; }
5454
}
5555

56-
public class PowerSyncDBEvents : EventManager<PowerSyncDBEvents.IPowerSyncDBEvent>
56+
public class PowerSyncDBEvents : EventManager
5757
{
5858
public interface IPowerSyncDBEvent;
5959

@@ -80,60 +80,18 @@ public class StatusUpdatedEvent(SyncStatusOptions status) : IPowerSyncDBEvent
8080
public EventStream<StatusChangedEvent> OnStatusChanged { get; } = new();
8181
public EventStream<StatusUpdatedEvent> OnStatusUpdated { get; } = new();
8282

83-
public override bool TryGetStream<T>(out EventStream<T> stream)
83+
public PowerSyncDBEvents()
8484
{
85-
if (typeof(T) == typeof(PowerSyncDBEvents.InitializedEvent))
86-
{
87-
stream = (EventStream<T>)(object)OnInitialized;
88-
return true;
89-
}
90-
91-
if (typeof(T) == typeof(PowerSyncDBEvents.ClosingEvent))
92-
{
93-
stream = (EventStream<T>)(object)OnClosing;
94-
return true;
95-
}
96-
97-
if (typeof(T) == typeof(PowerSyncDBEvents.ClosedEvent))
98-
{
99-
stream = (EventStream<T>)(object)OnClosed;
100-
return true;
101-
}
102-
103-
if (typeof(T) == typeof(PowerSyncDBEvents.SchemaChangedEvent))
104-
{
105-
stream = (EventStream<T>)(object)OnSchemaChanged;
106-
return true;
107-
}
108-
109-
if (typeof(T) == typeof(PowerSyncDBEvents.StatusChangedEvent))
110-
{
111-
stream = (EventStream<T>)(object)OnStatusChanged;
112-
return true;
113-
}
114-
115-
if (typeof(T) == typeof(PowerSyncDBEvents.StatusUpdatedEvent))
116-
{
117-
stream = (EventStream<T>)(object)OnStatusUpdated;
118-
return true;
119-
}
120-
121-
stream = null!;
122-
return false;
123-
}
124-
125-
public override void Close()
126-
{
127-
OnInitialized.Close();
128-
OnClosing.Close();
129-
OnClosed.Close();
130-
OnSchemaChanged.Close();
131-
OnStatusChanged.Close();
132-
OnStatusUpdated.Close();
85+
Register(OnInitialized);
86+
Register(OnClosing);
87+
Register(OnClosed);
88+
Register(OnSchemaChanged);
89+
Register(OnStatusChanged);
90+
Register(OnStatusUpdated);
13391
}
13492
}
13593

136-
public interface IPowerSyncDatabase
94+
public interface IPowerSyncDatabase : ICloseableAsync
13795
{
13896
public Task Connect(IPowerSyncBackendConnector connector, PowerSyncConnectionOptions? options = null);
13997
public ISyncStream SyncStream(string name, Dictionary<string, object>? parameters = null);
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
namespace PowerSync.Common.Utils;
2+
3+
public interface IEventManager : ICloseable
4+
{
5+
/// <summary>
6+
/// Registers a new EventStream into the EventManager.
7+
/// </summary>
8+
void Register<T>(EventStream<T> stream);
9+
10+
/// <summary>
11+
/// Attempts to retreive the EventStream associated with events of type T.
12+
/// </summary>
13+
bool TryGetStream<T>(out EventStream<T> stream);
14+
15+
/// <summary>
16+
/// Posts a message to the stream managing events of type T.
17+
/// </summary>
18+
void Emit<T>(T evt);
19+
20+
/// <summary>
21+
/// Attemps to post a message to the stream managing events of type T.
22+
/// </summary>
23+
bool TryEmit<T>(T evt);
24+
}
25+
26+
public class EventManager : IEventManager
27+
{
28+
private readonly Dictionary<Type, object> _streams = new();
29+
30+
public void Register<T>(EventStream<T> stream)
31+
{
32+
_streams[typeof(T)] = stream;
33+
}
34+
35+
public bool TryGetStream<T>(out EventStream<T> stream)
36+
{
37+
if (_streams.TryGetValue(typeof(T), out var streamObj))
38+
{
39+
stream = (EventStream<T>)streamObj;
40+
return true;
41+
}
42+
stream = null!;
43+
return false;
44+
}
45+
46+
public void Emit<T>(T evt)
47+
{
48+
if (TryGetStream<T>(out var stream))
49+
{
50+
stream.Emit(evt);
51+
}
52+
else
53+
{
54+
throw new InvalidOperationException($"No stream emits events of type {nameof(T)}.");
55+
}
56+
}
57+
58+
public bool TryEmit<T>(T evt)
59+
{
60+
if (TryGetStream<T>(out var stream))
61+
{
62+
stream.Emit(evt);
63+
return true;
64+
}
65+
else
66+
{
67+
return false;
68+
}
69+
}
70+
71+
public void Close()
72+
{
73+
foreach (var kvp in _streams)
74+
{
75+
((ICloseable)kvp.Value).Close();
76+
}
77+
_streams.Clear();
78+
}
79+
}
80+

PowerSync/PowerSync.Common/Utils/EventStream.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ namespace PowerSync.Common.Utils;
44
using System.Runtime.CompilerServices;
55
using System.Threading.Channels;
66

7-
public interface IEventStream<T>
7+
public interface IEventStream<T> : ICloseable
88
{
99
void Emit(T item);
1010

@@ -13,8 +13,6 @@ public interface IEventStream<T>
1313
IEnumerable<T> Listen(CancellationToken cancellationToken);
1414

1515
IAsyncEnumerable<T> ListenAsync(CancellationToken cancellationToken);
16-
17-
void Close();
1816
}
1917

2018
public class EventStream<T> : IEventStream<T>
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
namespace PowerSync.Common.Utils;
2+
3+
public interface ICloseable
4+
{
5+
public void Close();
6+
}
7+
8+
public interface ICloseableAsync
9+
{
10+
public Task Close();
11+
}
12+

PowerSync/PowerSync.Common/Utils/IEventManager.cs

Lines changed: 0 additions & 43 deletions
This file was deleted.

0 commit comments

Comments
 (0)