Skip to content

Commit 3a010ad

Browse files
Cl 1935 scheduling workflows which run on a specific time (#217)
* feat(messages): add support for injecting cancelled tool results in message creation Enhanced the `create_message` method in `MessageService` to include an optional parameter `inject_cancelled_tool_results`. When set to `True`, it creates cancelled tool results if the parent message contains pending tool_use blocks. This change improves the handling of tool execution interruptions by providing relevant feedback in the message flow. * fix(messages): update tool result content type in MessageService Changed the type of `tool_result_content` in the `create_message` method from `list[ToolResultBlockParam]` to `list[ContentBlockParam]` to ensure mypy compatability. * feat(scheduled_jobs): implement scheduled job management with APScheduler Added functionality for managing scheduled jobs, including creation, listing, and cancellation. Introduced `ScheduledJobService` for handling job operations and integrated it into the FastAPI application. Updated the application to start and shutdown the scheduler during lifespan events. Added necessary models and dependencies for scheduled job data handling. * refactor(scheduled_jobs): remove unused query parameter from list_scheduled_jobs Eliminated the unused `query` parameter from the `list_scheduled_jobs` function in the router. Cleaned up imports in `scheduler.py` by removing type ignore comments for better clarity. Updated the `list_` method in `ScheduledJobService` to reflect the removal of the `query` parameter. * chore: update .gitignore and enhance SQLite configuration Added entries for SQLite shared memory and write-ahead logging files to `.gitignore`. Enhanced SQLite connection settings in `engine.py` by enabling WAL mode for concurrent access and setting a busy timeout to prevent database locking issues. Removed unused return type documentation in `executor.py` for clarity. * refactor(scheduled_jobs): update ScheduledJob creation to use params Refactored the `create` method in `ScheduledJob` to accept `params` of type `ScheduledJobCreate` instead of individual parameters. Updated the `create_scheduled_job` function in the router to pass `params` directly to the service. Cleaned up imports and removed unnecessary data construction in the router for improved clarity. * feat(scheduled_jobs): add name field to MessageRerunnerData and update ScheduledJob creation Introduced a new `name` field in the `MessageRerunnerDataCreate` model to enhance the data structure. Updated the `ScheduledJob` creation process to include the `name` parameter from `params.data`, ensuring that the new field is properly utilized during job creation. * feat(scheduled_jobs): enhance job execution with ASKUI token management Updated the `execute_job` function to support the new `askui_token` field in `ScheduledJobData`. Implemented environment variable management for `ASKUI_TOKEN` and `AUTHORIZATION` headers to accommodate different authentication methods. Refactored the job execution logic to ensure proper handling of new job types and restore previous environment states after execution. Enhanced the `ScheduledJobCreate` model to include the `askui_token` parameter for authenticated API calls. * feat(scheduled_jobs): add scheduler database support and improve SQLite configuration, get around the write lock when scheduler and Runner wants to write to the same DB Introduced a new `scheduler_url` field in `DbSettings` for APScheduler job storage, allowing separate database management. Updated the `engine.py` to utilize the new scheduler database and removed unused SQLite connection settings. Enhanced the `scheduler.py` to create a dedicated engine for the scheduler database and ensure proper SQLite configuration for foreign key support. Updated logging to use a private logger for better encapsulation. * refactor(scheduled_jobs): update job execution logic and improve error handling Refactored the `execute_job` function to utilize a new `_execute_message_rerunner_job` helper function for better clarity and separation of concerns. Updated the handling of `MessageRerunnerData` and improved error messages for better debugging. Removed unused imports and ensured proper type hints throughout the code. Enhanced the `ScheduledJob` model to streamline the extraction of `next_fire_time` and improved the handling of workspace validation in the `ScheduledJobService` class. * refactor: remove functools.cache decorator from create_api_client function for unauthorized issues after token expiry * refactor(scheduled_jobs): enhance job execution and token management Updated the `execute_job` function to improve the handling of `askui_token` by utilizing `SecretStr` for better security. Refactored the logic to manage the `ASKUI__AUTHORIZATION` environment variable, ensuring proper encoding of the token. Enhanced the extraction of `next_fire_time` in the `ScheduledJob` model for clearer error handling. Updated the `ScheduledJobService` to pass the decoded token securely during job execution. This refactor aims to streamline job execution and improve overall code clarity. * refactor(scheduled_jobs): update execute_job to return ScheduledJobExecutionResult Modified the `execute_job` function to return a `ScheduledJobExecutionResult`, encapsulating job data and optional error messages. Enhanced error handling during job execution, ensuring that the previous `ASKUI__AUTHORIZATION` environment variable is restored after execution. Improved documentation for the function's parameters and return type to clarify its behavior and expected output. * refactor(settings, engine): optimize SQLite configuration and update database settings Refactored the `DbSettings` class to consolidate the database URL for SQLAlchemy connections, removing the separate `scheduler_url` field. Updated the database engine configuration in `engine.py` to utilize optimized SQLite pragmas for better performance and concurrency. Adjusted the `scheduler.py` to use the shared engine, ensuring that APScheduler operates with the same database settings. Enhanced the `.gitignore` to include SQLite shared memory and write-ahead logging files for improved project cleanliness. * refactor(scheduled_jobs): sort scheduled jobs by next fire time Updated the `ScheduledJobService` to return a sorted list of scheduled jobs based on their `next_fire_time`. This change enhances the order of job execution, ensuring that jobs are processed in a timely manner.
1 parent 1cb4cf8 commit 3a010ad

15 files changed

Lines changed: 743 additions & 15 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ reports/
166166
.DS_Store
167167
/chat
168168
/askui_chat.db
169+
/askui_chat.db-shm
170+
/askui_chat.db-wal
169171
.cache/
170172

171173
bom.json

pdm.lock

Lines changed: 33 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ dependencies = [
3232
"aiofiles>=24.1.0",
3333
"anyio==4.10.0", # We need to pin this version otherwise listing mcp tools using fastmcp within runner fails
3434
"sqlalchemy[mypy]>=2.0.44",
35+
"apscheduler==4.0.0a6",
3536
]
3637
requires-python = ">=3.10,<3.14"
3738
readme = "README.md"

src/askui/chat/api/app.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
from askui.chat.api.mcp_servers.utility import mcp as utility_mcp
2323
from askui.chat.api.messages.router import router as messages_router
2424
from askui.chat.api.runs.router import router as runs_router
25+
from askui.chat.api.scheduled_jobs.router import router as scheduled_jobs_router
26+
from askui.chat.api.scheduled_jobs.scheduler import shutdown_scheduler, start_scheduler
2527
from askui.chat.api.threads.router import router as threads_router
2628
from askui.chat.api.workflows.router import router as workflows_router
2729
from askui.chat.migrations.runner import run_migrations
@@ -49,7 +51,17 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: # noqa: ARG001
4951
session = next(get_session())
5052
mcp_config_service = get_mcp_config_service(session=session, settings=settings)
5153
mcp_config_service.seed()
54+
55+
# Start the scheduler for scheduled jobs
56+
logger.info("Starting scheduled job scheduler...")
57+
await start_scheduler()
58+
5259
yield
60+
61+
# Shutdown scheduler
62+
logger.info("Shutting down scheduled job scheduler...")
63+
await shutdown_scheduler()
64+
5365
logger.info("Disconnecting all MCP clients...")
5466
await get_mcp_client_manager_manager(mcp_config_service).disconnect_all(force=True)
5567

@@ -70,6 +82,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: # noqa: ARG001
7082
v1_router.include_router(mcp_configs_router)
7183
v1_router.include_router(files_router)
7284
v1_router.include_router(workflows_router)
85+
v1_router.include_router(scheduled_jobs_router)
7386
v1_router.include_router(health_router)
7487
app.include_router(v1_router)
7588

src/askui/chat/api/db/engine.py

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,40 @@
22
from sqlite3 import Connection as SQLite3Connection
33
from typing import Any
44

5-
from sqlalchemy import Engine, create_engine, event
5+
from sqlalchemy import create_engine, event
66

77
from askui.chat.api.dependencies import get_settings
88

9-
logger = logging.getLogger(__name__)
9+
_logger = logging.getLogger(__name__)
1010

11-
settings = get_settings()
12-
connect_args = {"check_same_thread": False}
13-
echo = logger.isEnabledFor(logging.DEBUG)
14-
engine = create_engine(settings.db.url, connect_args=connect_args, echo=echo)
11+
_settings = get_settings()
12+
_connect_args = {"check_same_thread": False}
13+
_echo = _logger.isEnabledFor(logging.DEBUG)
1514

15+
# Create engine with optimized settings
16+
engine = create_engine(
17+
_settings.db.url,
18+
connect_args=_connect_args,
19+
echo=_echo,
20+
)
1621

17-
@event.listens_for(Engine, "connect")
18-
def set_sqlite_pragma(dbapi_conn: SQLite3Connection, connection_record: Any) -> None: # noqa: ARG001
22+
23+
@event.listens_for(engine, "connect")
24+
def _set_sqlite_pragma(dbapi_conn: SQLite3Connection, connection_record: Any) -> None: # noqa: ARG001
25+
"""
26+
Configure SQLite pragmas for optimal web application performance.
27+
28+
Applied on each new connection:
29+
- foreign_keys=ON: Enable foreign key constraint enforcement
30+
- journal_mode=WAL: Write-Ahead Logging for better concurrency (readers don't block writers)
31+
- synchronous=NORMAL: Sync every 1000 pages instead of every write (faster, still durable with WAL)
32+
- busy_timeout=30000: Wait up to 30 seconds for locks instead of failing immediately
33+
"""
1934
cursor = dbapi_conn.cursor()
20-
cursor.execute("PRAGMA foreign_keys=ON")
35+
36+
cursor.execute("PRAGMA foreign_keys = ON")
37+
cursor.execute("PRAGMA journal_mode = WAL")
38+
cursor.execute("PRAGMA synchronous = NORMAL")
39+
cursor.execute("PRAGMA busy_timeout = 30000")
40+
2141
cursor.close()

src/askui/chat/api/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
FileId = Annotated[str, IdField("file")]
1111
MessageId = Annotated[str, IdField("msg")]
1212
RunId = Annotated[str, IdField("run")]
13+
ScheduledJobId = Annotated[str, IdField("schedjob")]
1314
ThreadId = Annotated[str, IdField("thread")]
1415
WorkspaceId = UUID4
1516

src/askui/chat/api/runs/dependencies.py

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,29 @@
11
from fastapi import Depends
2+
from pydantic import UUID4
3+
from sqlalchemy.orm import Session
24

3-
from askui.chat.api.assistants.dependencies import AssistantServiceDep
5+
from askui.chat.api.assistants.dependencies import (
6+
AssistantServiceDep,
7+
get_assistant_service,
8+
)
49
from askui.chat.api.assistants.service import AssistantService
510
from askui.chat.api.db.session import SessionDep
6-
from askui.chat.api.dependencies import SettingsDep
7-
from askui.chat.api.mcp_clients.dependencies import McpClientManagerManagerDep
11+
from askui.chat.api.dependencies import SettingsDep, get_settings
12+
from askui.chat.api.files.dependencies import get_file_service
13+
from askui.chat.api.mcp_clients.dependencies import (
14+
McpClientManagerManagerDep,
15+
get_mcp_client_manager_manager,
16+
)
817
from askui.chat.api.mcp_clients.manager import McpClientManagerManager
18+
from askui.chat.api.mcp_configs.dependencies import get_mcp_config_service
919
from askui.chat.api.messages.chat_history_manager import ChatHistoryManager
10-
from askui.chat.api.messages.dependencies import ChatHistoryManagerDep
20+
from askui.chat.api.messages.dependencies import (
21+
ChatHistoryManagerDep,
22+
get_chat_history_manager,
23+
get_message_service,
24+
get_message_translator,
25+
get_truncation_strategy_factory,
26+
)
1127
from askui.chat.api.runs.models import RunListQuery
1228
from askui.chat.api.settings import Settings
1329

@@ -23,6 +39,12 @@ def get_runs_service(
2339
mcp_client_manager_manager: McpClientManagerManager = McpClientManagerManagerDep,
2440
settings: Settings = SettingsDep,
2541
) -> RunService:
42+
"""
43+
Get RunService instance for FastAPI dependency injection.
44+
45+
This function is designed for use with FastAPI's DI system.
46+
For manual construction outside of a request context, use `create_run_service()`.
47+
"""
2648
return RunService(
2749
session=session,
2850
assistant_service=assistant_service,
@@ -33,3 +55,42 @@ def get_runs_service(
3355

3456

3557
RunServiceDep = Depends(get_runs_service)
58+
59+
60+
def create_run_service(session: Session, workspace_id: UUID4) -> RunService:
61+
"""
62+
Create a RunService with all required dependencies manually.
63+
64+
Use this function when you need a `RunService` outside of FastAPI's
65+
dependency injection context (e.g. APScheduler callbacks).
66+
67+
Args:
68+
session (Session): Database session.
69+
workspace_id (UUID4): The workspace ID for the run execution.
70+
71+
Returns:
72+
RunService: Configured run service.
73+
"""
74+
settings = get_settings()
75+
76+
assistant_service = get_assistant_service(session)
77+
file_service = get_file_service(session, settings)
78+
mcp_config_service = get_mcp_config_service(session, settings)
79+
mcp_client_manager_manager = get_mcp_client_manager_manager(mcp_config_service)
80+
81+
message_service = get_message_service(session)
82+
message_translator = get_message_translator(file_service, workspace_id)
83+
truncation_strategy_factory = get_truncation_strategy_factory()
84+
chat_history_manager = get_chat_history_manager(
85+
message_service,
86+
message_translator,
87+
truncation_strategy_factory,
88+
)
89+
90+
return RunService(
91+
session=session,
92+
assistant_service=assistant_service,
93+
mcp_client_manager_manager=mcp_client_manager_manager,
94+
chat_history_manager=chat_history_manager,
95+
settings=settings,
96+
)

src/askui/chat/api/scheduled_jobs/__init__.py

Whitespace-only changes.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
"""FastAPI dependencies for scheduled jobs."""
2+
3+
from fastapi import Depends
4+
5+
from askui.chat.api.scheduled_jobs.scheduler import scheduler
6+
from askui.chat.api.scheduled_jobs.service import ScheduledJobService
7+
8+
9+
def get_scheduled_job_service() -> ScheduledJobService:
10+
"""Get ScheduledJobService instance with the singleton scheduler."""
11+
return ScheduledJobService(scheduler=scheduler)
12+
13+
14+
ScheduledJobServiceDep = Depends(get_scheduled_job_service)
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
"""Executor for scheduled job callbacks."""
2+
3+
import base64
4+
import logging
5+
import os
6+
from typing import Any
7+
8+
from sqlalchemy.orm import Session
9+
10+
from askui.chat.api.db.engine import engine
11+
from askui.chat.api.messages.dependencies import get_message_service
12+
from askui.chat.api.runs.dependencies import create_run_service
13+
from askui.chat.api.runs.models import RunCreate
14+
from askui.chat.api.scheduled_jobs.models import (
15+
MessageRerunnerData,
16+
ScheduledJobExecutionResult,
17+
scheduled_job_data_adapter,
18+
)
19+
20+
_logger = logging.getLogger(__name__)
21+
22+
23+
async def execute_job(
24+
**_kwargs: Any,
25+
) -> ScheduledJobExecutionResult:
26+
"""
27+
APScheduler callback that creates fresh services and executes the job.
28+
29+
This function is called by APScheduler when a job fires. It creates fresh
30+
database sessions and service instances to avoid stale connections.
31+
32+
Args:
33+
**_kwargs (Any): Keyword arguments containing job data.
34+
35+
Returns:
36+
ScheduledJobExecutionResult: The result containing job data and optional error.
37+
"""
38+
# Validates and returns the correct concrete type based on the `type` discriminator
39+
job_data = scheduled_job_data_adapter.validate_python(_kwargs)
40+
41+
_logger.info(
42+
"Executing scheduled job: workspace=%s, thread=%s",
43+
job_data.workspace_id,
44+
job_data.thread_id,
45+
)
46+
47+
error: str | None = None
48+
49+
try:
50+
# future proofing of new job types
51+
if isinstance(job_data, MessageRerunnerData): # pyright: ignore[reportUnnecessaryIsInstance]
52+
# Save previous ASKUI_TOKEN and AUTHORIZATION_HEADER env vars
53+
_previous_authorization = os.environ.get("ASKUI__AUTHORIZATION")
54+
55+
# remove authorization header since it takes precedence over the token and is set when forwarding bearer token
56+
os.environ["ASKUI__AUTHORIZATION"] = (
57+
f"Basic {base64.b64encode(job_data.askui_token.get_secret_value().encode()).decode()}"
58+
)
59+
60+
try:
61+
await _execute_message_rerunner_job(job_data)
62+
finally:
63+
# Restore previous AUTHORIZATION_HEADER env var
64+
if _previous_authorization is not None:
65+
os.environ["ASKUI__AUTHORIZATION"] = _previous_authorization
66+
except Exception as e:
67+
error = f"{type(e).__name__}: {e}"
68+
_logger.exception("Scheduled job failed: %s", error)
69+
70+
# Always return job data with optional error
71+
return ScheduledJobExecutionResult(data=job_data, error=error)
72+
73+
74+
async def _execute_message_rerunner_job(
75+
job_data: MessageRerunnerData,
76+
) -> None:
77+
"""
78+
Execute a message rerunner job.
79+
80+
Args:
81+
job_data: The job data.
82+
"""
83+
with Session(engine) as session:
84+
message_service = get_message_service(session)
85+
run_service = create_run_service(session, job_data.workspace_id)
86+
87+
# Create message
88+
message_service.create(
89+
workspace_id=job_data.workspace_id,
90+
thread_id=job_data.thread_id,
91+
params=job_data.message,
92+
)
93+
94+
# Create and execute run
95+
_logger.debug("Creating run with assistant %s", job_data.assistant_id)
96+
run, generator = await run_service.create(
97+
workspace_id=job_data.workspace_id,
98+
thread_id=job_data.thread_id,
99+
params=RunCreate(assistant_id=job_data.assistant_id, model=job_data.model),
100+
)
101+
102+
# Consume generator to completion of run
103+
_logger.debug("Waiting for run %s to complete", run.id)
104+
async for _event in generator:
105+
pass
106+
107+
# Check if run completed with error
108+
completed_run = run_service.retrieve(
109+
workspace_id=job_data.workspace_id,
110+
thread_id=job_data.thread_id,
111+
run_id=run.id,
112+
)
113+
114+
if completed_run.status == "failed":
115+
error_message = (
116+
completed_run.last_error.message
117+
if completed_run.last_error
118+
else "Run failed with unknown error"
119+
)
120+
raise RuntimeError(error_message)
121+
122+
_logger.info("Scheduled job completed: run_id=%s", run.id)

0 commit comments

Comments
 (0)