Skip to content

Commit 1f9d295

Browse files
authored
feat(realtime_client): support new postgres changes filter operators, multi-filter, column selection, and replication-ready events (#1526)
1 parent 56a47f7 commit 1f9d295

6 files changed

Lines changed: 415 additions & 19 deletions

File tree

packages/realtime_client/lib/src/realtime_channel.dart

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import 'dart:async';
22
import 'dart:convert';
33
import 'dart:typed_data';
44

5+
import 'package:collection/collection.dart';
56
import 'package:http/http.dart';
67
import 'package:meta/meta.dart';
78
import 'package:realtime_client/realtime_client.dart';
@@ -273,6 +274,10 @@ class RealtimeChannel {
273274
final filter = clientPostgresBinding.filter['filter'];
274275
final serverPostgresFilter = serverPostgresFilters[i];
275276

277+
// NOTE: `select` is intentionally not part of this equality check (mirroring
278+
// supabase-js), so a server that echoes `select` back in a slightly
279+
// different shape does not force a spurious unsubscribe. The client
280+
// binding keeps its own `select` regardless.
276281
if (serverPostgresFilter != null &&
277282
serverPostgresFilter['event'] == event &&
278283
serverPostgresFilter['schema'] == schema &&
@@ -369,6 +374,13 @@ class RealtimeChannel {
369374
///
370375
/// [filter] can be used to further control which rows to listen to within the given [schema] and [table].
371376
///
377+
/// [filters] combines multiple [PostgresChangeFilter]s with an `AND`. Provide
378+
/// either [filter] or [filters], not both.
379+
///
380+
/// [select] restricts the change payload to a subset of columns instead of the
381+
/// full row (reducing payload size). The listed columns must be selectable by
382+
/// the subscribing role.
383+
///
372384
/// ```dart
373385
/// supabase.channel('my_channel').onPostgresChanges(
374386
/// event: PostgresChangeEvent.all,
@@ -388,15 +400,29 @@ class RealtimeChannel {
388400
String? schema,
389401
String? table,
390402
PostgresChangeFilter? filter,
403+
List<PostgresChangeFilter>? filters,
404+
List<String>? select,
391405
required void Function(PostgresChangePayload payload) callback,
392406
}) {
407+
assert(
408+
filter == null || filters == null,
409+
'Provide either `filter` or `filters`, not both.',
410+
);
411+
412+
final allFilters = [
413+
?filter,
414+
...?filters,
415+
];
416+
final filterString = allFilters.isEmpty ? null : allFilters.join(',');
417+
393418
return onEvents(
394419
'postgres_changes',
395420
ChannelFilter(
396421
event: event.toRealtimeEvent(),
397422
schema: schema,
398423
table: table,
399-
filter: filter?.toString(),
424+
filter: filterString,
425+
select: select,
400426
),
401427
(payload, [ref]) => callback(PostgresChangePayload.fromPayload(payload)),
402428
);
@@ -517,7 +543,38 @@ class RealtimeChannel {
517543
return result;
518544
}
519545

520-
/// Sets up a listener for realtime system events for debugging purposes.
546+
/// Sets up a listener for realtime `system` events.
547+
///
548+
/// The [callback] receives the raw payload (typically a `Map`). To work with
549+
/// it as a typed value, parse it with [RealtimeSystemPayload.fromJson].
550+
///
551+
/// Opt in to the replication-ready notification with
552+
/// [RealtimeChannelConfig.replicationReady] when creating the channel, then
553+
/// watch for `status == 'ok'` to know the Postgres replication connection is
554+
/// ready.
555+
///
556+
/// ```dart
557+
/// final channel = supabase.channel(
558+
/// 'room1',
559+
/// opts: const RealtimeChannelConfig(replicationReady: true),
560+
/// );
561+
/// channel
562+
/// .onPostgresChanges(
563+
/// event: PostgresChangeEvent.all,
564+
/// schema: 'public',
565+
/// table: 'messages',
566+
/// callback: (payload) => print('Change received! $payload'),
567+
/// )
568+
/// .onSystemEvents((payload) {
569+
/// final system = RealtimeSystemPayload.fromJson(
570+
/// Map<String, dynamic>.from(payload as Map),
571+
/// );
572+
/// if (system.extension == 'system' && system.status == 'ok') {
573+
/// print('Replication connection is ready: ${system.message}');
574+
/// }
575+
/// })
576+
/// .subscribe();
577+
/// ```
521578
RealtimeChannel onSystemEvents(
522579
void Function(dynamic payload) callback,
523580
) {
@@ -548,7 +605,7 @@ class RealtimeChannel {
548605
}
549606

550607
@internal
551-
RealtimeChannel off(String type, Map<String, String> filter) {
608+
RealtimeChannel off(String type, Map<String, dynamic> filter) {
552609
final typeLower = type.toLowerCase();
553610

554611
_bindings[typeLower] = _bindings[typeLower]!.where((bind) {
@@ -983,13 +1040,14 @@ class RealtimeChannel {
9831040
@internal
9841041
bool get isLeaving => _state == ChannelStates.leaving;
9851042

986-
static bool _isEqual(Map<String, String> obj1, Map<String, String> obj2) {
1043+
static bool _isEqual(Map<String, Object?> obj1, Map<String, Object?> obj2) {
9871044
if (obj1.keys.length != obj2.keys.length) {
9881045
return false;
9891046
}
9901047

1048+
const equality = DeepCollectionEquality();
9911049
for (final k in obj1.keys) {
992-
if (obj1[k] != obj2[k]) {
1050+
if (!equality.equals(obj1[k], obj2[k])) {
9931051
return false;
9941052
}
9951053
}

packages/realtime_client/lib/src/types.dart

Lines changed: 145 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ typedef BindingCallback = void Function(dynamic payload, [dynamic ref]);
66

77
class Binding {
88
String type;
9-
Map<String, String> filter;
9+
Map<String, dynamic> filter;
1010
BindingCallback callback;
1111
String? id;
1212

@@ -19,7 +19,7 @@ class Binding {
1919

2020
Binding copyWith({
2121
String? type,
22-
Map<String, String>? filter,
22+
Map<String, dynamic>? filter,
2323
BindingCallback? callback,
2424
String? id,
2525
}) {
@@ -79,24 +79,31 @@ class ChannelFilter {
7979
final String? schema;
8080
final String? table;
8181

82-
/// For [RealtimeListenTypes.postgresChanges] it's of the format `column=filter.value` with `filter` being one of `eq, neq, lt, lte, gt, gte, in`
82+
/// For [RealtimeListenTypes.postgresChanges] it's of the format `column=filter.value` with `filter` being one of `eq, neq, lt, lte, gt, gte, in, like, ilike, is, match, imatch, isdistinct`.
8383
///
84-
/// Only one filter can be applied
84+
/// Multiple conditions can be combined with commas; they are applied as an `AND`.
85+
/// Any operator can be negated with the `not.` prefix.
8586
final String? filter;
8687

88+
/// For [RealtimeListenTypes.postgresChanges], restricts the change payload to
89+
/// a subset of columns instead of the full row.
90+
final List<String>? select;
91+
8792
const ChannelFilter({
8893
this.event,
8994
this.schema,
9095
this.table,
9196
this.filter,
97+
this.select,
9298
});
9399

94-
Map<String, String> toMap() {
100+
Map<String, dynamic> toMap() {
95101
return {
96102
if (event != null) 'event': event!,
97103
if (schema != null) 'schema': schema!,
98104
if (table != null) 'table': table!,
99105
if (filter != null) 'filter': filter!,
106+
if (select != null) 'select': select!,
100107
};
101108
}
102109
}
@@ -181,13 +188,24 @@ class RealtimeChannelConfig {
181188
/// Defines if the channel is private or not and if RLS policies will be used to check data
182189
final bool private;
183190

191+
/// [replicationReady] instructs the server to emit a `system` event once the
192+
/// Postgres replication connection backing this channel is established and
193+
/// ready to stream changes.
194+
///
195+
/// Listen for it with [RealtimeChannel.onSystemEvents]; the payload's
196+
/// [RealtimeSystemPayload.status] is `'ok'`
197+
/// (message: `'Replication connection established'`) on success or `'error'`
198+
/// if the connection is not ready in time.
199+
final bool replicationReady;
200+
184201
const RealtimeChannelConfig({
185202
this.ack = false,
186203
this.self = false,
187204
this.replay,
188205
this.key = '',
189206
this.enabled = false,
190207
this.private = false,
208+
this.replicationReady = false,
191209
});
192210

193211
Map<String, dynamic> toMap() {
@@ -198,6 +216,9 @@ class RealtimeChannelConfig {
198216
if (replay != null) {
199217
broadcastConfig['replay'] = replay!.toMap();
200218
}
219+
if (replicationReady) {
220+
broadcastConfig['replication_ready'] = true;
221+
}
201222

202223
return {
203224
'config': {
@@ -212,6 +233,48 @@ class RealtimeChannelConfig {
212233
}
213234
}
214235

236+
/// Payload of a `system` event emitted by the server.
237+
///
238+
/// Most notably, when a channel is created with
239+
/// [RealtimeChannelConfig.replicationReady] set to `true`, the server sends one
240+
/// of these once the Postgres replication connection is ready
241+
/// ([status] is `'ok'`) or fails to become ready in time ([status] is
242+
/// `'error'`).
243+
class RealtimeSystemPayload {
244+
/// The extension that produced the message, e.g. `'system'` or
245+
/// `'postgres_changes'`.
246+
final String extension;
247+
248+
/// `'ok'` on success, `'error'` on failure.
249+
final String status;
250+
251+
/// Human-readable description, e.g. `'Replication connection established'`.
252+
final String message;
253+
254+
/// The channel (sub)topic the message refers to.
255+
final String channel;
256+
257+
const RealtimeSystemPayload({
258+
required this.extension,
259+
required this.status,
260+
required this.message,
261+
required this.channel,
262+
});
263+
264+
factory RealtimeSystemPayload.fromJson(Map<String, dynamic> json) {
265+
return RealtimeSystemPayload(
266+
extension: json['extension']?.toString() ?? '',
267+
status: json['status']?.toString() ?? '',
268+
message: json['message']?.toString() ?? '',
269+
channel: json['channel']?.toString() ?? '',
270+
);
271+
}
272+
273+
@override
274+
String toString() =>
275+
'RealtimeSystemPayload(extension: $extension, status: $status, message: $message, channel: $channel)';
276+
}
277+
215278
/// Data class that contains the Postgres change event payload.
216279
class PostgresChangePayload {
217280
final String schema;
@@ -291,6 +354,10 @@ class PostgresChangePayload {
291354
}
292355

293356
/// Specifies the type of filter to be applied on realtime Postgres Change listener.
357+
///
358+
/// These mirror the PostgREST operator surface that the Realtime server
359+
/// evaluates for Postgres Changes. Any operator can be negated with the `not.`
360+
/// prefix via [PostgresChangeFilter.negate].
294361
enum PostgresChangeFilterType {
295362
/// Listens to changes where a column's value in a table equals a client-specified value.
296363
eq,
@@ -312,6 +379,54 @@ enum PostgresChangeFilterType {
312379

313380
/// Listen to changes when a column's value in a table equals any of the values specified.
314381
inFilter,
382+
383+
/// Listens to changes where a column matches a case-sensitive pattern (`LIKE`).
384+
///
385+
/// Use `%` and `_` as wildcards, e.g. `title=like.%foo%`.
386+
like,
387+
388+
/// Listens to changes where a column matches a case-insensitive pattern (`ILIKE`).
389+
ilike,
390+
391+
/// Listens to changes where a column `IS` a given value (`null`, `true`,
392+
/// `false` or `unknown`), e.g. `deleted_at=is.null`.
393+
isFilter,
394+
395+
/// Listens to changes where a column matches a POSIX regular expression (`~`).
396+
match,
397+
398+
/// Listens to changes where a column matches a case-insensitive POSIX regular
399+
/// expression (`~*`).
400+
imatch,
401+
402+
/// Listens to changes where a column is distinct from a value (NULL-safe
403+
/// inequality, `IS DISTINCT FROM`).
404+
isDistinct;
405+
406+
/// The operator token used in the filter wire format (the part between
407+
/// `column=` and `.value`). Most match [name], but a few differ because the
408+
/// enum names avoid Dart reserved words / casing conventions.
409+
String get token {
410+
switch (this) {
411+
case PostgresChangeFilterType.inFilter:
412+
return 'in';
413+
case PostgresChangeFilterType.isFilter:
414+
return 'is';
415+
case PostgresChangeFilterType.isDistinct:
416+
return 'isdistinct';
417+
case PostgresChangeFilterType.eq:
418+
case PostgresChangeFilterType.neq:
419+
case PostgresChangeFilterType.lt:
420+
case PostgresChangeFilterType.lte:
421+
case PostgresChangeFilterType.gt:
422+
case PostgresChangeFilterType.gte:
423+
case PostgresChangeFilterType.like:
424+
case PostgresChangeFilterType.ilike:
425+
case PostgresChangeFilterType.match:
426+
case PostgresChangeFilterType.imatch:
427+
return name;
428+
}
429+
}
315430
}
316431

317432
/// {@template postgres_change_filter}
@@ -327,22 +442,42 @@ class PostgresChangeFilter {
327442
/// The value to perform the filter on.
328443
final dynamic value;
329444

445+
/// When `true`, the operator is negated with the `not.` prefix
446+
/// (e.g. `status=not.in.(draft,archived)`, `deleted_at=not.is.null`).
447+
final bool negate;
448+
330449
/// {@macro postgres_change_filter}
331450
const PostgresChangeFilter({
332451
required this.type,
333452
required this.column,
334453
required this.value,
454+
this.negate = false,
335455
});
336456

457+
/// Quotes a scalar value PostgREST-style when it contains a reserved
458+
/// character (`,`, `(`, `)`, `"`, `\`) or surrounding whitespace, so the
459+
/// server's filter parser doesn't misread it as a condition/list boundary.
460+
/// Values without reserved characters are sent verbatim.
461+
static String _serializeScalar(Object? value) {
462+
final serialized = value == null ? 'null' : '$value';
463+
final needsQuoting =
464+
RegExp(r'[,()"\\]').hasMatch(serialized) ||
465+
serialized != serialized.trim();
466+
if (!needsQuoting) return serialized;
467+
final escaped = serialized.replaceAll(r'\', r'\\').replaceAll('"', r'\"');
468+
return '"$escaped"';
469+
}
470+
337471
@override
338472
String toString() {
473+
final prefix = negate ? 'not.' : '';
339474
if (type == PostgresChangeFilterType.inFilter) {
340-
return '$column=in.(${value.map((s) {
341-
final escaped = '$s'.replaceAll(r'\', r'\\').replaceAll('"', r'\"');
342-
return '"$escaped"';
343-
}).join(',')})';
475+
final items = (value as Iterable)
476+
.map((s) => _serializeScalar(s))
477+
.join(',');
478+
return '$column=${prefix}in.($items)';
344479
}
345-
return '$column=${type.name}.$value';
480+
return '$column=$prefix${type.token}.${_serializeScalar(value)}';
346481
}
347482
}
348483

0 commit comments

Comments
 (0)