Skip to content

Commit 4e41c46

Browse files
authored
fix(migrations): make spans.task_id rollout safe for large tables (#223)
1 parent abd514b commit 4e41c46

6 files changed

Lines changed: 454 additions & 30 deletions

File tree

agentex/database/migrations/alembic/env.py

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,35 @@
1212
from alembic import context
1313
from sqlalchemy import engine_from_config, pool
1414

15+
# Default Postgres timeouts applied to every migration. They keep a stuck
16+
# migration from queueing behind active writes and holding locks indefinitely.
17+
#
18+
# - lock_timeout: how long a statement waits for a lock before aborting. 3s
19+
# means a migration that cannot acquire its lock quickly gives up instead of
20+
# blocking writers behind it.
21+
# - statement_timeout: maximum runtime for any single statement. 30s catches
22+
# runaway DDL/UPDATEs; long index builds must use CREATE INDEX CONCURRENTLY
23+
# in an autocommit_block. NOTE: statement_timeout is session-level, so it still applies inside that block; a build cancelled at 30s can leave an INVALID index behind.
24+
# - idle_in_transaction_session_timeout: kills a transaction that has gone
25+
# idle while still holding locks (e.g. a stalled AccessExclusiveLock).
26+
#
27+
# These are session-level so they persist across each per-migration
28+
# transaction and across autocommit_block boundaries on the same connection.
29+
# Migration authors must NOT override them with `SET lock_timeout` or
30+
# `SET statement_timeout` inside a migration file — the migration linter
31+
# (scripts/ci_tools/migration_lint.py) flags those, with the
32+
# `migration-unsafe-ack` PR label as the documented escape hatch for
33+
# genuinely-long migrations that need a maintenance window.
34+
DEFAULT_MIGRATION_TIMEOUTS: dict[str, str] = {
35+
"lock_timeout": "3s",
36+
"statement_timeout": "30s",
37+
"idle_in_transaction_session_timeout": "10s",
38+
}
39+
40+
41+
def _format_set_statements(timeouts: dict[str, str]) -> list[str]:
42+
return [f"SET {key} = '{value}'" for key, value in timeouts.items()]
43+
1544
# Add explicit error handling to catch import errors
1645
try:
1746
print("Starting migration - importing modules")
@@ -83,6 +112,8 @@ def run_migrations_offline() -> None:
83112
)
84113

85114
with context.begin_transaction():
115+
for stmt in _format_set_statements(DEFAULT_MIGRATION_TIMEOUTS):
116+
context.execute(stmt)
86117
context.run_migrations()
87118
except Exception as e:
88119
print("ERROR IN OFFLINE MIGRATIONS:", str(e))
@@ -106,7 +137,30 @@ def run_migrations_online() -> None:
106137
)
107138

108139
with connectable.connect() as connection:
109-
context.configure(connection=connection, target_metadata=target_metadata)
140+
# Apply default migration timeouts at the session level so they
141+
# persist across per-migration transactions and any autocommit_block
142+
# boundaries opened by migrations (e.g. for CREATE INDEX CONCURRENTLY).
143+
#
144+
# exec_driver_sql autobegins a SQLAlchemy transaction. We commit it
145+
# before configure() so alembic doesn't latch onto it as an
146+
# "external" transaction — that mode disables transaction_per_migration
147+
# and breaks autocommit_block (which asserts self._transaction is not
148+
# None). Postgres SET is session-level, so the timeouts persist past
149+
# the commit.
150+
for stmt in _format_set_statements(DEFAULT_MIGRATION_TIMEOUTS):
151+
connection.exec_driver_sql(stmt)
152+
connection.commit()
153+
154+
# transaction_per_migration=True wraps each migration in its own
155+
# transaction (instead of a single outer transaction for all
156+
# migrations). This lets individual migrations opt into
157+
# autocommit_block() for operations that cannot run inside a
158+
# transaction, such as CREATE INDEX CONCURRENTLY.
159+
context.configure(
160+
connection=connection,
161+
target_metadata=target_metadata,
162+
transaction_per_migration=True,
163+
)
110164

111165
with context.begin_transaction():
112166
context.run_migrations()

agentex/database/migrations/alembic/versions/2026_04_14_1126_add_task_id_to_spans_57c5ed4f59ae.py

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,26 @@
44
Revises: 4a9b7787ccd7
55
Create Date: 2026-04-14 11:26:45.193515
66
7+
The original version of this migration also ran a large UPDATE backfill,
8+
added a foreign key (which scanned the full table under AccessExclusiveLock),
9+
and created an index non-concurrently. On a sufficiently large spans table
10+
this can exhaust the connection pool while concurrent span writes pile up
11+
behind the lock.
12+
13+
The backfill, FK and index are now handled out-of-band (see
14+
docs/runbooks/spans-task-id-backfill.md) and a follow-up tail migration
15+
finalizes the FK + index with non-blocking operations. This revision is
16+
reduced to the only safe in-band step: adding the nullable column. Adding a
17+
nullable column with no default is a metadata-only operation in PostgreSQL
18+
>= 11, so it is fast and does not block writes.
19+
20+
The IF NOT EXISTS guard makes the migration safe to re-run on environments
21+
where the original (heavier) version of this migration already completed
22+
successfully.
723
"""
824
from typing import Sequence, Union
925

1026
from alembic import op
11-
import sqlalchemy as sa
1227

1328

1429
# revision identifiers, used by Alembic.
@@ -19,34 +34,8 @@
1934

2035

2136
def upgrade() -> None:
22-
# Add nullable task_id column first (no FK yet, so backfill can run freely)
23-
op.add_column('spans', sa.Column('task_id', sa.String(), nullable=True))
24-
25-
# Backfill task_id from trace_id where trace_id is a valid task ID.
26-
# Uses a JOIN instead of a subquery for efficient matching.
27-
op.execute("""
28-
UPDATE spans
29-
SET task_id = spans.trace_id
30-
FROM tasks
31-
WHERE spans.trace_id = tasks.id
32-
AND spans.task_id IS NULL
33-
""")
34-
35-
# Add FK constraint after backfill (NULL values are allowed by FK)
36-
op.create_foreign_key(
37-
'fk_spans_task_id_tasks',
38-
'spans',
39-
'tasks',
40-
['task_id'],
41-
['id'],
42-
ondelete='SET NULL',
43-
)
44-
45-
# Add index for querying spans by task_id
46-
op.create_index('ix_spans_task_id', 'spans', ['task_id'])
37+
op.execute("ALTER TABLE spans ADD COLUMN IF NOT EXISTS task_id VARCHAR")
4738

4839

4940
def downgrade() -> None:
50-
op.drop_index('ix_spans_task_id', table_name='spans')
51-
op.drop_constraint('fk_spans_task_id_tasks', 'spans', type_='foreignkey')
52-
op.drop_column('spans', 'task_id')
41+
op.execute("ALTER TABLE spans DROP COLUMN IF EXISTS task_id")
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
"""finalize_spans_task_id
2+
3+
Revision ID: a9959ebcbe98
4+
Revises: e9c4ff9e6542
5+
Create Date: 2026-05-06 12:00:00.000000
6+
7+
Finalizes the spans.task_id column added in 57c5ed4f59ae by attaching the
8+
foreign key and creating the lookup index using non-blocking operations.
9+
10+
This migration is intentionally split out from 57c5ed4f59ae so that:
11+
12+
* The FK is added with NOT VALID, which acquires only a brief lock and
13+
skips the full-table scan that the original migration triggered. The FK
14+
is still enforced on all subsequent inserts and updates (and ON DELETE
15+
SET NULL still applies to existing rows).
16+
* The index is built CONCURRENTLY so writes are not blocked.
17+
* Both operations live in autocommit_block() so they run outside the
18+
surrounding migration transaction (CONCURRENTLY cannot run inside a
19+
transaction).
20+
21+
The migration is idempotent: on environments where the original version of
22+
57c5ed4f59ae completed successfully (the FK and index already exist), each
23+
operation is a no-op via IF NOT EXISTS / pg_constraint catalog checks.
24+
25+
The historical backfill of task_id from trace_id is intentionally not run
26+
here — it is a separate, operator-driven step (see
27+
docs/runbooks/spans-task-id-backfill.md). The application reads tolerate
28+
NULL task_id by falling back to trace_id at query time.
29+
"""
30+
from typing import Sequence, Union
31+
32+
from alembic import op
33+
34+
35+
# revision identifiers, used by Alembic.
36+
revision: str = 'a9959ebcbe98'
37+
down_revision: Union[str, None] = 'e9c4ff9e6542'
38+
branch_labels: Union[str, Sequence[str], None] = None
39+
depends_on: Union[str, Sequence[str], None] = None
40+
41+
42+
def upgrade() -> None:
    """Attach the spans.task_id foreign key and build its lookup index.

    Both statements run inside ``autocommit_block()`` because
    ``CREATE INDEX CONCURRENTLY`` cannot run inside a transaction.

    The foreign key is added ``NOT VALID`` and only when a constraint named
    ``fk_spans_task_id_tasks`` is not already present on ``spans``; the index
    is created with ``IF NOT EXISTS``. Together these make the migration a
    no-op on environments where both objects already exist.
    """
    with op.get_context().autocommit_block():
        # Constraint names are only unique per table, so the catalog lookup is
        # scoped to spans via conrelid. Matching on conname alone could find a
        # same-named constraint on another table and silently skip the FK.
        op.execute(
            """
            DO $$
            BEGIN
                IF NOT EXISTS (
                    SELECT 1
                    FROM pg_constraint
                    WHERE conname = 'fk_spans_task_id_tasks'
                      AND conrelid = 'spans'::regclass
                ) THEN
                    ALTER TABLE spans
                        ADD CONSTRAINT fk_spans_task_id_tasks
                        FOREIGN KEY (task_id) REFERENCES tasks(id)
                        ON DELETE SET NULL
                        NOT VALID;
                END IF;
            END$$;
            """
        )
        # NOTE(review): IF NOT EXISTS only checks the index name. If an
        # earlier concurrent build was interrupted (for example by a
        # session-level statement_timeout), Postgres can leave an INVALID
        # index with this name and this statement will then skip it. Confirm
        # pg_index.indisvalid after rollout and drop/rebuild if needed.
        op.execute(
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_spans_task_id "
            "ON spans (task_id)"
        )
66+
67+
68+
def downgrade() -> None:
    """Remove the spans.task_id index and foreign key, in that order.

    The statements run inside ``autocommit_block()`` because
    ``DROP INDEX CONCURRENTLY`` cannot run inside a transaction. Both use
    ``IF EXISTS`` so the downgrade tolerates objects that are already gone.
    """
    teardown_statements = (
        "DROP INDEX CONCURRENTLY IF EXISTS ix_spans_task_id",
        "ALTER TABLE spans DROP CONSTRAINT IF EXISTS fk_spans_task_id_tasks",
    )
    with op.get_context().autocommit_block():
        for statement in teardown_statements:
            op.execute(statement)

agentex/src/domain/repositories/span_repository.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from typing import Annotated, Any
22

33
from fastapi import Depends
4+
from sqlalchemy import or_, select
45
from src.adapters.crud_store.adapter_postgres import PostgresCRUDRepository
56
from src.adapters.orm import SpanORM
67
from src.config.dependencies import (
@@ -36,6 +37,35 @@ async def list(
3637
) -> list[SpanEntity]:
3738
# Default to start_time if no order_by specified
3839
effective_order_by = order_by or "start_time"
40+
41+
# Filtering by task_id matches both the new task_id column and historical
42+
# rows where the value was stored in trace_id. The task_id column was
43+
# added late in the table's life and the prod backfill is run out-of-band
44+
# rather than via migration (see docs/runbooks/spans-task-id-backfill.md),
45+
# so old rows can have task_id NULL even when they belong to a task. For
46+
# task-scoped spans, trace_id holds the task id, so we OR the two columns
47+
# at read time. Both columns are indexed.
48+
#
49+
# The OR fallback is skipped when task_id is None — applying it would
50+
# expand to (task_id IS NULL OR trace_id IS NULL), which on a large
51+
# spans table where virtually all historical rows have task_id NULL
52+
# would return an enormous, unintended result set. A None task_id
53+
# filter falls through to the parent's normal IS NULL handling.
54+
if filters and filters.get("task_id") is not None:
55+
remaining_filters = {k: v for k, v in filters.items() if k != "task_id"}
56+
task_id_value = filters["task_id"]
57+
query = select(self.orm).where(
58+
or_(SpanORM.task_id == task_id_value, SpanORM.trace_id == task_id_value)
59+
)
60+
return await super().list(
61+
filters=remaining_filters or None,
62+
query=query,
63+
order_by=effective_order_by,
64+
order_direction=order_direction,
65+
limit=limit,
66+
page_number=page_number,
67+
)
68+
3969
return await super().list(
4070
filters=filters,
4171
order_by=effective_order_by,
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
"""Unit tests for the migration runner timeout defaults.
2+
3+
Sanity-checks the constants and the SQL formatting helper. The actual
4+
wiring into Alembic's ``run_migrations_online`` / ``run_migrations_offline``
5+
is exercised end-to-end whenever a migration runs locally or in CI, so we
6+
keep this layer to a focused unit test of the values we promise to ship.
7+
"""
8+
9+
from __future__ import annotations
10+
11+
from pathlib import Path
12+
13+
import pytest
14+
15+
_ENV_PATH = (
16+
Path(__file__).resolve().parents[3]
17+
/ "database"
18+
/ "migrations"
19+
/ "alembic"
20+
/ "env.py"
21+
)
22+
23+
24+
def _read_env_text() -> str:
    """Return the source text of the Alembic ``env.py`` under test.

    The file is decoded explicitly as UTF-8 rather than with the platform's
    locale encoding, because env.py contains non-ASCII characters in its
    comments and the tests should read it the same way on every machine.
    """
    return _ENV_PATH.read_text(encoding="utf-8")
26+
27+
28+
def test_default_timeouts_present_in_env() -> None:
    """Check env.py still declares all three timeouts with the agreed values.

    A refactor that drops or changes any of them fails this test.
    """
    env_source = _read_env_text()
    expected_fragments = (
        "DEFAULT_MIGRATION_TIMEOUTS",
        '"lock_timeout": "3s"',
        '"statement_timeout": "30s"',
        '"idle_in_transaction_session_timeout": "10s"',
    )
    for fragment in expected_fragments:
        assert fragment in env_source
36+
37+
38+
def test_timeouts_applied_in_online_and_offline_modes() -> None:
    """Check env.py issues the SET statements in both runner modes."""
    env_source = _read_env_text()

    # Online mode sends each statement over the live connection.
    online_call = "connection.exec_driver_sql(stmt)"
    # Offline mode emits each statement into the generated SQL script.
    offline_call = "context.execute(stmt)"

    assert online_call in env_source
    assert offline_call in env_source
46+
47+
48+
def test_format_set_statements_helper_shape() -> None:
    """Check the SQL produced by env.py's real ``_format_set_statements``.

    Importing env.py would pull in the application stack, so the helper's
    source is extracted from the file with ``ast`` and executed on its own.
    This exercises the function that actually ships, not a copy of it kept
    in this test.
    """
    import ast

    env_source = _read_env_text()
    helper_node = next(
        (
            node
            for node in ast.parse(env_source).body
            if isinstance(node, ast.FunctionDef)
            and node.name == "_format_set_statements"
        ),
        None,
    )
    assert helper_node is not None, "_format_set_statements not found in env.py"

    helper_src = ast.get_source_segment(env_source, helper_node)
    assert helper_src is not None

    # The helper only references builtins, so an empty namespace is enough.
    namespace: dict[str, object] = {}
    exec(helper_src, namespace)
    formatter = namespace["_format_set_statements"]

    out = formatter(
        {
            "lock_timeout": "3s",
            "statement_timeout": "30s",
        }
    )
    assert out == [
        "SET lock_timeout = '3s'",
        "SET statement_timeout = '30s'",
    ]
69+
70+
71+
def test_runner_documents_escape_hatch() -> None:
    """Check env.py mentions the ``migration-unsafe-ack`` escape hatch.

    The label is referenced in env.py's comments as the documented way to
    ship a migration that needs a maintenance window, so readers of the
    runner should be able to find it there.
    """
    escape_hatch_label = "migration-unsafe-ack"
    assert escape_hatch_label in _read_env_text()
77+
78+
79+
@pytest.mark.parametrize(
    "needle",
    [
        "lock_timeout",
        "statement_timeout",
        "idle_in_transaction_session_timeout",
    ],
)
def test_each_timeout_setting_referenced(needle: str) -> None:
    """Check each timeout setting name appears somewhere in env.py."""
    env_source = _read_env_text()
    assert needle in env_source

0 commit comments

Comments
 (0)