diff --git a/invokeai/app/services/events/events_common.py b/invokeai/app/services/events/events_common.py index 998fe4f5309..0c530f9a2f7 100644 --- a/invokeai/app/services/events/events_common.py +++ b/invokeai/app/services/events/events_common.py @@ -232,6 +232,10 @@ class QueueItemStatusChangedEvent(QueueItemEventBase): __event_name__ = "queue_item_status_changed" status: QUEUE_ITEM_STATUS = Field(description="The new status of the queue item") + status_sequence: int | None = Field( + default=None, + description="A monotonically increasing version for this queue item's visible status lifecycle", + ) error_type: Optional[str] = Field(default=None, description="The error type, if any") error_message: Optional[str] = Field(default=None, description="The error message, if any") error_traceback: Optional[str] = Field(default=None, description="The error traceback, if any") @@ -256,6 +260,7 @@ def build( user_id=queue_item.user_id, session_id=queue_item.session_id, status=queue_item.status, + status_sequence=queue_item.status_sequence, error_type=queue_item.error_type, error_message=queue_item.error_message, error_traceback=queue_item.error_traceback, diff --git a/invokeai/app/services/session_queue/session_queue_common.py b/invokeai/app/services/session_queue/session_queue_common.py index 09820fe6217..d87221fbbae 100644 --- a/invokeai/app/services/session_queue/session_queue_common.py +++ b/invokeai/app/services/session_queue/session_queue_common.py @@ -219,6 +219,11 @@ class SessionQueueItem(BaseModel): item_id: int = Field(description="The identifier of the session queue item") status: QUEUE_ITEM_STATUS = Field(default="pending", description="The status of this queue item") + status_sequence: int | None = Field( + default=None, + # Fallback for rows serialized before migration_28 added the DB-level default of 0. + description="A monotonically increasing version for this queue item's visible status lifecycle", + ) priority: int = Field(default=0, description="The priority of this queue item") batch_id: str = Field(description="The ID of the batch associated with this queue item") origin: str | None = Field( diff --git a/invokeai/app/services/session_queue/session_queue_sqlite.py b/invokeai/app/services/session_queue/session_queue_sqlite.py index 172dc08d559..95fb16fcbed 100644 --- a/invokeai/app/services/session_queue/session_queue_sqlite.py +++ b/invokeai/app/services/session_queue/session_queue_sqlite.py @@ -72,7 +72,8 @@ def _set_in_progress_to_canceled(self) -> None: cursor.execute( """--sql UPDATE session_queue - SET status = 'canceled' + SET status = 'canceled', + status_sequence = COALESCE(status_sequence, 0) + 1 WHERE status = 'in_progress'; """ ) @@ -307,7 +308,7 @@ def _set_queue_item_status( cursor.execute( """--sql UPDATE session_queue - SET status = ?, error_type = ?, error_message = ?, error_traceback = ? + SET status = ?, status_sequence = COALESCE(status_sequence, 0) + 1, error_type = ?, error_message = ?, error_traceback = ? WHERE item_id = ? """, (status, error_type, error_message, error_traceback, item_id), @@ -489,7 +490,8 @@ def cancel_by_batch_ids( cursor.execute( f"""--sql UPDATE session_queue - SET status = 'canceled' + SET status = 'canceled', + status_sequence = COALESCE(status_sequence, 0) + 1 {where}; """, tuple(params), @@ -537,7 +539,8 @@ def cancel_by_destination( cursor.execute( f"""--sql UPDATE session_queue - SET status = 'canceled' + SET status = 'canceled', + status_sequence = COALESCE(status_sequence, 0) + 1 {where}; """, tuple(params), @@ -649,7 +652,8 @@ def cancel_by_queue_id(self, queue_id: str) -> CancelByQueueIDResult: cursor.execute( f"""--sql UPDATE session_queue - SET status = 'canceled' + SET status = 'canceled', + status_sequence = COALESCE(status_sequence, 0) + 1 {where}; """, tuple(params), @@ -685,7 +689,8 @@ def cancel_all_except_current(self, queue_id: str, user_id: Optional[str] = None cursor.execute( f"""--sql UPDATE session_queue - SET status = 'canceled' + SET status = 'canceled', + status_sequence = COALESCE(status_sequence, 0) + 1 {where}; """, tuple(params), diff --git a/invokeai/app/services/shared/sqlite/sqlite_util.py b/invokeai/app/services/shared/sqlite/sqlite_util.py index fb8ca9fca38..19e3b897202 100644 --- a/invokeai/app/services/shared/sqlite/sqlite_util.py +++ b/invokeai/app/services/shared/sqlite/sqlite_util.py @@ -32,6 +32,7 @@ from invokeai.app.services.shared.sqlite_migrator.migrations.migration_27 import build_migration_27 from invokeai.app.services.shared.sqlite_migrator.migrations.migration_28 import build_migration_28 from invokeai.app.services.shared.sqlite_migrator.migrations.migration_29 import build_migration_29 +from invokeai.app.services.shared.sqlite_migrator.migrations.migration_30 import build_migration_30 from invokeai.app.services.shared.sqlite_migrator.sqlite_migrator_impl import SqliteMigrator @@ -81,6 +82,7 @@ def init_db(config: InvokeAIAppConfig, logger: Logger, image_files: ImageFileSto migrator.register_migration(build_migration_27()) migrator.register_migration(build_migration_28()) migrator.register_migration(build_migration_29()) + migrator.register_migration(build_migration_30()) migrator.run_migrations() return db diff --git a/invokeai/app/services/shared/sqlite_migrator/migrations/migration_30.py b/invokeai/app/services/shared/sqlite_migrator/migrations/migration_30.py new file mode 100644 index 00000000000..d60270bfa1c --- /dev/null +++ b/invokeai/app/services/shared/sqlite_migrator/migrations/migration_30.py @@ -0,0 +1,33 @@ +"""Migration 30: Add per-item queue status sequencing. + +This migration adds a `status_sequence` column to `session_queue` so queue item +status updates can be ordered across asynchronous event and snapshot channels. +""" + +import sqlite3 + +from invokeai.app.services.shared.sqlite_migrator.sqlite_migrator_common import Migration + + +class Migration30Callback: + """Add a per-queue-item status sequence for cross-channel ordering.""" + + def __call__(self, cursor: sqlite3.Cursor) -> None: + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='session_queue';") + if cursor.fetchone() is None: + return + + cursor.execute("PRAGMA table_info(session_queue);") + columns = [row[1] for row in cursor.fetchall()] + + if "status_sequence" not in columns: + cursor.execute("ALTER TABLE session_queue ADD COLUMN status_sequence INTEGER DEFAULT 0;") + cursor.execute("UPDATE session_queue SET status_sequence = 0 WHERE status_sequence IS NULL;") + + +def build_migration_30() -> Migration: + return Migration( + from_version=29, + to_version=30, + callback=Migration30Callback(), + ) diff --git a/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.test.ts b/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.test.ts index 1f9687e35aa..3e5a1272e09 100644 --- a/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.test.ts +++ b/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.test.ts @@ -1037,6 +1037,144 @@ describe('StagingAreaApi', () => { expect(progressData[1]?.imageDTOs[1]).toBe(imageDTO2); }); + it('should ignore a stale pending snapshot after an item has already completed', async () => { + const imageDTO = createMockImageDTO({ image_name: 'output1.png' }); + mockApp._setImageDTO('output1.png', imageDTO); + + const completedItems = [ + createMockQueueItem({ + item_id: 1, + status: 'completed', + session: { + id: sessionId, + source_prepared_mapping: { + 'canvas_output:abc': ['prepared-1'], + }, + results: { + 'prepared-1': { image: { image_name: 'output1.png' } }, + }, + }, + }), + ]; + + const stalePendingItems = [ + createMockQueueItem({ + item_id: 1, + status: 'pending', + session: { + id: sessionId, + source_prepared_mapping: {}, + results: {}, + }, + }), + ]; + + await api.onItemsChangedEvent(completedItems); + await api.onItemsChangedEvent(stalePendingItems); + + expect(api.$items.get()).toHaveLength(1); + expect(api.$items.get()[0]?.status).toBe('completed'); + expect(api.$isPending.get()).toBe(false); + expect(api.$progressData.get()[1]?.imageDTOs).toEqual([imageDTO]); + }); + + it('should ignore a stale pending snapshot after a completed status event arrives first', async () => { + api.onQueueItemStatusChangedEvent( + createMockQueueItemStatusChangedEvent({ + item_id: 1, + destination: sessionId, + status: 'completed', + }) + ); + + await api.onItemsChangedEvent([ + createMockQueueItem({ + item_id: 1, + status: 'pending', + session: { + id: sessionId, + source_prepared_mapping: {}, + results: {}, + }, + }), + ]); + + expect(api.$items.get()).toEqual([]); + expect(api.$isPending.get()).toBe(false); + }); + + it('should ignore a stale in_progress snapshot after a completed status event arrives first', async () => { + api.onQueueItemStatusChangedEvent( + createMockQueueItemStatusChangedEvent({ + item_id: 1, + destination: sessionId, + status: 'completed', + }) + ); + + await api.onItemsChangedEvent([ + createMockQueueItem({ + item_id: 1, + status: 'in_progress', + session: { + id: sessionId, + source_prepared_mapping: {}, + results: {}, + }, + }), + ]); + + expect(api.$items.get()).toEqual([]); + expect(api.$isPending.get()).toBe(false); + }); + + it('should prefer the higher status_sequence when terminal snapshots disagree', async () => { + const completedItem = createMockQueueItem({ + item_id: 1, + status: 'completed', + status_sequence: 2, + }); + + const failedItem = createMockQueueItem({ + item_id: 1, + status: 'failed', + status_sequence: 3, + }); + + await api.onItemsChangedEvent([completedItem]); + await api.onItemsChangedEvent([failedItem]); + + expect(api.$items.get()).toHaveLength(1); + expect(api.$items.get()[0]?.status).toBe('failed'); + }); + + it('should ignore a lower status_sequence on stale in_progress snapshots', async () => { + const completedEvent = createMockQueueItemStatusChangedEvent({ + item_id: 1, + destination: sessionId, + status: 'completed', + status_sequence: 3, + }); + + api.onQueueItemStatusChangedEvent(completedEvent); + + const staleInProgressItem = createMockQueueItem({ + item_id: 1, + status: 'in_progress', + status_sequence: 2, + session: { + id: sessionId, + source_prepared_mapping: {}, + results: {}, + }, + }); + + await api.onItemsChangedEvent([staleInProgressItem]); + + expect(api.$items.get()).toEqual([]); + expect(api.$isPending.get()).toBe(false); + }); + it('should load all images from multiple canvas_output nodes', async () => { const imageDTO1 = createMockImageDTO({ image_name: 'output1.png' }); const imageDTO2 = createMockImageDTO({ image_name: 'output2.png' }); diff --git a/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.ts b/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.ts index 834a3c475c4..6c16a8fdf22 100644 --- a/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.ts +++ b/invokeai/frontend/web/src/features/controlLayers/components/StagingArea/state.ts @@ -64,6 +64,25 @@ export const getInitialProgressData = (itemId: number): ProgressData => ({ }); type ProgressDataMap = Record; +const TERMINAL_QUEUE_ITEM_STATUS_RANK = 2; + +const getQueueItemStatusRank = (status: S['SessionQueueItem']['status']): number => { + switch (status) { + case 'pending': + return 0; + case 'in_progress': + return 1; + case 'completed': + case 'failed': + case 'canceled': + return TERMINAL_QUEUE_ITEM_STATUS_RANK; + } +}; + +const getStatusSequence = (item: { status_sequence?: number | null }): number | undefined => { + return item.status_sequence ?? undefined; +}; + /** * API for managing the Canvas Staging Area - a view of the image generation queue. * Provides reactive state management for pending, in-progress, and completed images. @@ -82,6 +101,72 @@ export class StagingAreaApi { /** Generation counter to prevent stale async writes in onItemsChangedEvent */ _itemsEventGeneration = 0; + /** + * Highest lifecycle status observed for each queue item. + * Used to ignore stale query snapshots that regress terminal items back to pending/in_progress. + */ + _seenItemStatusRanks = new Map(); + _seenItemStatusSequences = new Map(); + + _recordSeenItemOrdering = ( + itemId: number, + status: S['SessionQueueItem']['status'], + statusSequence?: number + ): void => { + if (statusSequence !== undefined) { + const previousSequence = this._seenItemStatusSequences.get(itemId) ?? -1; + if (statusSequence > previousSequence) { + this._seenItemStatusSequences.set(itemId, statusSequence); + } + } + + const nextRank = getQueueItemStatusRank(status); + const previousRank = this._seenItemStatusRanks.get(itemId) ?? -1; + if (nextRank > previousRank) { + this._seenItemStatusRanks.set(itemId, nextRank); + } + }; + + _shouldAcceptQueueItem = (item: S['SessionQueueItem']): boolean => { + const statusSequence = getStatusSequence(item); + const previousSequence = this._seenItemStatusSequences.get(item.item_id); + const previousRank = this._seenItemStatusRanks.get(item.item_id); + const nextRank = getQueueItemStatusRank(item.status); + + if (statusSequence !== undefined) { + if (previousSequence === undefined) { + return true; + } + if (statusSequence > previousSequence) { + return true; + } + if (statusSequence < previousSequence) { + return false; + } + // Equal-sequence updates should be rare; if they happen, let the later terminal-vs-terminal arrival win. + return previousRank === undefined || nextRank >= previousRank; + } + + return previousRank === undefined || nextRank >= previousRank; + }; + + _pruneSeenItemOrdering = (items: S['SessionQueueItem'][]): void => { + const itemIds = new Set(items.map(({ item_id }) => item_id)); + + // Evict vanished items so long-lived sessions do not grow these maps without bound. + for (const itemId of this._seenItemStatusRanks.keys()) { + if (!itemIds.has(itemId)) { + this._seenItemStatusRanks.delete(itemId); + } + } + + for (const itemId of this._seenItemStatusSequences.keys()) { + if (!itemIds.has(itemId)) { + this._seenItemStatusSequences.delete(itemId); + } + } + }; + /** Item ID of the last started item. Used for auto-switch on start. */ $lastStartedItemId = atom(null); @@ -348,6 +433,7 @@ export class StagingAreaApi { if (data.destination !== this._sessionId) { return; } + this._recordSeenItemOrdering(data.item_id, data.status, getStatusSequence(data)); if (data.status === 'completed') { /** * There is an unpleasant bit of indirection here. When an item is completed, and auto-switch is set to @@ -375,31 +461,47 @@ export class StagingAreaApi { const generation = ++this._itemsEventGeneration; const oldItems = this.$items.get(); + const oldItemsById = new Map(oldItems.map((item) => [item.item_id, item])); + let didSubstituteStaleItem = false; + const nextItems = items.flatMap((item) => { + if (this._shouldAcceptQueueItem(item)) { + return [item]; + } + didSubstituteStaleItem = true; + const previousItem = oldItemsById.get(item.item_id); + return previousItem ? [previousItem] : []; + }); + const normalizedItems = didSubstituteStaleItem ? nextItems : items; + + for (const item of normalizedItems) { + this._recordSeenItemOrdering(item.item_id, item.status, getStatusSequence(item)); + } + this._pruneSeenItemOrdering(normalizedItems); - if (items === oldItems) { + if (normalizedItems === oldItems) { return; } - if (items.length === 0) { + if (normalizedItems.length === 0) { // If there are no items, cannot have a selected item. this.$selectedItemId.set(null); this.$selectedImageIndex.set(0); - } else if (this.$selectedItemId.get() === null && items.length > 0) { + } else if (this.$selectedItemId.get() === null && normalizedItems.length > 0) { // If there is no selected item but there are items, select the first one. - this.$selectedItemId.set(items[0]?.item_id ?? null); + this.$selectedItemId.set(normalizedItems[0]?.item_id ?? null); this.$selectedImageIndex.set(0); } const progressData = this.$progressData.get(); for (const [id, datum] of objectEntries(progressData)) { - if (!datum || !items.find(({ item_id }) => item_id === datum.itemId)) { + if (!datum || !normalizedItems.find(({ item_id }) => item_id === datum.itemId)) { this.$progressData.setKey(id, undefined); continue; } } - for (const item of items) { + for (const item of normalizedItems) { const datum = progressData[item.item_id]; if (item.status === 'canceled' || item.status === 'failed') { @@ -467,30 +569,30 @@ export class StagingAreaApi { } const selectedItemId = this.$selectedItemId.get(); - if (selectedItemId !== null && !items.find(({ item_id }) => item_id === selectedItemId)) { + if (selectedItemId !== null && !normalizedItems.find(({ item_id }) => item_id === selectedItemId)) { // If the selected item no longer exists, select the next best item. // Prefer the next item in the list - must check oldItems to determine this const nextItemIndex = oldItems.findIndex(({ item_id }) => item_id === selectedItemId); if (nextItemIndex !== -1) { - const nextItem = items[nextItemIndex] ?? items[nextItemIndex - 1]; + const nextItem = normalizedItems[nextItemIndex] ?? normalizedItems[nextItemIndex - 1]; if (nextItem) { this.$selectedItemId.set(nextItem.item_id); this.$selectedImageIndex.set(0); } } else { // Next, if there is an in-progress item, select that. - const inProgressItem = items.find(({ status }) => status === 'in_progress'); + const inProgressItem = normalizedItems.find(({ status }) => status === 'in_progress'); if (inProgressItem) { this.$selectedItemId.set(inProgressItem.item_id); this.$selectedImageIndex.set(0); } // Finally just select the first item. - this.$selectedItemId.set(items[0]?.item_id ?? null); + this.$selectedItemId.set(normalizedItems[0]?.item_id ?? null); this.$selectedImageIndex.set(0); } } - this.$items.set(items); + this.$items.set(normalizedItems); }; onImageLoaded = (itemId: number) => { @@ -521,6 +623,8 @@ export class StagingAreaApi { /** Cleans up all state and unsubscribes from all events. */ cleanup = () => { this._itemsEventGeneration++; + this._seenItemStatusRanks.clear(); + this._seenItemStatusSequences.clear(); this.$lastStartedItemId.set(null); this.$lastCompletedItemId.set(null); this.$items.set([]); diff --git a/invokeai/frontend/web/src/services/api/schema.ts b/invokeai/frontend/web/src/services/api/schema.ts index e1dd2ad361c..4b8e4da95a5 100644 --- a/invokeai/frontend/web/src/services/api/schema.ts +++ b/invokeai/frontend/web/src/services/api/schema.ts @@ -23967,6 +23967,12 @@ export type components = { * @enum {string} */ status: "pending" | "in_progress" | "completed" | "failed" | "canceled"; + /** + * Status Sequence + * @description A monotonically increasing version for this queue item's visible status lifecycle + * @default null + */ + status_sequence: number | null; /** * Error Type * @description The error type, if any @@ -26647,6 +26653,11 @@ export type components = { * @enum {string} */ status: "pending" | "in_progress" | "completed" | "failed" | "canceled"; + /** + * Status Sequence + * @description A monotonically increasing version for this queue item's visible status lifecycle + */ + status_sequence?: number | null; /** * Priority * @description The priority of this queue item diff --git a/invokeai/frontend/web/src/services/events/setEventListeners.tsx b/invokeai/frontend/web/src/services/events/setEventListeners.tsx index fb08fc08dd1..6357db6472f 100644 --- a/invokeai/frontend/web/src/services/events/setEventListeners.tsx +++ b/invokeai/frontend/web/src/services/events/setEventListeners.tsx @@ -392,6 +392,7 @@ export const setEventListeners = ({ socket, store, setIsConnected }: SetEventLis const { item_id, status, + status_sequence, batch_status, error_type, error_message, @@ -408,6 +409,7 @@ export const setEventListeners = ({ socket, store, setIsConnected }: SetEventLis dispatch( queueApi.util.updateQueryData('getQueueItem', item_id, (draft) => { draft.status = status; + draft.status_sequence = status_sequence; draft.started_at = started_at; draft.updated_at = updated_at; draft.completed_at = completed_at; @@ -425,6 +427,7 @@ export const setEventListeners = ({ socket, store, setIsConnected }: SetEventLis const item = draft.find((i) => i.item_id === item_id); if (item) { item.status = status; + item.status_sequence = status_sequence; item.started_at = started_at; item.updated_at = updated_at; item.completed_at = completed_at; diff --git a/tests/app/services/session_queue/test_session_queue_status_sequence.py b/tests/app/services/session_queue/test_session_queue_status_sequence.py new file mode 100644 index 00000000000..0dcf8fd0f71 --- /dev/null +++ b/tests/app/services/session_queue/test_session_queue_status_sequence.py @@ -0,0 +1,103 @@ +import uuid + +import pytest + +from invokeai.app.services.events.events_common import QueueItemStatusChangedEvent +from invokeai.app.services.invoker import Invoker +from invokeai.app.services.session_queue.session_queue_sqlite import SqliteSessionQueue +from invokeai.app.services.shared.graph import Graph, GraphExecutionState +from tests.test_nodes import PromptTestInvocation, TestEventService + + +@pytest.fixture +def session_queue(mock_invoker: Invoker) -> SqliteSessionQueue: + db = mock_invoker.services.board_records._db + queue = SqliteSessionQueue(db=db) + queue.start(mock_invoker) + return queue + + +def _insert_queue_item( + session_queue: SqliteSessionQueue, + queue_id: str = "default", + destination: str | None = None, +) -> int: + graph = Graph() + graph.add_node(PromptTestInvocation(id="prompt", prompt="test")) + session = GraphExecutionState(graph=graph) + session_json = session.model_dump_json(warnings=False, exclude_none=True) + batch_id = str(uuid.uuid4()) + with session_queue._db.transaction() as cursor: + cursor.execute( + """--sql + INSERT INTO session_queue ( + queue_id, + session, + session_id, + batch_id, + field_values, + priority, + workflow, + origin, + destination, + retried_from_item_id, + user_id + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + (queue_id, session_json, session.id, batch_id, None, 0, None, None, destination, None, "system"), + ) + return cursor.lastrowid + + +def test_status_sequence_increments_for_queue_item_lifecycle( + session_queue: SqliteSessionQueue, mock_invoker: Invoker +) -> None: + item_id = _insert_queue_item(session_queue) + + pending_item = session_queue.get_queue_item(item_id) + assert pending_item.status == "pending" + assert pending_item.status_sequence == 0 + + in_progress_item = session_queue.dequeue() + assert in_progress_item is not None + assert in_progress_item.item_id == item_id + assert in_progress_item.status == "in_progress" + assert in_progress_item.status_sequence == 1 + + completed_item = session_queue.complete_queue_item(item_id) + assert completed_item.status == "completed" + assert completed_item.status_sequence == 2 + + event_bus: TestEventService = mock_invoker.services.events + status_events = [event for event in event_bus.events if isinstance(event, QueueItemStatusChangedEvent)] + + assert len(status_events) == 2 + assert [event.status for event in status_events] == ["in_progress", "completed"] + assert [event.status_sequence for event in status_events] == [1, 2] + + +def test_status_sequence_increments_for_bulk_cancel_paths(session_queue: SqliteSessionQueue) -> None: + first_item_id = _insert_queue_item(session_queue) + second_item_id = _insert_queue_item(session_queue) + + result = session_queue.cancel_all_except_current("default") + + assert result.canceled == 2 + assert session_queue.get_queue_item(first_item_id).status == "canceled" + assert session_queue.get_queue_item(first_item_id).status_sequence == 1 + assert session_queue.get_queue_item(second_item_id).status == "canceled" + assert session_queue.get_queue_item(second_item_id).status_sequence == 1 + + +def test_status_sequence_continues_after_dequeue_then_cancel(session_queue: SqliteSessionQueue) -> None: + item_id = _insert_queue_item(session_queue) + + in_progress_item = session_queue.dequeue() + assert in_progress_item is not None + assert in_progress_item.item_id == item_id + assert in_progress_item.status_sequence == 1 + + canceled_item = session_queue.cancel_queue_item(item_id) + assert canceled_item.status == "canceled" + assert canceled_item.status_sequence == 2