Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ec92069
fix queue item ordering across events and snapshots
JPPhoto Apr 10, 2026
2b6010a
clean up queue item status sequencing
JPPhoto Apr 10, 2026
6f99424
trim white-box queue sequencing tests
JPPhoto Apr 10, 2026
a510262
format sqlite migrator tests
JPPhoto Apr 10, 2026
9134a38
Merge branch 'main' into queue-item-status-sequence
JPPhoto Apr 13, 2026
aa5562c
Merge remote-tracking branch 'origin/main' into codex-tmp/update-bran…
JPPhoto Apr 14, 2026
1b00094
Merge branch 'main' into queue-item-status-sequence
JPPhoto Apr 14, 2026
d3eb502
Merge branch 'main' into queue-item-status-sequence
JPPhoto Apr 14, 2026
54ac865
Merge remote-tracking branch 'origin/main' into codex-tmp/update-bran…
JPPhoto Apr 14, 2026
88e7db6
Merge remote-tracking branch 'origin/main' into codex-tmp/update-bran…
JPPhoto Apr 14, 2026
887128b
Merge remote-tracking branch 'origin/main' into codex-tmp/update-bran…
JPPhoto Apr 17, 2026
7cbf0e5
Merge remote-tracking branch 'origin/main' into codex-tmp/update-bran…
JPPhoto Apr 17, 2026
69bb918
Merge remote-tracking branch 'origin/main' into codex-tmp/update-bran…
JPPhoto Apr 18, 2026
c94a834
Merge remote-tracking branch 'origin/main' into codex-tmp/update-bran…
JPPhoto Apr 18, 2026
393f4fb
Merge remote-tracking branch 'origin/main' into codex-tmp/update-bran…
JPPhoto Apr 19, 2026
e85eea5
Merge branch 'main' into queue-item-status-sequence
Pfannkuchensack Apr 19, 2026
66f329a
Merge remote-tracking branch 'origin/main' into codex-tmp/update-bran…
JPPhoto Apr 19, 2026
f6a40cb
Merge remote-tracking branch 'origin/main' into codex-tmp/update-bran…
JPPhoto Apr 20, 2026
69175da
Merge remote-tracking branch 'origin/main' into codex-tmp/update-bran…
JPPhoto Apr 20, 2026
ad0cc78
Merge remote-tracking branch 'origin/main' into codex-tmp/update-bran…
JPPhoto Apr 20, 2026
1e7ff59
Merge remote-tracking branch 'origin/main' into codex-tmp/update-bran…
JPPhoto Apr 20, 2026
72e8c18
Merge remote-tracking branch 'origin/main' into codex-tmp/update-bran…
JPPhoto Apr 20, 2026
fb9050c
Merge branch 'main' into queue-item-status-sequence
Pfannkuchensack Apr 21, 2026
ded8c16
Merge remote-tracking branch 'origin/main' into codex-tmp/update-bran…
JPPhoto Apr 21, 2026
f260b5d
Merge remote-tracking branch 'origin/main' into codex-tmp/update-bran…
JPPhoto Apr 21, 2026
57d520c
Merge remote-tracking branch 'origin/main' into codex-tmp/update-bran…
JPPhoto Apr 21, 2026
cdb51ab
Merge remote-tracking branch 'origin/main' into codex-tmp/update-bran…
JPPhoto Apr 22, 2026
da993bf
clarify queue status sequencing intent
JPPhoto Apr 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions invokeai/app/services/events/events_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ class QueueItemStatusChangedEvent(QueueItemEventBase):
__event_name__ = "queue_item_status_changed"

status: QUEUE_ITEM_STATUS = Field(description="The new status of the queue item")
status_sequence: int | None = Field(
default=None,
description="A monotonically increasing version for this queue item's visible status lifecycle",
)
error_type: Optional[str] = Field(default=None, description="The error type, if any")
error_message: Optional[str] = Field(default=None, description="The error message, if any")
error_traceback: Optional[str] = Field(default=None, description="The error traceback, if any")
Expand All @@ -256,6 +260,7 @@ def build(
user_id=queue_item.user_id,
session_id=queue_item.session_id,
status=queue_item.status,
status_sequence=queue_item.status_sequence,
error_type=queue_item.error_type,
error_message=queue_item.error_message,
error_traceback=queue_item.error_traceback,
Expand Down
5 changes: 5 additions & 0 deletions invokeai/app/services/session_queue/session_queue_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@ class SessionQueueItem(BaseModel):

item_id: int = Field(description="The identifier of the session queue item")
status: QUEUE_ITEM_STATUS = Field(default="pending", description="The status of this queue item")
status_sequence: int | None = Field(
default=None,
# Fallback for rows serialized before migration_28 added the DB-level default of 0.
description="A monotonically increasing version for this queue item's visible status lifecycle",
)
priority: int = Field(default=0, description="The priority of this queue item")
batch_id: str = Field(description="The ID of the batch associated with this queue item")
origin: str | None = Field(
Expand Down
17 changes: 11 additions & 6 deletions invokeai/app/services/session_queue/session_queue_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ def _set_in_progress_to_canceled(self) -> None:
cursor.execute(
"""--sql
UPDATE session_queue
SET status = 'canceled'
SET status = 'canceled',
status_sequence = COALESCE(status_sequence, 0) + 1
WHERE status = 'in_progress';
"""
)
Expand Down Expand Up @@ -307,7 +308,7 @@ def _set_queue_item_status(
cursor.execute(
"""--sql
UPDATE session_queue
SET status = ?, error_type = ?, error_message = ?, error_traceback = ?
SET status = ?, status_sequence = COALESCE(status_sequence, 0) + 1, error_type = ?, error_message = ?, error_traceback = ?
WHERE item_id = ?
""",
(status, error_type, error_message, error_traceback, item_id),
Expand Down Expand Up @@ -489,7 +490,8 @@ def cancel_by_batch_ids(
cursor.execute(
f"""--sql
UPDATE session_queue
SET status = 'canceled'
SET status = 'canceled',
status_sequence = COALESCE(status_sequence, 0) + 1
{where};
""",
tuple(params),
Expand Down Expand Up @@ -537,7 +539,8 @@ def cancel_by_destination(
cursor.execute(
f"""--sql
UPDATE session_queue
SET status = 'canceled'
SET status = 'canceled',
status_sequence = COALESCE(status_sequence, 0) + 1
{where};
""",
tuple(params),
Expand Down Expand Up @@ -649,7 +652,8 @@ def cancel_by_queue_id(self, queue_id: str) -> CancelByQueueIDResult:
cursor.execute(
f"""--sql
UPDATE session_queue
SET status = 'canceled'
SET status = 'canceled',
status_sequence = COALESCE(status_sequence, 0) + 1
{where};
""",
tuple(params),
Expand Down Expand Up @@ -685,7 +689,8 @@ def cancel_all_except_current(self, queue_id: str, user_id: Optional[str] = None
cursor.execute(
f"""--sql
UPDATE session_queue
SET status = 'canceled'
SET status = 'canceled',
status_sequence = COALESCE(status_sequence, 0) + 1
{where};
""",
tuple(params),
Expand Down
2 changes: 2 additions & 0 deletions invokeai/app/services/shared/sqlite/sqlite_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from invokeai.app.services.shared.sqlite_migrator.migrations.migration_27 import build_migration_27
from invokeai.app.services.shared.sqlite_migrator.migrations.migration_28 import build_migration_28
from invokeai.app.services.shared.sqlite_migrator.migrations.migration_29 import build_migration_29
from invokeai.app.services.shared.sqlite_migrator.migrations.migration_30 import build_migration_30
from invokeai.app.services.shared.sqlite_migrator.sqlite_migrator_impl import SqliteMigrator


Expand Down Expand Up @@ -81,6 +82,7 @@ def init_db(config: InvokeAIAppConfig, logger: Logger, image_files: ImageFileSto
migrator.register_migration(build_migration_27())
migrator.register_migration(build_migration_28())
migrator.register_migration(build_migration_29())
migrator.register_migration(build_migration_30())
migrator.run_migrations()

return db
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Migration 30: Add per-item queue status sequencing.

This migration adds a `status_sequence` column to `session_queue` so queue item
status updates can be ordered across asynchronous event and snapshot channels.
"""

import sqlite3

from invokeai.app.services.shared.sqlite_migrator.sqlite_migrator_common import Migration


class Migration30Callback:
"""Add a per-queue-item status sequence for cross-channel ordering."""

def __call__(self, cursor: sqlite3.Cursor) -> None:
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='session_queue';")
if cursor.fetchone() is None:
return

cursor.execute("PRAGMA table_info(session_queue);")
columns = [row[1] for row in cursor.fetchall()]

if "status_sequence" not in columns:
cursor.execute("ALTER TABLE session_queue ADD COLUMN status_sequence INTEGER DEFAULT 0;")
cursor.execute("UPDATE session_queue SET status_sequence = 0 WHERE status_sequence IS NULL;")


def build_migration_30() -> Migration:
return Migration(
from_version=29,
to_version=30,
callback=Migration30Callback(),
)
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,144 @@ describe('StagingAreaApi', () => {
expect(progressData[1]?.imageDTOs[1]).toBe(imageDTO2);
});

it('should ignore a stale pending snapshot after an item has already completed', async () => {
const imageDTO = createMockImageDTO({ image_name: 'output1.png' });
mockApp._setImageDTO('output1.png', imageDTO);

const completedItems = [
createMockQueueItem({
item_id: 1,
status: 'completed',
session: {
id: sessionId,
source_prepared_mapping: {
'canvas_output:abc': ['prepared-1'],
},
results: {
'prepared-1': { image: { image_name: 'output1.png' } },
},
},
}),
];

const stalePendingItems = [
createMockQueueItem({
item_id: 1,
status: 'pending',
session: {
id: sessionId,
source_prepared_mapping: {},
results: {},
},
}),
];

await api.onItemsChangedEvent(completedItems);
await api.onItemsChangedEvent(stalePendingItems);

expect(api.$items.get()).toHaveLength(1);
expect(api.$items.get()[0]?.status).toBe('completed');
expect(api.$isPending.get()).toBe(false);
expect(api.$progressData.get()[1]?.imageDTOs).toEqual([imageDTO]);
});

it('should ignore a stale pending snapshot after a completed status event arrives first', async () => {
api.onQueueItemStatusChangedEvent(
createMockQueueItemStatusChangedEvent({
item_id: 1,
destination: sessionId,
status: 'completed',
})
);

await api.onItemsChangedEvent([
createMockQueueItem({
item_id: 1,
status: 'pending',
session: {
id: sessionId,
source_prepared_mapping: {},
results: {},
},
}),
]);

expect(api.$items.get()).toEqual([]);
expect(api.$isPending.get()).toBe(false);
});

it('should ignore a stale in_progress snapshot after a completed status event arrives first', async () => {
api.onQueueItemStatusChangedEvent(
createMockQueueItemStatusChangedEvent({
item_id: 1,
destination: sessionId,
status: 'completed',
})
);

await api.onItemsChangedEvent([
createMockQueueItem({
item_id: 1,
status: 'in_progress',
session: {
id: sessionId,
source_prepared_mapping: {},
results: {},
},
}),
]);

expect(api.$items.get()).toEqual([]);
expect(api.$isPending.get()).toBe(false);
});

it('should prefer the higher status_sequence when terminal snapshots disagree', async () => {
const completedItem = createMockQueueItem({
item_id: 1,
status: 'completed',
status_sequence: 2,
});

const failedItem = createMockQueueItem({
item_id: 1,
status: 'failed',
status_sequence: 3,
});

await api.onItemsChangedEvent([completedItem]);
await api.onItemsChangedEvent([failedItem]);

expect(api.$items.get()).toHaveLength(1);
expect(api.$items.get()[0]?.status).toBe('failed');
});

it('should ignore a lower status_sequence on stale in_progress snapshots', async () => {
const completedEvent = createMockQueueItemStatusChangedEvent({
item_id: 1,
destination: sessionId,
status: 'completed',
status_sequence: 3,
});

api.onQueueItemStatusChangedEvent(completedEvent);

const staleInProgressItem = createMockQueueItem({
item_id: 1,
status: 'in_progress',
status_sequence: 2,
session: {
id: sessionId,
source_prepared_mapping: {},
results: {},
},
});

await api.onItemsChangedEvent([staleInProgressItem]);

expect(api.$items.get()).toEqual([]);
expect(api.$isPending.get()).toBe(false);
});

it('should load all images from multiple canvas_output nodes', async () => {
const imageDTO1 = createMockImageDTO({ image_name: 'output1.png' });
const imageDTO2 = createMockImageDTO({ image_name: 'output2.png' });
Expand Down
Loading
Loading