Skip to content
This repository was archived by the owner on May 27, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 83 additions & 3 deletions projects/pgai/db/sql/idempotent/011-vectorizer-int.sql
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,8 @@ begin
create table %I.%I
( %s
, queued_at pg_catalog.timestamptz not null default now()
, loading_retries pg_catalog.int4 not null default 0
, loading_retry_after pg_catalog.timestamptz
, attempts pg_catalog.int4 not null default 0
, retry_after pg_catalog.timestamptz
)
$sql$
, queue_schema, queue_table
Expand Down Expand Up @@ -507,6 +507,7 @@ begin
( %s
, created_at pg_catalog.timestamptz not null default now()
, failure_step pg_catalog.text not null default ''
, attempts pg_catalog.int4 not null default 0
)
$sql$
, queue_schema, queue_failed_table
Expand Down Expand Up @@ -1266,4 +1267,83 @@ end
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;
;

create or replace function ai._get_next_queue_batch(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this function be prepend with _? We are calling it from the worker. I thought we named things with _ when we deemed them private, in the sense that only public SQL functions reference them.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. The function is not intended to be used by anyone other than our python code, as such it is internal/private.

queue_table pg_catalog.regclass,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking that it might be more clear if instead of queue_table use vectorizer_id. You can get the queue table from the same query that fetches the source_pk.

batch_size pg_catalog.int4
) returns setof record AS $$
declare
source_pk pg_catalog.jsonb;
lock_id_string pg_catalog.text;
query pg_catalog.text;
lock_count pg_catalog.int4 := 0;
row record;
begin
-- get the source_pk for this queue table
select v.source_pk
into source_pk
from ai.vectorizer v
where pg_catalog.to_regclass(pg_catalog.format('%I.%I', v.queue_schema, v.queue_table)) operator(pg_catalog.=) _get_next_queue_batch.queue_table;

-- construct the "lock id string"
-- this is a string of all pk column names and their values, e.g. for a
-- two-column pk consisting of 'time' and 'url' this will generate:
-- hashtext(format('time|%s|url|%s', time, url))
select pg_catalog.format($fmt$pg_catalog.hashtext(pg_catalog.format('%s', %s))$fmt$, format_string, format_args)
into lock_id_string
from (
select
pg_catalog.string_agg(pg_catalog.format('%s|%%s', attname), '|' order by attnum) as format_string
, pg_catalog.string_agg(attname, ', ' order by attnum) as format_args
from pg_catalog.jsonb_to_recordset(source_pk) as (attnum int, attname text)
) as _;

-- TODO: for very small batch sizes (<10), an array _may_ be faster
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do have a small reservation about the potential catalog bloat that using a temp table might cause, however hopefully the frequency of the execution of this function is low enough to be tolerable.

drop table if exists seen_lock_ids;
create temporary table seen_lock_ids (lock_id bigint);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you probably won't ever call this function twice in the same transaction, but having a drop temporary table if exists seen_lock_ids; right before the create will make this more resilient.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, thanks. Done.

create index on seen_lock_ids (lock_id);

-- construct query to get all
query := pg_catalog.format($sql$
select
q.ctid as _ctid
, %s as _lock_id
, q.*
from %s as q
where (retry_after is null or retry_after <= now())
and %s not in (
-- exclude all locks that we already hold
select objid::int
from pg_locks
where locktype = 'advisory'
and pid = pg_catalog.pg_backend_pid()
and classid = %s
)
$sql$, lock_id_string, _get_next_queue_batch.queue_table, lock_id_string, _get_next_queue_batch.queue_table::pg_catalog.oid);

for row in execute query
loop
if lock_count operator(pg_catalog.>=) batch_size then
exit;
end if;

if exists(select 1 from pg_temp.seen_lock_ids WHERE lock_id operator(pg_catalog.=) row._lock_id) then
continue;
end if;

insert into pg_temp.seen_lock_ids (lock_id) values (row._lock_id);

if pg_catalog.pg_try_advisory_lock(queue_table::pg_catalog.oid::int, row._lock_id) then
lock_count := lock_count operator(pg_catalog.+) 1;
return next row;
end if;
end loop;

drop table seen_lock_ids;

return;
end;
$$ language plpgsql
set search_path to pg_catalog, pg_temp
;
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- rename loading_retries and loading_retry_after for all existing queue tables
do language plpgsql $block$
declare
_vectorizer record;
begin
for _vectorizer in select queue_schema, queue_table from ai.vectorizer
loop
execute format('alter table %I.%I rename column loading_retries to attempts', _vectorizer.queue_schema, _vectorizer.queue_table);
execute format('alter table %I.%I rename column loading_retry_after to retry_after', _vectorizer.queue_schema, _vectorizer.queue_table);
end loop;
for _vectorizer in select queue_schema, queue_failed_table from ai.vectorizer
loop
execute format('alter table %I.%I add column attempts pg_catalog.int4 not null default 0', _vectorizer.queue_schema, _vectorizer.queue_failed_table);
end loop;
end;
$block$;
11 changes: 11 additions & 0 deletions projects/pgai/db/tests/golden/failed-queue-table-16.expected
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Table "ai._vectorizer_q_failed_1"
Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description
--------------+--------------------------+-----------+----------+----------+----------+-------------+--------------+-------------
title | text | | not null | | extended | | |
published | timestamp with time zone | | not null | | plain | | |
created_at | timestamp with time zone | | not null | now() | plain | | |
failure_step | text | | not null | ''::text | extended | | |
attempts | integer | | not null | 0 | plain | | |
Indexes:
"_vectorizer_q_failed_1_title_published_idx" btree (title, published)
Access method: heap
11 changes: 11 additions & 0 deletions projects/pgai/db/tests/golden/failed-queue-table-17.expected
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Table "ai._vectorizer_q_failed_1"
Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description
--------------+--------------------------+-----------+----------+----------+----------+-------------+--------------+-------------
title | text | | not null | | extended | | |
published | timestamp with time zone | | not null | | plain | | |
created_at | timestamp with time zone | | not null | now() | plain | | |
failure_step | text | | not null | ''::text | extended | | |
attempts | integer | | not null | 0 | plain | | |
Indexes:
"_vectorizer_q_failed_1_title_published_idx" btree (title, published)
Access method: heap
14 changes: 7 additions & 7 deletions projects/pgai/db/tests/golden/queue-table-16.expected
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
Table "ai._vectorizer_q_1"
Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description
---------------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+-------------
title | text | | not null | | extended | | |
published | timestamp with time zone | | not null | | plain | | |
queued_at | timestamp with time zone | | not null | now() | plain | | |
loading_retries | integer | | not null | 0 | plain | | |
loading_retry_after | timestamp with time zone | | | | plain | | |
Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description
-------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+-------------
title | text | | not null | | extended | | |
published | timestamp with time zone | | not null | | plain | | |
queued_at | timestamp with time zone | | not null | now() | plain | | |
attempts | integer | | not null | 0 | plain | | |
retry_after | timestamp with time zone | | | | plain | | |
Indexes:
"_vectorizer_q_1_title_published_idx" btree (title, published)
Access method: heap
14 changes: 7 additions & 7 deletions projects/pgai/db/tests/golden/queue-table-17.expected
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
Table "ai._vectorizer_q_1"
Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description
---------------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+-------------
title | text | | not null | | extended | | |
published | timestamp with time zone | | not null | | plain | | |
queued_at | timestamp with time zone | | not null | now() | plain | | |
loading_retries | integer | | not null | 0 | plain | | |
loading_retry_after | timestamp with time zone | | | | plain | | |
Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description
-------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+-------------
title | text | | not null | | extended | | |
published | timestamp with time zone | | not null | | plain | | |
queued_at | timestamp with time zone | | not null | now() | plain | | |
attempts | integer | | not null | 0 | plain | | |
retry_after | timestamp with time zone | | | | plain | | |
Indexes:
"_vectorizer_q_1_title_published_idx" btree (title, published)
Access method: heap
4 changes: 4 additions & 0 deletions projects/pgai/db/tests/vectorizer/test_vectorizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,10 @@ def test_vectorizer_timescaledb():
actual = psql_cmd(r"\d+ ai._vectorizer_q_1")
golden_check("queue-table", actual)

# does the queue failed table look right?
actual = psql_cmd(r"\d+ ai._vectorizer_q_failed_1")
golden_check("failed-queue-table", actual)

# does the view look right?
actual = psql_cmd(r"\d+ website.blog_embedding")
golden_check("view", actual)
Expand Down
128 changes: 126 additions & 2 deletions projects/pgai/pgai/data/ai.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1224,6 +1224,49 @@ begin
end;
$outer_migration_block$;

-------------------------------------------------------------------------------
-- 032-split-transaction-support.sql
do $outer_migration_block$ /*032-split-transaction-support.sql*/
declare
_sql text;
_migration record;
_migration_name text = $migration_name$032-split-transaction-support.sql$migration_name$;
_migration_body text =
$migration_body$
-- rename loading_retries and loading_retry_after for all existing queue tables
do language plpgsql $block$
declare
_vectorizer record;
begin
for _vectorizer in select queue_schema, queue_table from ai.vectorizer
loop
execute format('alter table %I.%I rename column loading_retries to attempts', _vectorizer.queue_schema, _vectorizer.queue_table);
execute format('alter table %I.%I rename column loading_retry_after to retry_after', _vectorizer.queue_schema, _vectorizer.queue_table);
end loop;
for _vectorizer in select queue_schema, queue_failed_table from ai.vectorizer
loop
execute format('alter table %I.%I add column attempts pg_catalog.int4 not null default 0', _vectorizer.queue_schema, _vectorizer.queue_failed_table);
end loop;
end;
$block$;

$migration_body$;
begin
select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
if _migration is not null then
raise notice 'migration %s already applied. skipping.', _migration_name;
if _migration.body operator(pg_catalog.!=) _migration_body then
raise warning 'the contents of migration "%s" have changed', _migration_name;
end if;
return;
end if;
_sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
execute _sql;
insert into ai.pgai_lib_migration ("name", body, applied_at_version)
values (_migration_name, _migration_body, $version$__version__$version$);
end;
$outer_migration_block$;

--------------------------------------------------------------------------------
-- 001-chunking.sql

Expand Down Expand Up @@ -2704,8 +2747,8 @@ begin
create table %I.%I
( %s
, queued_at pg_catalog.timestamptz not null default now()
, loading_retries pg_catalog.int4 not null default 0
, loading_retry_after pg_catalog.timestamptz
, attempts pg_catalog.int4 not null default 0
, retry_after pg_catalog.timestamptz
)
$sql$
, queue_schema, queue_table
Expand Down Expand Up @@ -2787,6 +2830,7 @@ begin
( %s
, created_at pg_catalog.timestamptz not null default now()
, failure_step pg_catalog.text not null default ''
, attempts pg_catalog.int4 not null default 0
)
$sql$
, queue_schema, queue_failed_table
Expand Down Expand Up @@ -3548,6 +3592,86 @@ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

create or replace function ai._get_next_queue_batch(
queue_table pg_catalog.regclass,
batch_size pg_catalog.int4
) returns setof record AS $$
declare
source_pk pg_catalog.jsonb;
lock_id_string pg_catalog.text;
query pg_catalog.text;
lock_count pg_catalog.int4 := 0;
row record;
begin
-- get the source_pk for this queue table
select v.source_pk
into source_pk
from ai.vectorizer v
where pg_catalog.to_regclass(pg_catalog.format('%I.%I', v.queue_schema, v.queue_table)) operator(pg_catalog.=) _get_next_queue_batch.queue_table;

-- construct the "lock id string"
-- this is a string of all pk column names and their values, e.g. for a
-- two-column pk consisting of 'time' and 'url' this will generate:
-- hashtext(format('time|%s|url|%s', time, url))
select pg_catalog.format($fmt$pg_catalog.hashtext(pg_catalog.format('%s', %s))$fmt$, format_string, format_args)
into lock_id_string
from (
select
pg_catalog.string_agg(pg_catalog.format('%s|%%s', attname), '|' order by attnum) as format_string
, pg_catalog.string_agg(attname, ', ' order by attnum) as format_args
from pg_catalog.jsonb_to_recordset(source_pk) as (attnum int, attname text)
) as _;

-- TODO: for very small batch sizes (<10), an array _may_ be faster
drop table if exists seen_lock_ids;
create temporary table seen_lock_ids (lock_id bigint);
create index on seen_lock_ids (lock_id);

-- construct query to get all
query := pg_catalog.format($sql$
select
q.ctid as _ctid
, %s as _lock_id
, q.*
from %s as q
where (retry_after is null or retry_after <= now())
and %s not in (
-- exclude all locks that we already hold
select objid::int
from pg_locks
where locktype = 'advisory'
and pid = pg_catalog.pg_backend_pid()
and classid = %s
)
$sql$, lock_id_string, _get_next_queue_batch.queue_table, lock_id_string, _get_next_queue_batch.queue_table::pg_catalog.oid);

for row in execute query
loop
if lock_count operator(pg_catalog.>=) batch_size then
exit;
end if;

if exists(select 1 from pg_temp.seen_lock_ids WHERE lock_id operator(pg_catalog.=) row._lock_id) then
continue;
end if;

insert into pg_temp.seen_lock_ids (lock_id) values (row._lock_id);

if pg_catalog.pg_try_advisory_lock(queue_table::pg_catalog.oid::int, row._lock_id) then
lock_count := lock_count operator(pg_catalog.+) 1;
return next row;
end if;
end loop;

drop table seen_lock_ids;

return;
end;
$$ language plpgsql
set search_path to pg_catalog, pg_temp
;


--------------------------------------------------------------------------------
-- 012-vectorizer-api.sql
-------------------------------------------------------------------------------
Expand Down
13 changes: 12 additions & 1 deletion projects/pgai/pgai/vectorizer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
from .create_vectorizer import CreateVectorizer
from .vectorizer import Executor, Vectorizer
from .vectorizer import (
EmbeddingError,
Executor,
FormattingError,
LoadingError,
ParsingError,
Vectorizer,
)
from .worker import Worker

__all__ = [
"Vectorizer",
"Executor",
"CreateVectorizer",
"Worker",
"FormattingError",
"LoadingError",
"ParsingError",
"EmbeddingError",
]
4 changes: 2 additions & 2 deletions projects/pgai/pgai/vectorizer/embedders/voyageai.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ def voyage_token_counter(model: str) -> Callable[[str], int] | None:
try:
tokenizer: Tokenizer = client.tokenizer(model)
return lambda text: len(tokenizer.encode(text).tokens)
except BaseException:
logger.warn(f"Tokenizer for model '{model}' not found")
except Exception as e:
logger.warn(f"Tokenizer for model '{model}' not found: {e}")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think generally speaking simply using exc_info=True is going to look nicer than printing the stringified version. It also shows the full stack and chain of exceptions if there was one.

return None


Expand Down
Loading