@@ -56,11 +56,17 @@ async def close_pools():
5656 global _source_pool , _sink_pool
5757 if _source_pool :
5858 logger .info ("Closing source connection pool..." )
59- await _source_pool .close ()
59+ try :
60+ await _source_pool .close ()
61+ except (asyncio .CancelledError , Exception ):
62+ pass
6063 _source_pool = None
6164 if _sink_pool :
6265 logger .info ("Closing sink connection pool..." )
63- await _sink_pool .close ()
66+ try :
67+ await _sink_pool .close ()
68+ except (asyncio .CancelledError , Exception ):
69+ pass
6470 _sink_pool = None
6571
6672
@@ -183,6 +189,7 @@ async def get_source_column_types(
183189async def setup_source (settings : Settings , config : SearchPipeline , target_name : str ):
184190 """Remotely initialize the source publication."""
185191 pub_name = f"pub_{ target_name } "
192+ print (f"DEBUG: setup_source called for { pub_name } " )
186193 logger .info (f"Setting up remote source publication { pub_name } ..." )
187194
188195 # Pre-flight readiness check to avoid race conditions in tests
@@ -199,6 +206,7 @@ async def setup_source(settings: Settings, config: SearchPipeline, target_name:
199206 else ""
200207 )
201208
209+ # Ensure publication exists (idempotent check)
202210 await cur .execute (
203211 f"SELECT 1 FROM pg_publication WHERE pubname = '{ pub_name } '"
204212 )
@@ -210,6 +218,7 @@ async def setup_source(settings: Settings, config: SearchPipeline, target_name:
210218 f"CREATE PUBLICATION { pub_name } FOR TABLE { config .ingest .table } ({ cols } ){ where_clause } "
211219 )
212220 else :
221+ logger .debug (f"Publication { pub_name } exists. Updating definition..." )
213222 await cur .execute (
214223 f"ALTER PUBLICATION { pub_name } SET TABLE { config .ingest .table } ({ cols } ){ where_clause } "
215224 )
@@ -662,7 +671,7 @@ async def ensure_embedding_cache_table(settings: Settings, config: SearchPipelin
662671 )
663672
664673async def cleanup_vectorizer_infrastructure (
665- settings : Settings , config : SearchPipeline , vectorizer_name : str
674+ settings : Settings , config : SearchPipeline , target_name : str , vectorizer_name : str
666675):
667676 """Robustly clean up all infrastructure for a specific vectorizer."""
668677 logger .info (f"Robust cleanup for vectorizer { vectorizer_name } ..." )
@@ -679,13 +688,13 @@ async def cleanup_vectorizer_infrastructure(
679688 -- 1. Check if ANY view is using this as its target (safety)
680689 SELECT table_name INTO live_target
681690 FROM information_schema.view_table_usage
682- WHERE view_name = '{ config . ingest . table } _search'
691+ WHERE view_name = '{ target_name } _search'
683692 AND table_name IN ('{ vectorizer_name } ', '{ embedding_view } ')
684693 LIMIT 1;
685694
686695 -- 2. If it's live, we MUST drop the replica view first
687696 IF live_target IS NOT NULL THEN
688- EXECUTE 'DROP VIEW IF EXISTS ' || quote_ident('{ config . ingest . table } _search') || ' CASCADE';
697+ EXECUTE 'DROP VIEW IF EXISTS ' || quote_ident('{ target_name } _search') || ' CASCADE';
689698 END IF;
690699
691700 -- 3. Drop the pgai vectorizer if it exists
@@ -771,7 +780,7 @@ async def atomic_view_swap(
771780 await conn .set_autocommit (False )
772781 try :
773782 async with conn .cursor () as cur :
774- await cur .execute (f"DROP VIEW IF EXISTS { config . ingest . table } _search" )
783+ await cur .execute (f"DROP VIEW IF EXISTS { target_name } _search" )
775784
776785 extra_cols = ",\n " .join ([f"r.{ c } " for c in config .ingest .columns if c != config .ingest .p_key ])
777786 if extra_cols :
@@ -784,7 +793,7 @@ async def atomic_view_swap(
784793 logger .info ("Implementing Hybrid View with RRF scoring support..." )
785794 await cur .execute (
786795 f"""
787- CREATE VIEW { config . ingest . table } _search AS
796+ CREATE VIEW { target_name } _search AS
788797 SELECT
789798 r.{ config .ingest .p_key } ,
790799 e.chunk as chunk,
@@ -798,7 +807,7 @@ async def atomic_view_swap(
798807 # VECTOR SEARCH ONLY
799808 await cur .execute (
800809 f"""
801- CREATE VIEW { config . ingest . table } _search AS
810+ CREATE VIEW { target_name } _search AS
802811 SELECT
803812 r.{ config .ingest .p_key } ,
804813 e.chunk as chunk,
@@ -827,7 +836,7 @@ async def ensure_outbox_infrastructure(settings: Settings):
827836 # 1. The Outbox: Transactional log of vectorized changes
828837 await cur .execute (
829838 """
830- CREATE TABLE IF NOT EXISTS _sink_outbox (
839+ CREATE TABLE IF NOT EXISTS public. _sink_outbox (
831840 id BIGSERIAL PRIMARY KEY,
832841 target_name TEXT NOT NULL,
833842 version_id TEXT NOT NULL,
@@ -840,7 +849,7 @@ async def ensure_outbox_infrastructure(settings: Settings):
840849 )
841850 # Index for the MirrorWorker to poll efficiently
842851 await cur .execute (
843- "CREATE INDEX IF NOT EXISTS idx_sink_outbox_id ON _sink_outbox(id)"
852+ "CREATE INDEX IF NOT EXISTS idx_sink_outbox_id ON public. _sink_outbox(id)"
844853 )
845854
846855 # 2. Mirror Registry: Track progress of external sinks
@@ -926,10 +935,10 @@ async def setup_outbox_trigger(
926935 CREATE OR REPLACE FUNCTION { trigger_fn_name } () RETURNS TRIGGER AS $$
927936 BEGIN
928937 IF (TG_OP = 'DELETE') THEN
929- INSERT INTO _sink_outbox (target_name, version_id, source_id, action)
938+ INSERT INTO public. _sink_outbox (target_name, version_id, source_id, action)
930939 VALUES ('{ target_name } ', '{ version_id } ', OLD.{ config .ingest .p_key } ::text, 'DELETE');
931940 ELSE
932- INSERT INTO _sink_outbox (target_name, version_id, source_id, action, payload)
941+ INSERT INTO public. _sink_outbox (target_name, version_id, source_id, action, payload)
933942 VALUES (
934943 '{ target_name } ',
935944 '{ version_id } ',
@@ -949,14 +958,21 @@ async def setup_outbox_trigger(
949958
950959 # 2. Attach Trigger to pgai STORE table
951960 # We use AFTER INSERT OR UPDATE OR DELETE
952- await cur .execute (
953- f"""
954- DROP TRIGGER IF EXISTS { trigger_name } ON { vectorizer_name } ;
955- CREATE TRIGGER { trigger_name }
956- AFTER INSERT OR UPDATE OR DELETE ON { vectorizer_name }
957- FOR EACH ROW EXECUTE FUNCTION { trigger_fn_name } ();
958- """
959- )
961+ # Use try/except to handle race where worker hasn't created table yet
962+ try :
963+ await cur .execute (
964+ f"""
965+ DROP TRIGGER IF EXISTS { trigger_name } ON { vectorizer_name } ;
966+ CREATE TRIGGER { trigger_name }
967+ AFTER INSERT OR UPDATE OR DELETE ON { vectorizer_name }
968+ FOR EACH ROW EXECUTE FUNCTION { trigger_fn_name } ();
969+ """
970+ )
971+ except Exception as e :
972+ if "does not exist" in str (e ):
973+ logger .warning (f"Deferred trigger setup for { vectorizer_name } : Table not yet created by worker." )
974+ return # Retry next loop
975+ raise e
960976
961977 # 3. Backfill existing rows (Handling the Race Condition)
962978 # Since the vectorizer might have already processed rows before we attached the trigger,
@@ -1339,10 +1355,17 @@ async def find_and_fix_ghost_records(settings: Settings, config: SearchPipeline,
13391355 logger .debug (f"No records found for anti-entropy in { target_name } " )
13401356 return
13411357
1342- min_id_raw , max_id_raw = min (all_ids ), max (all_ids )
13431358 source_types = await get_source_column_types (settings , config )
13441359 id_type = source_types .get (config .ingest .p_key , "TEXT" )
13451360
1361+ # Ensure all IDs are of the same type for comparison
1362+ if id_type in ("INT" , "BIGINT" ):
1363+ all_ids = [int (x ) for x in all_ids ]
1364+ else :
1365+ all_ids = [str (x ) for x in all_ids ]
1366+
1367+ min_id_raw , max_id_raw = min (all_ids ), max (all_ids )
1368+
13461369 # 2. Strategy: Set Comparison for UUIDs/Strings or Small Tables
13471370 if id_type not in ("INT" , "BIGINT" ):
13481371 async with await get_source_conn () as s_conn :
@@ -1405,7 +1428,7 @@ async def drop_subscription_completely(settings: Settings, config: SearchPipelin
14051428 try :
14061429 async with await connect_db (settings .resolved_sink_url ) as conn :
14071430 await conn .set_autocommit (True )
1408- await conn .execute (f"DROP VIEW IF EXISTS { config . ingest . table } _search CASCADE" )
1431+ await conn .execute (f"DROP VIEW IF EXISTS { target_name } _search CASCADE" )
14091432
14101433 # Retry loop for dropping subscription to handle "sync in progress"
14111434 async def try_drop_subscription ():
@@ -1439,7 +1462,7 @@ async def try_drop_subscription():
14391462
14401463 # Cleanup vectorizers
14411464 async with conn .cursor () as cur :
1442- await cur .execute ("SELECT id FROM ai.vectorizer WHERE name LIKE %s" , (f"{ config . ingest . table } _store%" ,))
1465+ await cur .execute ("SELECT id FROM ai.vectorizer WHERE name LIKE %s" , (f"{ target_name } _store%" ,))
14431466 for (vid ,) in await cur .fetchall ():
14441467 await cur .execute (f"SELECT ai.drop_vectorizer({ vid } , drop_all => true)" )
14451468 except Exception as e :
0 commit comments