Skip to content

Commit 9d0de9e

Browse files
authored
feat: optionally configure stream method to use a private channel (#1357)
* feat: optionally configure stream method to use a private channel close #1311 * test: add missing mock tests * refactor: move to dedicated argument in stream method
1 parent 74bb172 commit 9d0de9e

4 files changed

Lines changed: 49 additions & 5 deletions

File tree

packages/supabase/lib/src/supabase_query_builder.dart

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ class SupabaseQueryBuilder extends PostgrestQueryBuilder {
2525

2626
/// Combines the current state of your table from PostgREST with changes from the realtime server to return real-time data from your table as a [Stream].
2727
///
28-
/// Realtime is disabled by default for new tables. You can turn it on by [managing replication](https://supabase.com/docs/guides/realtime/extensions/postgres-changes#replication-setup).
28+
/// Realtime is disabled by default for new tables. You can turn it on by [managing replication](https://supabase.com/docs/guides/realtime/subscribing-to-database-changes#enable-postgres-changes).
2929
///
3030
/// Pass the list of primary key column names to [primaryKey], which will be used to update and delete the proper records internally as the stream receives real-time updates.
3131
///
32+
/// The underlying [RealtimeChannel] is public by default. Set [private] to `true` to make it private, which requires additional RLS policies to be set up. See https://supabase.com/docs/guides/realtime/authorization for more details.
33+
///
3234
/// It handles the lifecycle of the realtime connection and automatically refetches data from PostgREST when needed.
3335
///
3436
/// Make sure to provide `onError` and `onDone` callbacks to [Stream.listen] to handle errors and completion of the stream.
@@ -43,7 +45,10 @@ class SupabaseQueryBuilder extends PostgrestQueryBuilder {
4345
/// ```dart
4446
/// supabase.from('chats').stream(primaryKey: ['id']).eq('room_id','123').order('created_at').limit(20).listen(_onChatsReceived);
4547
/// ```
46-
SupabaseStreamFilterBuilder stream({required List<String> primaryKey}) {
48+
SupabaseStreamFilterBuilder stream({
49+
required List<String> primaryKey,
50+
bool private = false,
51+
}) {
4752
assert(primaryKey.isNotEmpty, 'Please specify primary key column(s).');
4853
return SupabaseStreamFilterBuilder(
4954
queryBuilder: this,
@@ -52,6 +57,7 @@ class SupabaseQueryBuilder extends PostgrestQueryBuilder {
5257
schema: _schema,
5358
table: _table,
5459
primaryKey: primaryKey,
60+
private: private,
5561
);
5662
}
5763
}

packages/supabase/lib/src/supabase_stream_builder.dart

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
5353

5454
final String _realtimeTopic;
5555

56+
/// Whether the underlying [_channel] should be initialized as private
57+
/// or not. Default is false, which means the channel is public.
58+
final bool _private;
59+
5660
RealtimeChannel? _channel;
5761

5862
final String _schema;
@@ -89,12 +93,14 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
8993
required String schema,
9094
required String table,
9195
required List<String> primaryKey,
96+
required bool private,
9297
}) : _queryBuilder = queryBuilder,
9398
_realtimeTopic = realtimeTopic,
9499
_realtimeClient = realtimeClient,
95100
_schema = schema,
96101
_table = table,
97-
_uniqueColumns = primaryKey;
102+
_uniqueColumns = primaryKey,
103+
_private = private;
98104

99105
/// Orders the result with the specified [column].
100106
///
@@ -167,7 +173,12 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
167173
);
168174
}
169175

170-
_channel = _realtimeClient.channel(_realtimeTopic);
176+
_channel = _realtimeClient.channel(
177+
_realtimeTopic,
178+
RealtimeChannelConfig(
179+
private: _private,
180+
),
181+
);
171182

172183
_channel!
173184
.onPostgresChanges(

packages/supabase/lib/src/supabase_stream_filter_builder.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ class SupabaseStreamFilterBuilder extends SupabaseStreamBuilder {
88
required super.schema,
99
required super.table,
1010
required super.primaryKey,
11+
required super.private,
1112
});
1213

1314
/// Filters the results where [column] equals [value].

packages/supabase/test/mock_test.dart

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ void main() {
2323
Future<void> handleRequests(
2424
HttpServer server, {
2525
String? expectedFilter,
26+
bool? expectedPrivate,
2627
}) async {
2728
await for (final HttpRequest request in server) {
2829
final headers = request.headers;
@@ -113,8 +114,9 @@ void main() {
113114
final requestJson = jsonDecode(request);
114115
final topic = requestJson['topic'];
115116
final ref = requestJson["ref"];
117+
final event = requestJson['event'];
116118

117-
if (requestJson["event"] == "phx_leave") {
119+
if (event == 'phx_leave') {
118120
listeners.remove(topic);
119121
return;
120122
}
@@ -126,10 +128,15 @@ void main() {
126128
final String? realtimeFilter = requestJson['payload']['config']
127129
['postgres_changes']
128130
.first['filter'];
131+
final bool isPrivate =
132+
requestJson['payload']['config']['private'] as bool;
129133

130134
if (expectedFilter != null) {
131135
expect(realtimeFilter, expectedFilter);
132136
}
137+
if (expectedPrivate != null) {
138+
expect(isPrivate, expectedPrivate);
139+
}
133140

134141
final replyString = jsonEncode({
135142
'event': 'phx_reply',
@@ -682,6 +689,25 @@ void main() {
682689
});
683690
});
684691

692+
group('stream() channel config', () {
693+
test('forwards channelConfig.private=true to realtime join payload', () {
694+
handleRequests(mockServer, expectedPrivate: true);
695+
696+
final stream =
697+
supabase.from('todos').stream(primaryKey: ['id'], private: true);
698+
699+
expect(stream, emits(isList));
700+
});
701+
702+
test('uses default private=false when channelConfig is omitted', () {
703+
handleRequests(mockServer, expectedPrivate: false);
704+
705+
final stream = supabase.from('todos').stream(primaryKey: ['id']);
706+
707+
expect(stream, emits(isList));
708+
});
709+
});
710+
685711
group('Deprecated execute method', () {
686712
test('should work with deprecated execute method', () {
687713
handleRequests(mockServer);

0 commit comments

Comments
 (0)