-
Notifications
You must be signed in to change notification settings - Fork 310
feat: split transaction queue item processing #820
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -424,8 +424,8 @@ begin | |
| create table %I.%I | ||
| ( %s | ||
| , queued_at pg_catalog.timestamptz not null default now() | ||
| , loading_retries pg_catalog.int4 not null default 0 | ||
| , loading_retry_after pg_catalog.timestamptz | ||
| , attempts pg_catalog.int4 not null default 0 | ||
| , retry_after pg_catalog.timestamptz | ||
| ) | ||
| $sql$ | ||
| , queue_schema, queue_table | ||
|
|
@@ -507,6 +507,7 @@ begin | |
| ( %s | ||
| , created_at pg_catalog.timestamptz not null default now() | ||
| , failure_step pg_catalog.text not null default '' | ||
| , attempts pg_catalog.int4 not null default 0 | ||
| ) | ||
| $sql$ | ||
| , queue_schema, queue_failed_table | ||
|
|
@@ -1266,4 +1267,83 @@ end | |
| $func$ | ||
| language plpgsql volatile security invoker | ||
| set search_path to pg_catalog, pg_temp | ||
| ; | ||
| ; | ||
|
|
||
| create or replace function ai._get_next_queue_batch( | ||
| queue_table pg_catalog.regclass, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm thinking that it might be more clear if instead of queue_table use vectorizer_id. You can get the queue table from the same query that fetches the source_pk. |
||
| batch_size pg_catalog.int4 | ||
| ) returns setof record AS $$ | ||
| declare | ||
| source_pk pg_catalog.jsonb; | ||
| lock_id_string pg_catalog.text; | ||
| query pg_catalog.text; | ||
| lock_count pg_catalog.int4 := 0; | ||
| row record; | ||
| begin | ||
| -- get the source_pk for this queue table | ||
| select v.source_pk | ||
| into source_pk | ||
| from ai.vectorizer v | ||
| where pg_catalog.to_regclass(pg_catalog.format('%I.%I', v.queue_schema, v.queue_table)) operator(pg_catalog.=) _get_next_queue_batch.queue_table; | ||
|
|
||
| -- construct the "lock id string" | ||
| -- this is a string of all pk column names and their values, e.g. for a | ||
| -- two-column pk consisting of 'time' and 'url' this will generate: | ||
| -- hashtext(format('time|%s|url|%s', time, url)) | ||
| select pg_catalog.format($fmt$pg_catalog.hashtext(pg_catalog.format('%s', %s))$fmt$, format_string, format_args) | ||
| into lock_id_string | ||
| from ( | ||
| select | ||
| pg_catalog.string_agg(pg_catalog.format('%s|%%s', attname), '|' order by attnum) as format_string | ||
| , pg_catalog.string_agg(attname, ', ' order by attnum) as format_args | ||
| from pg_catalog.jsonb_to_recordset(source_pk) as (attnum int, attname text) | ||
| ) as _; | ||
|
|
||
| -- TODO: for very small batch sizes (<10), an array _may_ be faster | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do have a small reservation about the potential catalog bloat that using a temp table might cause, however hopefully the frequency of the execution of this function is low enough to be tolerable. |
||
| drop table if exists seen_lock_ids; | ||
| create temporary table seen_lock_ids (lock_id bigint); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you probably won't ever call this function twice in the same transaction, but having a
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, thanks. Done. |
||
| create index on seen_lock_ids (lock_id); | ||
|
|
||
| -- construct query to get all | ||
| query := pg_catalog.format($sql$ | ||
| select | ||
| q.ctid as _ctid | ||
| , %s as _lock_id | ||
| , q.* | ||
| from %s as q | ||
| where (retry_after is null or retry_after <= now()) | ||
| and %s not in ( | ||
| -- exclude all locks that we already hold | ||
| select objid::int | ||
| from pg_locks | ||
| where locktype = 'advisory' | ||
| and pid = pg_catalog.pg_backend_pid() | ||
| and classid = %s | ||
| ) | ||
| $sql$, lock_id_string, _get_next_queue_batch.queue_table, lock_id_string, _get_next_queue_batch.queue_table::pg_catalog.oid); | ||
|
|
||
| for row in execute query | ||
| loop | ||
| if lock_count operator(pg_catalog.>=) batch_size then | ||
| exit; | ||
| end if; | ||
|
|
||
| if exists(select 1 from pg_temp.seen_lock_ids WHERE lock_id operator(pg_catalog.=) row._lock_id) then | ||
| continue; | ||
| end if; | ||
|
|
||
| insert into pg_temp.seen_lock_ids (lock_id) values (row._lock_id); | ||
|
|
||
| if pg_catalog.pg_try_advisory_lock(queue_table::pg_catalog.oid::int, row._lock_id) then | ||
| lock_count := lock_count operator(pg_catalog.+) 1; | ||
| return next row; | ||
| end if; | ||
| end loop; | ||
|
|
||
| drop table seen_lock_ids; | ||
|
|
||
| return; | ||
| end; | ||
| $$ language plpgsql | ||
| set search_path to pg_catalog, pg_temp | ||
| ; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| -- rename loading_retries and loading_retry_after for all existing queue tables | ||
| do language plpgsql $block$ | ||
| declare | ||
| _vectorizer record; | ||
| begin | ||
| for _vectorizer in select queue_schema, queue_table from ai.vectorizer | ||
| loop | ||
| execute format('alter table %I.%I rename column loading_retries to attempts', _vectorizer.queue_schema, _vectorizer.queue_table); | ||
| execute format('alter table %I.%I rename column loading_retry_after to retry_after', _vectorizer.queue_schema, _vectorizer.queue_table); | ||
| end loop; | ||
| for _vectorizer in select queue_schema, queue_failed_table from ai.vectorizer | ||
| loop | ||
| execute format('alter table %I.%I add column attempts pg_catalog.int4 not null default 0', _vectorizer.queue_schema, _vectorizer.queue_failed_table); | ||
| end loop; | ||
| end; | ||
| $block$; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| Table "ai._vectorizer_q_failed_1" | ||
| Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description | ||
| --------------+--------------------------+-----------+----------+----------+----------+-------------+--------------+------------- | ||
| title | text | | not null | | extended | | | | ||
| published | timestamp with time zone | | not null | | plain | | | | ||
| created_at | timestamp with time zone | | not null | now() | plain | | | | ||
| failure_step | text | | not null | ''::text | extended | | | | ||
| attempts | integer | | not null | 0 | plain | | | | ||
| Indexes: | ||
| "_vectorizer_q_failed_1_title_published_idx" btree (title, published) | ||
| Access method: heap |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| Table "ai._vectorizer_q_failed_1" | ||
| Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description | ||
| --------------+--------------------------+-----------+----------+----------+----------+-------------+--------------+------------- | ||
| title | text | | not null | | extended | | | | ||
| published | timestamp with time zone | | not null | | plain | | | | ||
| created_at | timestamp with time zone | | not null | now() | plain | | | | ||
| failure_step | text | | not null | ''::text | extended | | | | ||
| attempts | integer | | not null | 0 | plain | | | | ||
| Indexes: | ||
| "_vectorizer_q_failed_1_title_published_idx" btree (title, published) | ||
| Access method: heap |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,11 @@ | ||
| Table "ai._vectorizer_q_1" | ||
| Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description | ||
| ---------------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+------------- | ||
| title | text | | not null | | extended | | | | ||
| published | timestamp with time zone | | not null | | plain | | | | ||
| queued_at | timestamp with time zone | | not null | now() | plain | | | | ||
| loading_retries | integer | | not null | 0 | plain | | | | ||
| loading_retry_after | timestamp with time zone | | | | plain | | | | ||
| Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description | ||
| -------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+------------- | ||
| title | text | | not null | | extended | | | | ||
| published | timestamp with time zone | | not null | | plain | | | | ||
| queued_at | timestamp with time zone | | not null | now() | plain | | | | ||
| attempts | integer | | not null | 0 | plain | | | | ||
| retry_after | timestamp with time zone | | | | plain | | | | ||
| Indexes: | ||
| "_vectorizer_q_1_title_published_idx" btree (title, published) | ||
| Access method: heap |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,11 @@ | ||
| Table "ai._vectorizer_q_1" | ||
| Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description | ||
| ---------------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+------------- | ||
| title | text | | not null | | extended | | | | ||
| published | timestamp with time zone | | not null | | plain | | | | ||
| queued_at | timestamp with time zone | | not null | now() | plain | | | | ||
| loading_retries | integer | | not null | 0 | plain | | | | ||
| loading_retry_after | timestamp with time zone | | | | plain | | | | ||
| Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description | ||
| -------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+------------- | ||
| title | text | | not null | | extended | | | | ||
| published | timestamp with time zone | | not null | | plain | | | | ||
| queued_at | timestamp with time zone | | not null | now() | plain | | | | ||
| attempts | integer | | not null | 0 | plain | | | | ||
| retry_after | timestamp with time zone | | | | plain | | | | ||
| Indexes: | ||
| "_vectorizer_q_1_title_published_idx" btree (title, published) | ||
| Access method: heap |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,10 +1,21 @@ | ||
| from .create_vectorizer import CreateVectorizer | ||
| from .vectorizer import Executor, Vectorizer | ||
| from .vectorizer import ( | ||
| EmbeddingError, | ||
| Executor, | ||
| FormattingError, | ||
| LoadingError, | ||
| ParsingError, | ||
| Vectorizer, | ||
| ) | ||
| from .worker import Worker | ||
|
|
||
| __all__ = [ | ||
| "Vectorizer", | ||
| "Executor", | ||
| "CreateVectorizer", | ||
| "Worker", | ||
| "FormattingError", | ||
| "LoadingError", | ||
| "ParsingError", | ||
| "EmbeddingError", | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,8 +38,8 @@ def voyage_token_counter(model: str) -> Callable[[str], int] | None: | |
| try: | ||
| tokenizer: Tokenizer = client.tokenizer(model) | ||
| return lambda text: len(tokenizer.encode(text).tokens) | ||
| except BaseException: | ||
| logger.warn(f"Tokenizer for model '{model}' not found") | ||
| except Exception as e: | ||
| logger.warn(f"Tokenizer for model '{model}' not found: {e}") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think generally speaking simply using |
||
| return None | ||
|
|
||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this function be prepend with
_? We are calling it from the worker. I thought we named things with_when we deemed them private, in the sense that only public SQL functions reference them.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question. The function is not intended to be used by anyone other than our python code, as such it is internal/private.