Skip to content

Commit 8c095cf

Browse files
feat(fdc): Hardening (#18173)
* debug logging and hardering * Hardening: ensure only one multicast stream controller is created in a ref * Handle cancellations * Formatting fix
1 parent c9e7bed commit 8c095cf

3 files changed

Lines changed: 72 additions & 27 deletions

File tree

packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart

Lines changed: 68 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,12 @@ abstract class OperationRef<Data, Variables> {
8080
try {
8181
final decoded = jsonDecode(serializer(vars));
8282
final sortedStr = jsonEncode(_sortKeys(decoded));
83-
return '$operationName::$sortedStr';
83+
final hashVars = convertToSha256(sortedStr);
84+
return '$operationName::$hashVars';
8485
} catch (_) {
85-
return '$operationName::${serializer(vars)}';
86+
final rawVars = serializer(vars);
87+
final hashVars = convertToSha256(rawVars);
88+
return '$operationName::$hashVars';
8689
}
8790
} else {
8891
return operationName;
@@ -180,7 +183,7 @@ class QueryManager {
180183
try {
181184
await queryRef.execute(fetchPolicy: QueryFetchPolicy.cacheOnly);
182185
} catch (e) {
183-
log('Error executing impacted query $e');
186+
log('Error executing impacted query $queryId $e');
184187
}
185188
}
186189
}
@@ -207,7 +210,12 @@ class QueryManager {
207210
trackedQueries[queryId] = ref;
208211

209212
final streamController =
210-
StreamController<QueryResult<Data, Variables>>.broadcast();
213+
StreamController<QueryResult<Data, Variables>>.broadcast(
214+
onCancel: () {
215+
trackedQueries.remove(queryId);
216+
ref._onAllSubscribersCancelled();
217+
},
218+
);
211219

212220
return streamController;
213221
}
@@ -326,6 +334,15 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
326334
}
327335

328336
StreamController<QueryResult<Data, Variables>>? _streamController;
337+
Stream<ServerResponse>? _serverStream;
338+
StreamSubscription<ServerResponse>? _serverStreamSubscription;
339+
340+
void _onAllSubscribersCancelled() {
341+
_serverStreamSubscription?.cancel();
342+
_serverStreamSubscription = null;
343+
_serverStream = null;
344+
log("QueryRef $_queryId: All subscribers cancelled. Unsubscribed from server stream.");
345+
}
329346

330347
Stream<QueryResult<Data, Variables>> subscribe() {
331348
_streamController ??= _queryManager.addQuery(this);
@@ -344,49 +361,76 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
344361
}
345362
}
346363

347-
// Initiate Web Socket stream
348-
_streamFromServer();
364+
// Initiate Web Socket stream only if not already streaming
365+
if (_serverStream == null) {
366+
_streamFromServer();
367+
}
349368
});
350369

351370
return stream;
352371
}
353372

354373
void _streamFromServer() async {
355374
bool shouldRetry = await _shouldRetry();
375+
log("QueryRef $_queryId _streamFromServer loop started.");
356376
try {
357-
final stream = _transport.invokeStreamQuery<Data, Variables>(
377+
_serverStream = _transport.invokeStreamQuery<Data, Variables>(
358378
operationName,
359379
deserializer,
360380
serializer,
361381
variables,
362382
_lastToken,
363383
);
364384

365-
await for (final serverResponse in stream) {
366-
if (dataConnect.cacheManager != null) {
367-
await dataConnect.cacheManager!.update(_queryId, serverResponse);
368-
}
369-
Data typedData = _convertBodyJsonToData(serverResponse.data);
370-
371-
QueryResult<Data, Variables> res =
372-
QueryResult(dataConnect, typedData, DataSource.server, this);
373-
publishResultToStream(res);
374-
}
375-
} on DataConnectError catch (e) {
376-
if (shouldRetry &&
377-
e.code == DataConnectErrorCode.unauthorized.toString()) {
378-
_streamFromServer();
379-
} else {
380-
publishErrorToStream(e);
381-
}
385+
_serverStreamSubscription = _serverStream!.listen(
386+
(serverResponse) async {
387+
log("QueryRef $_queryId _streamFromServer loop received snapshot.");
388+
if (dataConnect.cacheManager != null) {
389+
try {
390+
await dataConnect.cacheManager!.update(_queryId, serverResponse);
391+
} catch (e) {
392+
log("QueryRef $_queryId _streamFromServer loop cache update failed: $e");
393+
}
394+
}
395+
Data typedData = _convertBodyJsonToData(serverResponse.data);
396+
397+
QueryResult<Data, Variables> res =
398+
QueryResult(dataConnect, typedData, DataSource.server, this);
399+
publishResultToStream(res);
400+
},
401+
onError: (e) {
402+
_serverStreamSubscription?.cancel();
403+
_serverStreamSubscription = null;
404+
_serverStream = null;
405+
406+
if (shouldRetry &&
407+
e is DataConnectError &&
408+
e.code == DataConnectErrorCode.unauthorized.toString()) {
409+
_streamFromServer();
410+
} else {
411+
publishErrorToStream(e);
412+
}
413+
},
414+
onDone: () {
415+
_serverStreamSubscription?.cancel();
416+
_serverStreamSubscription = null;
417+
_serverStream = null;
418+
},
419+
);
382420
} catch (e) {
421+
_serverStreamSubscription?.cancel();
422+
_serverStreamSubscription = null;
423+
_serverStream = null;
424+
log("QueryRef $_queryId _streamFromServer loop Unknown loop failure: $e");
383425
publishErrorToStream(e);
384426
}
385427
}
386428

387429
void publishResultToStream(QueryResult<Data, Variables> result) {
388430
if (_streamController != null) {
389431
_streamController?.add(result);
432+
} else {
433+
log("QueryRef $_queryId _streamFromServer loop _streamController is null");
390434
}
391435
}
392436

packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,9 @@ class WebSocketTransport implements DataConnectTransport {
617617
);
618618

619619
if (_channel != null) {
620-
_channel?.sink.add(jsonEncode(request.toJson()));
620+
final encodedMessage = jsonEncode(request.toJson());
621+
developer.log('Sending subscribe message $encodedMessage');
622+
_channel?.sink.add(encodedMessage);
621623
}
622624
},
623625
onCancel: () {

packages/firebase_data_connect/firebase_data_connect/test/src/core/ref_test.dart

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,7 @@ void main() {
9898
);
9999
final stream = queryManager.addQuery(ref);
100100

101-
//expect(queryManager.trackedQueries['testQuery'], isNotNull);
102-
expect(queryManager.trackedQueries['testQuery::varsAsStr'], isNotNull);
101+
expect(queryManager.trackedQueries.values.contains(ref), isTrue);
103102
expect(stream, isA<StreamController>());
104103
});
105104
});

0 commit comments

Comments
 (0)