From a1b6c7c6b2f47726f60dba00800bc3c39a428392 Mon Sep 17 00:00:00 2001 From: Vinzent Date: Fri, 29 May 2026 02:27:45 +0200 Subject: [PATCH] feat: abort requests --- infra/postgrest/db/00-schema.sql | 14 ++- .../postgrest/lib/src/postgrest_builder.dart | 90 ++++++++++++------- packages/postgrest/pubspec.yaml | 2 +- packages/postgrest/test/basic_test.dart | 24 +++++ packages/postgrest/test/retry_test.dart | 43 ++++++++- 5 files changed, 139 insertions(+), 34 deletions(-) diff --git a/infra/postgrest/db/00-schema.sql b/infra/postgrest/db/00-schema.sql index feea99cfb..7c1554e1f 100644 --- a/infra/postgrest/db/00-schema.sql +++ b/infra/postgrest/db/00-schema.sql @@ -122,4 +122,16 @@ CREATE TABLE public.imported_data ( created_at timestamp with time zone default timezone('utc'::text, now()) not null, updated_at timestamp with time zone default timezone('utc'::text, now()) not null, UNIQUE(external_id, source_system) -); \ No newline at end of file +); + +-- Long-running task for testing abortion +create or replace function public.long_running_task() +returns void as $$ +declare + start_time timestamp := clock_timestamp(); +begin + while clock_timestamp() < start_time + interval '10 seconds' loop + PERFORM pg_sleep(0.1); + end loop; +end; +$$ language plpgsql; diff --git a/packages/postgrest/lib/src/postgrest_builder.dart b/packages/postgrest/lib/src/postgrest_builder.dart index 89aa158db..5762d2b05 100644 --- a/packages/postgrest/lib/src/postgrest_builder.dart +++ b/packages/postgrest/lib/src/postgrest_builder.dart @@ -50,6 +50,7 @@ class PostgrestBuilder implements Future { final CountOption? _count; final bool _retryEnabled; final Duration Function(int attempt) _retryDelay; + final Completer? _abortCompleter; final _log = Logger('supabase.postgrest'); static Duration _defaultRetryDelay(int attempt) => @@ -69,6 +70,7 @@ class PostgrestBuilder implements Future { PostgrestConverter? converter, bool retryEnabled = true, @visibleForTesting Duration Function(int attempt)? retryDelay, + Completer? abortCompleter, }) : _maybeSingle = maybeSingle, _method = method, _converter = converter, @@ -80,7 +82,8 @@ class PostgrestBuilder implements Future { _count = count, _body = body, _retryEnabled = retryEnabled, - _retryDelay = retryDelay ?? _defaultRetryDelay; + _retryDelay = retryDelay ?? _defaultRetryDelay, + _abortCompleter = abortCompleter; PostgrestBuilder _copyWith({ Uri? url, @@ -95,6 +98,7 @@ class PostgrestBuilder implements Future { PostgrestConverter? converter, bool? retryEnabled, Duration Function(int attempt)? retryDelay, + Completer? abortCompleter, }) { return PostgrestBuilder( url: url ?? _url, @@ -109,6 +113,7 @@ class PostgrestBuilder implements Future { converter: converter ?? _converter, retryEnabled: retryEnabled ?? _retryEnabled, retryDelay: retryDelay ?? _retryDelay, + abortCompleter: abortCompleter ?? _abortCompleter, ); } @@ -121,6 +126,31 @@ class PostgrestBuilder implements Future { PostgrestBuilder retry({required bool enabled}) => _copyWith(retryEnabled: enabled); + /// Allows manually triggering request abortion by completing the provided + /// [Completer]. + /// + /// On abort, a [RequestAbortedException] will be thrown. + /// This is useful for setting a timeout for the request. + /// + /// Aborting a request will also stop any retries. + /// + /// Example: + /// ```dart + /// final abortCompleter = Completer(); + /// Timer(Duration(seconds: 5), () => abortCompleter.complete()); + /// try { + /// final response = await client + /// .from('table') + /// .select() + /// .abortCompleter(abortCompleter); + /// } on RequestAbortedException catch (e) { + /// print('Request was aborted: $e'); + /// } + /// ``` + PostgrestBuilder abortCompleter(Completer abortCompleter) { + return _copyWith(abortCompleter: abortCompleter); + } + PostgrestBuilder setHeader(String key, String value) { return _copyWith( headers: {..._headers, key: value}, @@ -164,35 +194,33 @@ class PostgrestBuilder implements Future { _log.finest("Request: ${method.value} $_url"); final Future Function() send; - if (method == _HttpMethod.get) { - send = () => (_httpClient?.get ?? http.get)(_url, headers: execHeaders); - } else if (method == _HttpMethod.post) { - send = () => (_httpClient?.post ?? http.post)( - _url, - headers: execHeaders, - body: bodyStr, - ); - } else if (method == _HttpMethod.put) { - send = () => (_httpClient?.put ?? http.put)( - _url, - headers: execHeaders, - body: bodyStr, - ); - } else if (method == _HttpMethod.patch) { - send = () => (_httpClient?.patch ?? http.patch)( - _url, - headers: execHeaders, - body: bodyStr, - ); - } else if (method == _HttpMethod.delete) { - send = () => - (_httpClient?.delete ?? http.delete)(_url, headers: execHeaders); - } else if (method == _HttpMethod.head) { - send = - () => (_httpClient?.head ?? http.head)(_url, headers: execHeaders); - } else { - throw StateError('Unknown HTTP method: ${method.value}'); - } + send = () async { + final AbortableRequest request = AbortableRequest(method.value, _url, + abortTrigger: _abortCompleter?.future); + request.headers.addAll(execHeaders); + if (method == _HttpMethod.get) { + } else if (method == _HttpMethod.post) { + request.body = bodyStr; + } else if (method == _HttpMethod.put) { + request.body = bodyStr; + } else if (method == _HttpMethod.patch) { + request.body = bodyStr; + } else if (method == _HttpMethod.delete) { + } else if (method == _HttpMethod.head) { + } else { + throw StateError('Unknown HTTP method: ${method.value}'); + } + final client = _httpClient ?? http.Client(); + + try { + final streamResponse = await client.send(request); + return http.Response.fromStream(streamResponse); + } finally { + if (_httpClient == null) { + client.close(); + } + } + }; final response = await _executeWithRetry(send, method, execHeaders); return _parseResponse(response, method); @@ -227,6 +255,8 @@ class PostgrestBuilder implements Future { attempt == maxRetries) { return response; } + } on RequestAbortedException catch (_) { + rethrow; } on Exception { if (attempt == maxRetries) rethrow; } diff --git a/packages/postgrest/pubspec.yaml b/packages/postgrest/pubspec.yaml index 017cb2035..b148f4ad1 100644 --- a/packages/postgrest/pubspec.yaml +++ b/packages/postgrest/pubspec.yaml @@ -9,7 +9,7 @@ environment: sdk: '>=3.3.0 <4.0.0' dependencies: - http: '>=0.13.0 <2.0.0' + http: ^1.6.0 yet_another_json_isolate: 2.1.0 meta: ^1.9.1 logging: ^1.2.0 diff --git a/packages/postgrest/test/basic_test.dart b/packages/postgrest/test/basic_test.dart index a65f88bc6..44e83133c 100644 --- a/packages/postgrest/test/basic_test.dart +++ b/packages/postgrest/test/basic_test.dart @@ -1,5 +1,7 @@ +import 'dart:async'; import 'dart:io'; +import 'package:http/http.dart'; import 'package:postgrest/postgrest.dart'; import 'package:test/test.dart'; @@ -469,6 +471,28 @@ void main() { expect(res.data.first, isA()); expect(res.count, greaterThan(3)); }); + + test('aborts long-running function call', () async { + final startTime = DateTime.now(); + + final completer = Completer(); + // Abort after 1 second (before the 10-second function completes) + Timer(Duration(seconds: 1), () => completer.complete()); + + try { + await postgrest + .rpc('long_running_task') + .select() + .abortCompleter(completer); + } on RequestAbortedException catch (e) { + print('Request aborted: $e'); + } + + final elapsedTime = DateTime.now().difference(startTime); + + expect(elapsedTime.inSeconds, lessThan(5)); + expect(elapsedTime.inSeconds, greaterThanOrEqualTo(1)); + }); }); group("Custom http client", () { CustomHttpClient customHttpClient = CustomHttpClient(); diff --git a/packages/postgrest/test/retry_test.dart b/packages/postgrest/test/retry_test.dart index f368b0799..36b6559e4 100644 --- a/packages/postgrest/test/retry_test.dart +++ b/packages/postgrest/test/retry_test.dart @@ -1,3 +1,4 @@ +import 'dart:async'; import 'dart:io'; import 'dart:typed_data'; @@ -41,7 +42,21 @@ class _MockRetryClient extends BaseClient { throw StateError( 'Unexpected call #${index + 1}, only ${_responses.length} configured'); } - return _responses[index](request); + + final completer = Completer(); + if (request is AbortableRequest) { + request.abortTrigger?.then((_) { + if (!completer.isCompleted) { + completer.completeError(RequestAbortedException()); + } + }); + } + Future.delayed(Duration(milliseconds: 200)).then((_) { + if (!completer.isCompleted) { + completer.complete(_responses[index](request)); + } + }); + return completer.future; } } @@ -53,7 +68,7 @@ PostgrestClient _buildClient( 'http://localhost:3000', httpClient: mock, retryEnabled: retryEnabled, - retryDelay: (_) => Duration.zero, + retryDelay: (_) => Duration(seconds: 1), ); } @@ -213,5 +228,29 @@ void main() { ); expect(mock.callCount, 4); }); + + test('GET retries on 520 but aborts before exhausting all retries', + () async { + final mock = _MockRetryClient([_status(520), _status(520), _ok()]); + final client = _buildClient(mock); + + final completer = Completer(); + completer.complete(); + // Abort after the first retry (before the success response) + // Timer(Duration(milliseconds: 60), () => completer.complete()); + + await expectLater( + client + .from('users') + .select() + .retry(enabled: true) + .abortCompleter(completer), + throwsA(isA()), + ); + + // Verify that only 1 attempt was made before abort + // (not all 3 retries exhausted) + expect(mock.callCount, 1); + }); }); }