@@ -42,18 +42,6 @@ const PAGE_SIZE: usize = 50;
4242// Keep this small while still allowing progress if one runtime worker blocks on sync store access.
4343const INTERNAL_RUNTIME_WORKERS : usize = 2 ;
4444
45- async fn run_on_internal_runtime < T > (
46- runtime : Arc < StoreRuntime > , future : impl Future < Output = io:: Result < T > > + Send + ' static ,
47- ) -> io:: Result < T >
48- where
49- T : Send + ' static ,
50- {
51- let task = runtime. spawn ( future) ;
52- task. await . map_err ( |e| {
53- io:: Error :: new ( io:: ErrorKind :: Other , format ! ( "PostgreSQL runtime task failed: {}" , e) )
54- } ) ?
55- }
56-
5745fn sql_identifier ( identifier : & str ) -> io:: Result < String > {
5846 if identifier. is_empty ( ) || identifier. contains ( '\0' ) {
5947 return Err ( io:: Error :: new (
@@ -164,10 +152,12 @@ impl PostgresStore {
164152 "PostgreSQL" ,
165153 ) ?) ;
166154 let tls = Self :: build_tls_connector ( certificate_pem) ?;
167- let inner = run_on_internal_runtime ( Arc :: clone ( & internal_runtime) , async move {
155+ let task = internal_runtime. spawn ( async move {
168156 PostgresStoreInner :: new ( connection_string, db_name, kv_table_name, tls, logger) . await
169- } )
170- . await ?;
157+ } ) ;
158+ let inner = task. await . map_err ( |e| {
159+ io:: Error :: new ( io:: ErrorKind :: Other , format ! ( "PostgreSQL runtime task failed: {}" , e) )
160+ } ) ??;
171161 let inner = Arc :: new ( inner) ;
172162 let next_write_version = AtomicU64 :: new ( 1 ) ;
173163 Ok ( Self { inner, next_write_version, internal_runtime : Some ( internal_runtime) } )
@@ -239,10 +229,15 @@ impl KVStore for PostgresStore {
239229 let inner = Arc :: clone ( & self . inner ) ;
240230 let runtime = self . internal_runtime ( ) ;
241231 async move {
242- run_on_internal_runtime ( runtime , async move {
232+ let task = runtime . spawn ( async move {
243233 inner. read_internal ( & primary_namespace, & secondary_namespace, & key) . await
244- } )
245- . await
234+ } ) ;
235+ task. await . map_err ( |e| {
236+ io:: Error :: new (
237+ io:: ErrorKind :: Other ,
238+ format ! ( "PostgreSQL runtime task failed: {}" , e) ,
239+ )
240+ } ) ?
246241 }
247242 }
248243
@@ -257,7 +252,7 @@ impl KVStore for PostgresStore {
257252 let inner = Arc :: clone ( & self . inner ) ;
258253 let runtime = self . internal_runtime ( ) ;
259254 async move {
260- run_on_internal_runtime ( runtime , async move {
255+ let task = runtime . spawn ( async move {
261256 inner
262257 . write_internal (
263258 inner_lock_ref,
@@ -269,8 +264,13 @@ impl KVStore for PostgresStore {
269264 buf,
270265 )
271266 . await
272- } )
273- . await
267+ } ) ;
268+ task. await . map_err ( |e| {
269+ io:: Error :: new (
270+ io:: ErrorKind :: Other ,
271+ format ! ( "PostgreSQL runtime task failed: {}" , e) ,
272+ )
273+ } ) ?
274274 }
275275 }
276276
@@ -285,7 +285,7 @@ impl KVStore for PostgresStore {
285285 let inner = Arc :: clone ( & self . inner ) ;
286286 let runtime = self . internal_runtime ( ) ;
287287 async move {
288- run_on_internal_runtime ( runtime , async move {
288+ let task = runtime . spawn ( async move {
289289 inner
290290 . remove_internal (
291291 inner_lock_ref,
@@ -296,8 +296,13 @@ impl KVStore for PostgresStore {
296296 & key,
297297 )
298298 . await
299- } )
300- . await
299+ } ) ;
300+ task. await . map_err ( |e| {
301+ io:: Error :: new (
302+ io:: ErrorKind :: Other ,
303+ format ! ( "PostgreSQL runtime task failed: {}" , e) ,
304+ )
305+ } ) ?
301306 }
302307 }
303308
@@ -309,10 +314,15 @@ impl KVStore for PostgresStore {
309314 let inner = Arc :: clone ( & self . inner ) ;
310315 let runtime = self . internal_runtime ( ) ;
311316 async move {
312- run_on_internal_runtime ( runtime , async move {
317+ let task = runtime . spawn ( async move {
313318 inner. list_internal ( & primary_namespace, & secondary_namespace) . await
314- } )
315- . await
319+ } ) ;
320+ task. await . map_err ( |e| {
321+ io:: Error :: new (
322+ io:: ErrorKind :: Other ,
323+ format ! ( "PostgreSQL runtime task failed: {}" , e) ,
324+ )
325+ } ) ?
316326 }
317327 }
318328}
@@ -326,12 +336,17 @@ impl PaginatedKVStore for PostgresStore {
326336 let inner = Arc :: clone ( & self . inner ) ;
327337 let runtime = self . internal_runtime ( ) ;
328338 async move {
329- run_on_internal_runtime ( runtime , async move {
339+ let task = runtime . spawn ( async move {
330340 inner
331341 . list_paginated_internal ( & primary_namespace, & secondary_namespace, page_token)
332342 . await
333- } )
334- . await
343+ } ) ;
344+ task. await . map_err ( |e| {
345+ io:: Error :: new (
346+ io:: ErrorKind :: Other ,
347+ format ! ( "PostgreSQL runtime task failed: {}" , e) ,
348+ )
349+ } ) ?
335350 }
336351 }
337352}
0 commit comments