Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion infra/postgrest/db/00-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,16 @@ CREATE TABLE public.imported_data (
created_at timestamp with time zone default timezone('utc'::text, now()) not null,
updated_at timestamp with time zone default timezone('utc'::text, now()) not null,
UNIQUE(external_id, source_system)
);
);

-- Long-running task for testing abortion
create or replace function public.long_running_task()
returns void as $$
declare
start_time timestamp := clock_timestamp();
begin
while clock_timestamp() < start_time + interval '10 seconds' loop
PERFORM pg_sleep(0.1);
end loop;
end;
$$ language plpgsql;
90 changes: 60 additions & 30 deletions packages/postgrest/lib/src/postgrest_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class PostgrestBuilder<T, S, R> implements Future<T> {
final CountOption? _count;
final bool _retryEnabled;
final Duration Function(int attempt) _retryDelay;
final Completer<void>? _abortCompleter;
final _log = Logger('supabase.postgrest');

static Duration _defaultRetryDelay(int attempt) =>
Expand All @@ -69,6 +70,7 @@ class PostgrestBuilder<T, S, R> implements Future<T> {
PostgrestConverter<S, R>? converter,
bool retryEnabled = true,
@visibleForTesting Duration Function(int attempt)? retryDelay,
Completer<void>? abortCompleter,
}) : _maybeSingle = maybeSingle,
_method = method,
_converter = converter,
Expand All @@ -80,7 +82,8 @@ class PostgrestBuilder<T, S, R> implements Future<T> {
_count = count,
_body = body,
_retryEnabled = retryEnabled,
_retryDelay = retryDelay ?? _defaultRetryDelay;
_retryDelay = retryDelay ?? _defaultRetryDelay,
_abortCompleter = abortCompleter;

PostgrestBuilder<T, S, R> _copyWith({
Uri? url,
Expand All @@ -95,6 +98,7 @@ class PostgrestBuilder<T, S, R> implements Future<T> {
PostgrestConverter<S, R>? converter,
bool? retryEnabled,
Duration Function(int attempt)? retryDelay,
Completer<void>? abortCompleter,
}) {
return PostgrestBuilder<T, S, R>(
url: url ?? _url,
Expand All @@ -109,6 +113,7 @@ class PostgrestBuilder<T, S, R> implements Future<T> {
converter: converter ?? _converter,
retryEnabled: retryEnabled ?? _retryEnabled,
retryDelay: retryDelay ?? _retryDelay,
abortCompleter: abortCompleter ?? _abortCompleter,
);
}

Expand All @@ -121,6 +126,31 @@ class PostgrestBuilder<T, S, R> implements Future<T> {
PostgrestBuilder<T, S, R> retry({required bool enabled}) =>
_copyWith(retryEnabled: enabled);

/// Allows manually triggering request abortion by completing the provided
/// [Completer].
///
/// On abort, a [RequestAbortedException] will be thrown.
/// This is useful for setting a timeout for the request.
///
/// Aborting a request will also stop any retries.
///
/// Example:
/// ```dart
/// final abortCompleter = Completer<void>();
/// Timer(Duration(seconds: 5), () => abortCompleter.complete());
/// try {
/// final response = await client
/// .from('table')
/// .select()
/// .abortCompleter(abortCompleter);
/// } on RequestAbortedException catch (e) {
/// print('Request was aborted: $e');
/// }
/// ```
PostgrestBuilder<T, S, R> abortCompleter(Completer<void> abortCompleter) {
return _copyWith(abortCompleter: abortCompleter);
}

PostgrestBuilder<T, S, R> setHeader(String key, String value) {
return _copyWith(
headers: {..._headers, key: value},
Expand Down Expand Up @@ -164,35 +194,33 @@ class PostgrestBuilder<T, S, R> implements Future<T> {
_log.finest("Request: ${method.value} $_url");

final Future<http.Response> Function() send;
if (method == _HttpMethod.get) {
send = () => (_httpClient?.get ?? http.get)(_url, headers: execHeaders);
} else if (method == _HttpMethod.post) {
send = () => (_httpClient?.post ?? http.post)(
_url,
headers: execHeaders,
body: bodyStr,
);
} else if (method == _HttpMethod.put) {
send = () => (_httpClient?.put ?? http.put)(
_url,
headers: execHeaders,
body: bodyStr,
);
} else if (method == _HttpMethod.patch) {
send = () => (_httpClient?.patch ?? http.patch)(
_url,
headers: execHeaders,
body: bodyStr,
);
} else if (method == _HttpMethod.delete) {
send = () =>
(_httpClient?.delete ?? http.delete)(_url, headers: execHeaders);
} else if (method == _HttpMethod.head) {
send =
() => (_httpClient?.head ?? http.head)(_url, headers: execHeaders);
} else {
throw StateError('Unknown HTTP method: ${method.value}');
}
send = () async {
final AbortableRequest request = AbortableRequest(method.value, _url,
abortTrigger: _abortCompleter?.future);
request.headers.addAll(execHeaders);
if (method == _HttpMethod.get) {
} else if (method == _HttpMethod.post) {
request.body = bodyStr;
} else if (method == _HttpMethod.put) {
request.body = bodyStr;
} else if (method == _HttpMethod.patch) {
request.body = bodyStr;
} else if (method == _HttpMethod.delete) {
} else if (method == _HttpMethod.head) {
} else {
throw StateError('Unknown HTTP method: ${method.value}');
}
final client = _httpClient ?? http.Client();

try {
final streamResponse = await client.send(request);
return http.Response.fromStream(streamResponse);
} finally {
if (_httpClient == null) {
client.close();
}
}
};

final response = await _executeWithRetry(send, method, execHeaders);
return _parseResponse(response, method);
Expand Down Expand Up @@ -227,6 +255,8 @@ class PostgrestBuilder<T, S, R> implements Future<T> {
attempt == maxRetries) {
return response;
}
} on RequestAbortedException catch (_) {
rethrow;
} on Exception {
if (attempt == maxRetries) rethrow;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/postgrest/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ environment:
sdk: '>=3.3.0 <4.0.0'

dependencies:
http: '>=0.13.0 <2.0.0'
http: ^1.6.0
yet_another_json_isolate: 2.1.0
meta: ^1.9.1
logging: ^1.2.0
Expand Down
24 changes: 24 additions & 0 deletions packages/postgrest/test/basic_test.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import 'dart:async';
import 'dart:io';

import 'package:http/http.dart';
import 'package:postgrest/postgrest.dart';
import 'package:test/test.dart';

Expand Down Expand Up @@ -469,6 +471,28 @@ void main() {
expect(res.data.first, isA<List>());
expect(res.count, greaterThan(3));
});

test('aborts long-running function call', () async {
final startTime = DateTime.now();

final completer = Completer<void>();
// Abort after 1 second (before the 10-second function completes)
Timer(Duration(seconds: 1), () => completer.complete());

try {
await postgrest
.rpc('long_running_task')
.select()
.abortCompleter(completer);
} on RequestAbortedException catch (e) {
print('Request aborted: $e');
}

final elapsedTime = DateTime.now().difference(startTime);

expect(elapsedTime.inSeconds, lessThan(5));
expect(elapsedTime.inSeconds, greaterThanOrEqualTo(1));
});
});
group("Custom http client", () {
CustomHttpClient customHttpClient = CustomHttpClient();
Expand Down
43 changes: 41 additions & 2 deletions packages/postgrest/test/retry_test.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import 'dart:async';
import 'dart:io';
import 'dart:typed_data';

Expand Down Expand Up @@ -41,7 +42,21 @@ class _MockRetryClient extends BaseClient {
throw StateError(
'Unexpected call #${index + 1}, only ${_responses.length} configured');
}
return _responses[index](request);

final completer = Completer<StreamedResponse>();
if (request is AbortableRequest) {
request.abortTrigger?.then((_) {
if (!completer.isCompleted) {
completer.completeError(RequestAbortedException());
}
});
}
Future.delayed(Duration(milliseconds: 200)).then((_) {
if (!completer.isCompleted) {
completer.complete(_responses[index](request));
}
});
return completer.future;
}
}

Expand All @@ -53,7 +68,7 @@ PostgrestClient _buildClient(
'http://localhost:3000',
httpClient: mock,
retryEnabled: retryEnabled,
retryDelay: (_) => Duration.zero,
retryDelay: (_) => Duration(seconds: 1),
);
}

Expand Down Expand Up @@ -213,5 +228,29 @@ void main() {
);
expect(mock.callCount, 4);
});

test('GET retries on 520 but aborts before exhausting all retries',
() async {
final mock = _MockRetryClient([_status(520), _status(520), _ok()]);
final client = _buildClient(mock);

final completer = Completer<void>();
completer.complete();
// Abort after the first retry (before the success response)
// Timer(Duration(milliseconds: 60), () => completer.complete());

await expectLater(
client
.from('users')
.select()
.retry(enabled: true)
.abortCompleter(completer),
throwsA(isA<RequestAbortedException>()),
);

// Verify that only 1 attempt was made before abort
// (not all 3 retries exhausted)
expect(mock.callCount, 1);
});
});
}
Loading