Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions packages/supabase/lib/src/supabase_query_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ class SupabaseQueryBuilder extends PostgrestQueryBuilder {

/// Combines the current state of your table from PostgREST with changes from the realtime server to return real-time data from your table as a [Stream].
///
/// Realtime is disabled by default for new tables. You can turn it on by [managing replication](https://supabase.com/docs/guides/realtime/extensions/postgres-changes#replication-setup).
/// Realtime is disabled by default for new tables. You can turn it on by [managing replication](https://supabase.com/docs/guides/realtime/subscribing-to-database-changes#enable-postgres-changes).
///
/// Pass the list of primary key column names to [primaryKey], which will be used to update and delete the proper records internally as the stream receives real-time updates.
///
/// The underlying [RealtimeChannel] is public by default. Set [private] to `true` to make it private, which requires additional RLS policies to be set up. See https://supabase.com/docs/guides/realtime/authorization for more details.
///
/// It handles the lifecycle of the realtime connection and automatically refetches data from PostgREST when needed.
///
/// Make sure to provide `onError` and `onDone` callbacks to [Stream.listen] to handle errors and completion of the stream.
Expand All @@ -43,7 +45,10 @@ class SupabaseQueryBuilder extends PostgrestQueryBuilder {
/// ```dart
/// supabase.from('chats').stream(primaryKey: ['id']).eq('room_id','123').order('created_at').limit(20).listen(_onChatsReceived);
/// ```
SupabaseStreamFilterBuilder stream({required List<String> primaryKey}) {
SupabaseStreamFilterBuilder stream({
required List<String> primaryKey,
bool private = false,
}) {
Comment thread
Vinzent03 marked this conversation as resolved.
assert(primaryKey.isNotEmpty, 'Please specify primary key column(s).');
return SupabaseStreamFilterBuilder(
queryBuilder: this,
Expand All @@ -52,6 +57,7 @@ class SupabaseQueryBuilder extends PostgrestQueryBuilder {
schema: _schema,
table: _table,
primaryKey: primaryKey,
private: private,
);
}
}
15 changes: 13 additions & 2 deletions packages/supabase/lib/src/supabase_stream_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {

final String _realtimeTopic;

/// Whether the underlying [_channel] should be initialized as private
/// or not. Default is false, which means the channel is public.
final bool _private;

RealtimeChannel? _channel;

final String _schema;
Expand Down Expand Up @@ -89,12 +93,14 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
required String schema,
required String table,
required List<String> primaryKey,
required bool private,
}) : _queryBuilder = queryBuilder,
_realtimeTopic = realtimeTopic,
_realtimeClient = realtimeClient,
_schema = schema,
_table = table,
_uniqueColumns = primaryKey;
_uniqueColumns = primaryKey,
_private = private;

/// Orders the result with the specified [column].
///
Expand Down Expand Up @@ -167,7 +173,12 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
);
}

_channel = _realtimeClient.channel(_realtimeTopic);
_channel = _realtimeClient.channel(
_realtimeTopic,
RealtimeChannelConfig(
private: _private,
),
);

_channel!
.onPostgresChanges(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class SupabaseStreamFilterBuilder extends SupabaseStreamBuilder {
required super.schema,
required super.table,
required super.primaryKey,
required super.private,
});

/// Filters the results where [column] equals [value].
Expand Down
28 changes: 27 additions & 1 deletion packages/supabase/test/mock_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ void main() {
Future<void> handleRequests(
HttpServer server, {
String? expectedFilter,
bool? expectedPrivate,
}) async {
await for (final HttpRequest request in server) {
final headers = request.headers;
Expand Down Expand Up @@ -113,8 +114,9 @@ void main() {
final requestJson = jsonDecode(request);
final topic = requestJson['topic'];
final ref = requestJson["ref"];
final event = requestJson['event'];

if (requestJson["event"] == "phx_leave") {
if (event == 'phx_leave') {
listeners.remove(topic);
return;
}
Expand All @@ -126,10 +128,15 @@ void main() {
final String? realtimeFilter = requestJson['payload']['config']
['postgres_changes']
.first['filter'];
final bool isPrivate =
requestJson['payload']['config']['private'] as bool;

if (expectedFilter != null) {
expect(realtimeFilter, expectedFilter);
}
if (expectedPrivate != null) {
expect(isPrivate, expectedPrivate);
}

final replyString = jsonEncode({
'event': 'phx_reply',
Expand Down Expand Up @@ -682,6 +689,25 @@ void main() {
});
});

group('stream() channel config', () {
test('forwards channelConfig.private=true to realtime join payload', () {
handleRequests(mockServer, expectedPrivate: true);

final stream =
supabase.from('todos').stream(primaryKey: ['id'], private: true);

expect(stream, emits(isList));
});

test('uses default private=false when channelConfig is omitted', () {
handleRequests(mockServer, expectedPrivate: false);

final stream = supabase.from('todos').stream(primaryKey: ['id']);

expect(stream, emits(isList));
});
});

group('Deprecated execute method', () {
test('should work with deprecated execute method', () {
handleRequests(mockServer);
Expand Down
Loading