9494//! let config = Config::new("queue")
9595//! .with_poll_interval(lazy_strategy)
9696//! .set_buffer_size(5);
97- //! let backend = PostgresStorage::new_with_notify(&pool, &config).await ;
97+ //! let backend = PostgresStorage::new_with_notify(&pool, &config);
9898//!
9999//! tokio::spawn({
100100//! let pool = pool.clone();
190190//! ## License
191191//!
192192//! Licensed under either of Apache License, Version 2.0 or MIT license at your option.
193- //!
193+ //!
194194//! [`PostgresStorageWithListener`]: crate::PostgresStorage
195195//! [`SharedPostgresStorage`]: crate::shared::SharedPostgresStorage
196196use std:: { fmt:: Debug , marker:: PhantomData } ;
@@ -211,7 +211,7 @@ use futures::{
211211 stream:: { self , BoxStream , select} ,
212212} ;
213213use serde:: Deserialize ;
214- use sqlx:: { PgPool , postgres:: PgListener } ;
214+ pub use sqlx:: { PgPool , postgres:: PgConnectOptions , postgres :: PgListener , postgres :: Postgres } ;
215215use ulid:: Ulid ;
216216
217217use crate :: {
@@ -227,7 +227,7 @@ use crate::{
227227} ;
228228
229229mod ack;
230- mod config;
230+ pub mod config;
231231mod fetcher;
232232mod from_row {
233233 use chrono:: { DateTime , Utc } ;
@@ -278,7 +278,7 @@ mod from_row {
278278 }
279279 }
280280}
281- mod context {
281+ pub mod context {
282282 pub type PgContext = apalis_sql:: context:: SqlContext ;
283283}
284284mod queries;
@@ -289,6 +289,11 @@ pub type PgTask<Args> = Task<Args, PgContext, Ulid>;
289289
290290pub type CompactType = Vec < u8 > ;
291291
292+ #[ derive( Debug , Clone , Default ) ]
293+ pub struct PgNotify {
294+ _private : PhantomData < ( ) > ,
295+ }
296+
292297#[ pin_project:: pin_project]
293298pub struct PostgresStorage <
294299 Args ,
@@ -354,20 +359,17 @@ impl<Args> PostgresStorage<Args> {
354359 }
355360 }
356361
357- pub async fn new_with_notify (
362+ pub fn new_with_notify (
358363 pool : & PgPool ,
359364 config : & Config ,
360- ) -> PostgresStorage < Args , CompactType , JsonCodec < CompactType > , PgListener > {
365+ ) -> PostgresStorage < Args , CompactType , JsonCodec < CompactType > , PgNotify > {
361366 let sink = PgSink :: new ( pool, config) ;
362- let mut fetcher = PgListener :: connect_with ( pool)
363- . await
364- . expect ( "Failed to create listener" ) ;
365- fetcher. listen ( "apalis::job::insert" ) . await . unwrap ( ) ;
367+
366368 PostgresStorage {
367369 _marker : PhantomData ,
368370 pool : pool. clone ( ) ,
369371 config : config. clone ( ) ,
370- fetcher,
372+ fetcher : PgNotify :: default ( ) ,
371373 sink,
372374 }
373375 }
@@ -383,6 +385,18 @@ impl<Args> PostgresStorage<Args> {
383385 }
384386}
385387
388+ impl < Args , Compact , Codec , Fetcher > PostgresStorage < Args , Compact , Codec , Fetcher > {
389+ pub fn with_codec < NewCodec > ( self ) -> PostgresStorage < Args , Compact , NewCodec , Fetcher > {
390+ PostgresStorage {
391+ _marker : PhantomData ,
392+ sink : PgSink :: new ( & self . pool , & self . config ) ,
393+ pool : self . pool ,
394+ config : self . config ,
395+ fetcher : self . fetcher ,
396+ }
397+ }
398+ }
399+
386400impl < Args , Decode > Backend
387401 for PostgresStorage < Args , CompactType , Decode , PgFetcher < Args , CompactType , Decode > >
388402where
@@ -448,7 +462,7 @@ where
448462 }
449463}
450464
451- impl < Args , Decode > Backend for PostgresStorage < Args , CompactType , Decode , PgListener >
465+ impl < Args , Decode > Backend for PostgresStorage < Args , CompactType , Decode , PgNotify >
452466where
453467 Args : Send + ' static + Unpin ,
454468 Decode : Codec < Args , Compact = CompactType > + ' static + Send ,
@@ -497,6 +511,15 @@ where
497511 let pool = self . pool . clone ( ) ;
498512 let worker_id = worker. name ( ) . to_owned ( ) ;
499513 let namespace = self . config . queue ( ) . to_string ( ) ;
514+ let listener = async move {
515+ let mut fetcher = PgListener :: connect_with ( & pool)
516+ . await
517+ . expect ( "Failed to create listener" ) ;
518+ fetcher. listen ( "apalis::job::insert" ) . await . unwrap ( ) ;
519+ fetcher
520+ } ;
521+ let fetcher = stream:: once ( listener) . flat_map ( |f| f. into_stream ( ) ) ;
522+ let pool = self . pool . clone ( ) ;
500523 let register_worker = initial_heartbeat (
501524 self . pool . clone ( ) ,
502525 self . config . clone ( ) ,
@@ -505,8 +528,7 @@ where
505528 )
506529 . map ( |_| Ok ( None ) ) ;
507530 let register = stream:: once ( register_worker) ;
508- let lazy_fetcher = self
509- . fetcher
531+ let lazy_fetcher = fetcher
510532 . into_stream ( )
511533 . filter_map ( move |notification| {
512534 let namespace = namespace. clone ( ) ;
@@ -630,7 +652,7 @@ mod tests {
630652 . await
631653 . unwrap ( ) ;
632654 let config = Config :: new ( "test" ) ;
633- let mut backend = PostgresStorage :: new_with_notify ( & pool, & config) . await ;
655+ let mut backend = PostgresStorage :: new_with_notify ( & pool, & config) ;
634656
635657 let mut items = stream:: repeat_with ( || {
636658 Task :: builder ( 42u32 )
0 commit comments