feat(jobs/mcp): thread_id continuation with race-safe FIFO queue #1243
Merged
…b_id Fix fake Activity attributes in environment tests (task_result_id -> id/thread_id/status) to match the updated API surface from the thread continuation feature.
… outages

Closes a TOCTOU race in the thread-continuation FIFO and bounds the blast radius of broker outages so a single bad submission cannot strand QUEUED siblings or double-promote them.

- DB: partial unique constraint `activity_one_active_per_thread` enforces "at most one active (READY/RUNNING) API/MCP Activity per thread". QUEUED is intentionally outside the constraint; webhook trigger types are also excluded (they share deterministic thread_ids across events).
- Dispatcher: atomic CAS (`UPDATE filter(status=QUEUED) -> READY`) replaces the racy read-then-save; an in-call loop replaces signal recursion; a cap of 3 consecutive enqueue failures bails the loop so a broker outage leaves the rest QUEUED for the new `release_orphan_queued_threads` management command to recover.
- Services: post-create error paths (enqueue or task_result_id link) transition rows to FAILED with `finished_at` set and re-emit `activity_finished` so queued siblings advance.
- API/MCP: schema-level UUID validation for `thread_id` (proper 422 on malformed input); MCP `submit_job` now rejects unauthenticated calls early; MCP batch-poll timeout reports each job's real status (QUEUED/READY/RUNNING) instead of a `PENDING` placeholder.
- UI: dedicated "Waiting in queue" hero and amber QUEUED status badge.
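The interaction between the partial unique constraint and the CAS promotion can be sketched with the standard-library `sqlite3` module. This is only an illustration under stated assumptions, not the PR's Django code: the table layout, the `promote_if_queued` helper, and the trigger type names (`API`, `MCP`, `WEBHOOK`) are hypothetical, and only the index name and the status names come from the commit message.

```python
import sqlite3

# Hypothetical, simplified schema; the real Activity model has more columns.
SCHEMA = """
CREATE TABLE activity (
    id INTEGER PRIMARY KEY,
    thread_id TEXT NOT NULL,
    trigger_type TEXT NOT NULL,
    status TEXT NOT NULL
);
-- At most one READY/RUNNING row per thread for API/MCP triggers.
-- QUEUED rows and webhook rows fall outside the index.
CREATE UNIQUE INDEX activity_one_active_per_thread
    ON activity (thread_id)
    WHERE status IN ('READY', 'RUNNING') AND trigger_type IN ('API', 'MCP');
"""


def promote_if_queued(conn, activity_id):
    """Compare-and-set QUEUED -> READY; True only for the caller that wins."""
    try:
        with conn:  # commits on success, rolls back on error
            cur = conn.execute(
                "UPDATE activity SET status = 'READY' "
                "WHERE id = ? AND status = 'QUEUED'",
                (activity_id,),
            )
            return cur.rowcount == 1
    except sqlite3.IntegrityError:
        # A sibling on the same thread is still active; the row stays QUEUED.
        return False


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executemany(
    "INSERT INTO activity (id, thread_id, trigger_type, status) VALUES (?, ?, ?, ?)",
    [(1, "t1", "API", "RUNNING"), (2, "t1", "API", "QUEUED")],
)
conn.commit()

blocked = promote_if_queued(conn, 2)  # row 1 is still RUNNING
# Move row 1 to a terminal status (FAILED here; any terminal status would do).
conn.execute("UPDATE activity SET status = 'FAILED' WHERE id = 1")
conn.commit()
first = promote_if_queued(conn, 2)    # wins the CAS
second = promote_if_queued(conn, 2)   # row is no longer QUEUED
```

Because the status check lives in the `WHERE` clause of the `UPDATE`, two concurrent dispatchers cannot both observe `QUEUED` and both promote the row; the loser simply sees zero affected rows.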
Replace the hand-rolled regex pattern on the REST schema and the manual ``uuid_mod.UUID(thread_id)`` parse inside the MCP ``submit_job`` tool with a Pydantic ``UUID`` type. Pydantic validates UUIDs natively, so both entrypoints now share one boundary check expressed in the type system instead of two divergent ad-hoc styles.

The REST path is fully covered: ninja always runs Pydantic, and the existing 422-at-schema tests still pin the contract. The MCP path keeps a defensive ``str(uuid_mod.UUID(str(thread_id)))`` normalisation because FastMCP only validates at the protocol layer; direct callers (tests, in-process use) still pass raw strings.

Also log on the malformed-thread_id branch so operators can spot misuse, and add a test covering the ``TypeError`` arm.
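A minimal sketch of the defensive normalisation on the MCP path, using only the standard library. The helper name `normalise_thread_id`, the log message, and the choice to re-raise as `ValueError` are assumptions for illustration; the commit message only confirms the ``str(uuid_mod.UUID(str(thread_id)))`` expression, the logging, and the ``ValueError``/``TypeError`` handling.

```python
import logging
import uuid as uuid_mod

logger = logging.getLogger(__name__)


def normalise_thread_id(thread_id):
    """Return the canonical lowercase, hyphenated UUID string.

    Raises ValueError when the input cannot be parsed as a UUID.
    """
    try:
        # str(...) lets callers pass either a raw string or a UUID instance.
        return str(uuid_mod.UUID(str(thread_id)))
    except (ValueError, TypeError) as exc:
        # Log so operators can spot misuse, then reject the submission.
        logger.warning("Rejecting malformed thread_id: %r", thread_id)
        raise ValueError("thread_id must be a valid UUID") from exc


# Uppercase input is accepted and normalised to the canonical form.
canonical = normalise_thread_id("A3BB189E-8BF9-3888-9912-ACE4E6543002")
```

Normalising to one canonical string means that differently formatted spellings of the same UUID resolve to the same thread.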
Summary
Adds `thread_id` continuation support to both the Jobs API and the MCP `submit_job` tool, with a race-safe FIFO queue for concurrent submissions on the same thread.

- `submit_job` accepts an optional `thread_id`; subsequent submissions on the same thread resume the same chat thread. Job IDs now match `Activity.id` (the durable identifier that survives `DBTaskResult` pruning).
- When the thread already has an active (`READY`/`RUNNING`) Activity, the new submission lands in a new `QUEUED` status. Queued siblings are dispatched in `created_at` order when the active sibling reaches a terminal state.
- `activity_one_active_per_thread` blocks double-active rows at the DB layer; the dispatcher uses an atomic CAS and an in-call loop (replacing signal recursion). A 3-failure cap prevents broker outages from mass-failing the QUEUED backlog.
- The `release_orphan_queued_threads` management command recovers rare TOCTOU losses.

Test plan

- `make test` passes (unit + the new constraint, dispatcher race, services, and MCP/Jobs API tests)
- Submit two jobs with the same `thread_id` from the API/MCP: the second is `QUEUED` and dispatches on the first's terminal transition
- Run `release_orphan_queued_threads` against a synthetic orphan and confirm it releases
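The dispatch behaviour these checks exercise can be approximated in memory as follows. This is a sketch under assumptions, not the PR's dispatcher: the `Job` dataclass, `dispatch_next`, and the use of `ConnectionError` to stand in for a broker failure are hypothetical; only the `created_at` ordering, the in-call loop, the FAILED transition on enqueue errors, and the cap of 3 consecutive failures come from the description.

```python
from dataclasses import dataclass

MAX_CONSECUTIVE_ENQUEUE_FAILURES = 3  # cap stated in the PR description


@dataclass
class Job:
    id: int
    created_at: int
    status: str = "QUEUED"


def dispatch_next(jobs, enqueue):
    """Promote the oldest QUEUED job whose enqueue succeeds.

    Loops in-call instead of recursing. Returns the promoted job, or None
    if nothing was promoted (empty queue or too many enqueue failures).
    """
    failures = 0
    for job in sorted(jobs, key=lambda j: j.created_at):
        if job.status != "QUEUED":
            continue
        job.status = "READY"
        try:
            enqueue(job)
            return job
        except ConnectionError:
            # Enqueue failed: mark this job FAILED and try the next sibling.
            job.status = "FAILED"
            failures += 1
            if failures >= MAX_CONSECUTIVE_ENQUEUE_FAILURES:
                return None  # broker looks down; leave the rest QUEUED
    return None


def broken_enqueue(job):
    raise ConnectionError("broker down")


jobs = [Job(id=i, created_at=i) for i in range(5)]
during_outage = dispatch_next(jobs, broken_enqueue)
statuses_after_outage = [j.status for j in jobs]

sent = []
promoted = dispatch_next(jobs, sent.append)  # broker is back
```

With the cap in place, an outage costs at most three jobs per dispatch call; the remaining `QUEUED` rows stay recoverable once the broker returns.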