Skip to content

Commit dbf0236

Browse files
authored
Move runtimes to json journal store (#766)
1 parent 548164b commit dbf0236

10 files changed

Lines changed: 386 additions & 70 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ releases are available on [PyPI](https://pypi.org/project/pytask) and
77

88
## Unreleased
99

10-
- Nothing yet.
10+
- {pull}`766` moves runtime profiling persistence from SQLite to a JSON snapshot plus
11+
append-only journal in `.pytask/`, keeping runtime data resilient to crashes and
12+
compacted on normal build exits.
1113

1214
## 0.5.8 - 2025-12-30
1315

justfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ test-cov *FLAGS:
1212

1313
# Run type checking
1414
typing:
15-
uv run --group typing --group test --isolated ty check src/ tests/
15+
uv run --group typing --group test --isolated ty check
1616

1717
# Run linting
1818
lint:

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ dependencies = [
2424
"attrs>=21.3.0",
2525
"click>=8.1.8,!=8.2.0",
2626
"click-default-group>=1.2.4",
27+
"msgspec>=0.18.6",
2728
"networkx>=2.4.0",
2829
"optree>=0.9.0",
2930
"packaging>=23.0.0",
@@ -178,6 +179,7 @@ include = [
178179
unused-ignore-comment = "ignore"
179180

180181
[tool.ty.src]
182+
include = ["src", "tests"]
181183
exclude = ["src/_pytask/_hashlib.py"]
182184

183185
[tool.ty.terminal]

src/_pytask/journal.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
"""Helpers for append-only JSONL journals."""
2+
3+
from __future__ import annotations
4+
5+
from dataclasses import dataclass
6+
from typing import TYPE_CHECKING
7+
from typing import Generic
8+
from typing import TypeVar
9+
10+
import msgspec
11+
12+
if TYPE_CHECKING:
13+
from pathlib import Path
14+
15+
T = TypeVar("T")
16+
17+
18+
@dataclass(frozen=True)
19+
class JsonlJournal(Generic[T]):
20+
"""Append-only JSONL journal with best-effort recovery."""
21+
22+
path: Path
23+
type_: type[T]
24+
25+
def append(self, payload: msgspec.Struct) -> None:
26+
"""Append a JSON line to the journal."""
27+
with self.path.open("ab") as journal_file:
28+
journal_file.write(msgspec.json.encode(payload) + b"\n")
29+
30+
def read(self) -> list[T]:
31+
"""Read entries, keeping valid entries on decode errors."""
32+
if not self.path.exists():
33+
return []
34+
35+
entries: list[T] = []
36+
data = self.path.read_bytes()
37+
offset = 0
38+
for line in data.splitlines(keepends=True):
39+
stripped = line.strip()
40+
if not stripped:
41+
offset += len(line)
42+
continue
43+
try:
44+
entries.append(msgspec.json.decode(stripped, type=self.type_))
45+
except msgspec.DecodeError:
46+
with self.path.open("rb+") as journal_file:
47+
journal_file.truncate(offset)
48+
return entries
49+
offset += len(line)
50+
return entries
51+
52+
def delete(self) -> None:
53+
"""Delete the journal if it exists."""
54+
if self.path.exists():
55+
self.path.unlink()

src/_pytask/profile.py

Lines changed: 45 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,18 @@
88
import sys
99
import time
1010
from contextlib import suppress
11+
from dataclasses import dataclass
1112
from typing import TYPE_CHECKING
1213
from typing import Any
1314

1415
import click
1516
from rich.table import Table
16-
from sqlalchemy.orm import Mapped
17-
from sqlalchemy.orm import mapped_column
1817

1918
from _pytask.click import ColoredCommand
2019
from _pytask.click import EnumChoice
2120
from _pytask.console import console
2221
from _pytask.console import format_task_name
2322
from _pytask.dag import create_dag
24-
from _pytask.database_utils import BaseTable
25-
from _pytask.database_utils import DatabaseSession
2623
from _pytask.exceptions import CollectionError
2724
from _pytask.exceptions import ConfigurationError
2825
from _pytask.node_protocols import PPathNode
@@ -31,6 +28,7 @@
3128
from _pytask.outcomes import TaskOutcome
3229
from _pytask.pluginmanager import hookimpl
3330
from _pytask.pluginmanager import storage
31+
from _pytask.runtime_store import RuntimeState
3432
from _pytask.session import Session
3533
from _pytask.traceback import Traceback
3634

@@ -48,16 +46,6 @@ class _ExportFormats(enum.Enum):
4846
CSV = "csv"
4947

5048

51-
class Runtime(BaseTable):
52-
"""Record of runtimes of tasks."""
53-
54-
__tablename__ = "runtime"
55-
56-
task: Mapped[str] = mapped_column(primary_key=True)
57-
date: Mapped[float]
58-
duration: Mapped[float]
59-
60-
6149
@hookimpl(tryfirst=True)
6250
def pytask_extend_command_line_interface(cli: click.Group) -> None:
6351
"""Extend the command line interface."""
@@ -67,8 +55,10 @@ def pytask_extend_command_line_interface(cli: click.Group) -> None:
6755
@hookimpl
6856
def pytask_post_parse(config: dict[str, Any]) -> None:
6957
"""Register the export option."""
58+
runtime_state = RuntimeState.from_root(config["root"])
59+
config["pm"].register(ProfilePlugin(runtime_state))
60+
config["pm"].register(DurationNameSpace(runtime_state))
7061
config["pm"].register(ExportNameSpace)
71-
config["pm"].register(DurationNameSpace)
7262
config["pm"].register(FileSizeNameSpace)
7363

7464

@@ -82,27 +72,50 @@ def pytask_execute_task(task: PTask) -> Generator[None, None, None]:
8272
return result
8373

8474

85-
@hookimpl
86-
def pytask_execute_task_process_report(report: ExecutionReport) -> None:
87-
"""Store runtime of successfully finishing tasks in database."""
88-
task = report.task
89-
duration = task.attributes.get("duration")
90-
if report.outcome == TaskOutcome.SUCCESS and duration is not None:
91-
_create_or_update_runtime(task.signature, *duration)
75+
@dataclass
76+
class ProfilePlugin:
77+
"""Collect and persist runtime profiling data."""
78+
79+
runtime_state: RuntimeState
80+
81+
@hookimpl
82+
def pytask_execute_task_process_report(
83+
self, session: Session, report: ExecutionReport
84+
) -> None:
85+
"""Store runtime of successfully finishing tasks."""
86+
_ = session
87+
task = report.task
88+
duration = task.attributes.get("duration")
89+
if report.outcome == TaskOutcome.SUCCESS and duration is not None:
90+
self.runtime_state.update_task(task, *duration)
9291

92+
@hookimpl
93+
def pytask_unconfigure(self, session: Session) -> None:
94+
"""Flush runtime information on normal build exits."""
95+
if session.config.get("command") != "build":
96+
return
97+
if session.config.get("dry_run") or session.config.get("explain"):
98+
return
99+
self.runtime_state.flush()
93100

94-
def _create_or_update_runtime(task_signature: str, start: float, end: float) -> None:
95-
"""Create or update a runtime entry."""
96-
with DatabaseSession() as session:
97-
runtime = session.get(Runtime, task_signature)
98101

99-
if not runtime:
100-
session.add(Runtime(task=task_signature, date=start, duration=end - start))
101-
else:
102-
for attr, val in (("date", start), ("duration", end - start)):
103-
setattr(runtime, attr, val)
102+
@dataclass
103+
class DurationNameSpace:
104+
"""A namespace for adding durations to the profile."""
105+
106+
def __init__(self, runtime_state: RuntimeState) -> None:
107+
self.runtime_state = runtime_state
104108

105-
session.commit()
109+
@hookimpl
110+
def pytask_profile_add_info_on_task(
111+
self, session: Session, tasks: list[PTask], profile: dict[str, dict[str, Any]]
112+
) -> None:
113+
"""Add the runtime for tasks to the profile."""
114+
_ = session
115+
for task in tasks:
116+
duration = self.runtime_state.get_duration(task)
117+
if duration is not None:
118+
profile[task.name]["Duration (in s)"] = round(duration, 2)
106119

107120

108121
@click.command(cls=ColoredCommand)
@@ -183,29 +196,6 @@ def _print_profile_table(
183196
console.print("No information is stored on the collected tasks.")
184197

185198

186-
class DurationNameSpace:
187-
"""A namespace for adding durations to the profile."""
188-
189-
@staticmethod
190-
@hookimpl
191-
def pytask_profile_add_info_on_task(
192-
tasks: list[PTask], profile: dict[str, dict[str, Any]]
193-
) -> None:
194-
"""Add the runtime for tasks to the profile."""
195-
runtimes = _collect_runtimes(tasks)
196-
for name, duration in runtimes.items():
197-
profile[name]["Duration (in s)"] = round(duration, 2)
198-
199-
200-
def _collect_runtimes(tasks: list[PTask]) -> dict[str, float]:
201-
"""Collect runtimes."""
202-
with DatabaseSession() as session:
203-
runtimes = [session.get(Runtime, task.signature) for task in tasks]
204-
return {
205-
task.name: r.duration for task, r in zip(tasks, runtimes, strict=False) if r
206-
}
207-
208-
209199
class FileSizeNameSpace:
210200
"""A namespace for adding the total file size of products to a task."""
211201

src/_pytask/runtime_store.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
"""Runtime storage with an append-only journal."""
2+
3+
from __future__ import annotations
4+
5+
from dataclasses import dataclass
6+
from dataclasses import field
7+
from typing import TYPE_CHECKING
8+
9+
import msgspec
10+
11+
from _pytask.journal import JsonlJournal
12+
13+
if TYPE_CHECKING:
14+
from pathlib import Path
15+
16+
from _pytask.node_protocols import PTask
17+
18+
19+
class _RuntimeEntry(msgspec.Struct):
20+
id: str
21+
date: float
22+
duration: float
23+
24+
25+
class _RuntimeFile(msgspec.Struct, forbid_unknown_fields=False):
26+
task: list[_RuntimeEntry] = msgspec.field(default_factory=list)
27+
28+
29+
class _RuntimeJournalEntry(msgspec.Struct, forbid_unknown_fields=False):
30+
id: str
31+
date: float
32+
duration: float
33+
34+
35+
def _read_runtimes(path: Path) -> _RuntimeFile | None:
36+
if not path.exists():
37+
return None
38+
try:
39+
data = msgspec.json.decode(path.read_bytes(), type=_RuntimeFile)
40+
except msgspec.DecodeError:
41+
path.unlink()
42+
return None
43+
return data
44+
45+
46+
def _write_runtimes(path: Path, runtimes: _RuntimeFile) -> None:
47+
data = msgspec.json.encode(runtimes)
48+
tmp = path.with_suffix(f"{path.suffix}.tmp")
49+
tmp.write_bytes(data)
50+
tmp.replace(path)
51+
52+
53+
def _read_journal(
54+
journal: JsonlJournal[_RuntimeJournalEntry],
55+
) -> list[_RuntimeJournalEntry]:
56+
return journal.read()
57+
58+
59+
def _apply_journal(
60+
runtimes: _RuntimeFile, entries: list[_RuntimeJournalEntry]
61+
) -> _RuntimeFile:
62+
if not entries:
63+
return runtimes
64+
index = {entry.id: entry for entry in runtimes.task}
65+
for entry in entries:
66+
index[entry.id] = _RuntimeEntry(
67+
id=entry.id, date=entry.date, duration=entry.duration
68+
)
69+
return _RuntimeFile(
70+
task=list(index.values()),
71+
)
72+
73+
74+
@dataclass
75+
class RuntimeState:
76+
path: Path
77+
runtimes: _RuntimeFile
78+
journal: JsonlJournal[_RuntimeJournalEntry]
79+
_index: dict[str, _RuntimeEntry] = field(init=False, default_factory=dict)
80+
_dirty: bool = field(init=False, default=False)
81+
82+
def __post_init__(self) -> None:
83+
self._rebuild_index()
84+
85+
@classmethod
86+
def from_root(cls, root: Path) -> RuntimeState:
87+
path = root / ".pytask" / "runtimes.json"
88+
journal = JsonlJournal(
89+
path=path.with_suffix(".journal"), type_=_RuntimeJournalEntry
90+
)
91+
existing = _read_runtimes(path)
92+
journal_entries = _read_journal(journal)
93+
if existing is None:
94+
runtimes = _RuntimeFile(
95+
task=[],
96+
)
97+
runtimes = _apply_journal(runtimes, journal_entries)
98+
state = cls(path=path, runtimes=runtimes, journal=journal)
99+
else:
100+
runtimes = _apply_journal(existing, journal_entries)
101+
state = cls(path=path, runtimes=runtimes, journal=journal)
102+
103+
if journal_entries:
104+
state._dirty = True
105+
return state
106+
107+
def _rebuild_index(self) -> None:
108+
self._index = {entry.id: entry for entry in self.runtimes.task}
109+
110+
def update_task(self, task: PTask, start: float, end: float) -> None:
111+
task_id = task.name
112+
entry = _RuntimeEntry(id=task_id, date=start, duration=end - start)
113+
self._index[entry.id] = entry
114+
self.runtimes = _RuntimeFile(
115+
task=list(self._index.values()),
116+
)
117+
self._rebuild_index()
118+
journal_entry = _RuntimeJournalEntry(
119+
id=entry.id,
120+
date=entry.date,
121+
duration=entry.duration,
122+
)
123+
self.journal.append(journal_entry)
124+
self._dirty = True
125+
126+
def get_duration(self, task: PTask) -> float | None:
127+
task_id = task.name
128+
entry = self._index.get(task_id)
129+
if entry is None:
130+
return None
131+
return entry.duration
132+
133+
def flush(self) -> None:
134+
if not self._dirty:
135+
return
136+
_write_runtimes(self.path, self.runtimes)
137+
self.journal.delete()
138+
self._dirty = False

0 commit comments

Comments
 (0)