-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathsession_journal.py
More file actions
1447 lines (1285 loc) · 63.9 KB
/
Copy pathsession_journal.py
File metadata and controls
1447 lines (1285 loc) · 63.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Location: pact-plugin/hooks/shared/session_journal.py
Summary: Append-only JSONL event store for GC-proof workflow state persistence.
Used by: session_init.py, session_end.py, agent_handoff_emitter.py (hooks);
orchestrate.md, comPACT.md, peer-review.md, wrap-up.md, pause.md
(commands invoke via CLI: python3 session_journal.py write|read|read-last).
Write path: O_APPEND append, protected by an exclusive advisory lock
(`fcntl.flock(LOCK_EX)`) around the short-write loop in `_atomic_write`.
POSIX only guarantees `os.write` atomicity up to PIPE_BUF (512 bytes on
macOS, 4096 on Linux); for larger events the short-write loop would
otherwise open an interleaving window between iterations where another
O_APPEND writer could splice its bytes into the middle of ours. The
flock closes that window, so concurrent writers (hooks + orchestrator
CLI calls) are safe for events of any size on single-host local
filesystems. The guarantee is single-host only — advisory locks do not
cross machine boundaries and NFS flock semantics are unreliable — which
is fine because pact-sessions is per-host already. The short-write
loop itself remains the guard against partial writes from signal
interruption.
Read path: Sequential scan with type filtering. For typical sessions
(<200 events, <80KB), full scan completes in <5ms. For crash recovery,
read from end to find last checkpoint, then replay forward.
Durability: Best-effort. The write+rename+lock cycle protects against
interleaving and partial writes from concurrent writers, but `_atomic_write`
does NOT call `fsync` after the write. After a hard crash (power loss,
kernel panic), the most recent event may be lost even though
`append_event` returned True. This is intentional — the journal lives on
the orchestrate hot path (every checkpoint, phase transition, dispatch)
and a per-write fsync is too expensive there. Durability "to OS buffers"
is the contract; cross-process visibility is immediate after the lock
releases.
Dual API pattern:
- Implicit API (hooks): append_event(), read_events(), read_last_event(),
get_journal_path() — derive path via pact_context.get_session_dir().
- Explicit API (resume/CLI): read_events_from(session_dir), read_last_event_from(session_dir)
— caller provides session directory path.
File location: ~/.claude/pact-sessions/{slug}/{session_id}/session-journal.jsonl
Permissions: 0o600 (owner read/write only)
Directory permissions: 0o700 (owner only)
"""
from __future__ import annotations
import fcntl
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# Schema version for journal events.
_SCHEMA_VERSION = 1
# Tail window size for `_read_last_event_at`. Reverse-scan reads at most
# this many trailing bytes before falling back to a full-file slurp. In a
# typical active session 32 KB covers ~100-300 JSONL entries, which is
# the common-case window for the most-recent event of any tracked type.
# Worst case (target event older than 32 KB from EOF) falls back to the
# pre-optimization full-slurp cost.
_TAIL_WINDOW_BYTES = 32 * 1024
# Per-type required fields, derived from actual writer call sites. Every
# entry here reflects what a production writer ACTUALLY produces (grep
# `make_event("{type}"` in hooks/ and `write --type {type}` in commands/),
# not an aspirational schema. Unknown types (e.g. "test" in unit tests)
# bypass per-type validation by design and only get baseline v/type/ts
# checks — this preserves test ergonomics without loosening production
# safety. Note the trade-off: a typo in `event_type` at a writer call site
# (e.g. `make_event("phse_transition", ...)`) will silently bypass per-type
# validation rather than raise. The mitigation is the test in
# TestValidateEventSchemaPerType — every production type MUST have a test
# entry, which catches typos in tests rather than at runtime.
#
# Each required field maps to its expected Python type. At validate time,
# the validator checks presence AND `isinstance(value, expected_type)` AND
# — for str fields — rejects empty or whitespace-only values. The expected
# types reflect what the writer literally produces: unquoted values in the
# --data JSON become int/bool/list/dict, quoted values become str.
#
# When adding a new event type, add it here with its required field → type
# mapping AND add a test to TestValidateEventSchemaPerType in
# test_session_journal.py.
#
# Trust boundary: write path validates events against this dict; read path
# trusts disk content. Loosening this dict without auditing all readers
# will silently break extractors assuming validated shape.
_REQUIRED_FIELDS_BY_TYPE: dict[str, dict[str, type]] = {
# hooks/session_init.py writes session_start with team, session_id,
# project_dir, worktree on the valid-stdin path only (under R3, the event
# is dropped entirely when stdin lacks session_id to avoid an unreapable
# `unknown-*` directory leak). Of these, session_id and project_dir are
# the load-bearing fields downstream consumers depend on; team is
# redundant with CLAUDE.md and worktree is empty at write time.
"session_start": {"session_id": str, "project_dir": str},
# commands/orchestrate.md writes variety_assessed with task_id (quoted
# string) and variety (nested JSON object → dict). This is the FEATURE-level
# variety (written once for the feature task) — distinct from the
# per-dispatch dispatch_variety below.
"variety_assessed": {"task_id": str, "variety": dict},
# hooks/task_lifecycle_gate.py emits dispatch_variety on the TaskCreate of a
# Task-B carrying metadata.variety (one per dispatch). The GC-immune mirror
# of the per-dispatch variety stamp (#955) — the task store that holds
# metadata.variety is reaped by the teams/tasks reaper, so wrap-up Q5 read
# false-empty after GC; this journal event is the durable source.
# task_id is the Task-B id; variety is the 5-key dict (4 dims + total).
# The emitter PROJECTS metadata.variety to exactly these 5 keys
# (DISPATCH_VARIETY_KEYS) before append — the *_rationale strings are NOT
# mirrored (pact-variety.md §5.1). Read by wrap-up Q5 as the GC-immune
# source for compute_variety_divergence's dispatch_varieties list, which
# consumes only .total. (variety is typed `dict`, so the schema check
# enforces only the top-level task_id+variety keys — the projection lives
# at the emit site, not here.)
"dispatch_variety": {"task_id": str, "variety": dict},
# hooks/task_lifecycle_gate.py emits teachback_ack on the lead's
# TaskUpdate(A, status="completed") accepting a teachback whose Task-A
# metadata carries teachback_submit.variety_acknowledgment (#955). task_id is
# the Task-A id; rationale_articulates_this_dispatch is the teammate's
# "yes"|"no"|"concern" flag. The GC-immune mirror read by wrap-up Q6's
# cargo_cult_signal_rate (the task store goes false-empty after GC).
"teachback_ack": {"task_id": str, "rationale_articulates_this_dispatch": str},
# commands/orchestrate.md + comPACT.md write phase_transition with
# phase + status (both quoted strings). session_resume._build_journal_resume
# subscripts `p["phase"]` — this schema check is the defensive bulwark
# against F1.
"phase_transition": {"phase": str, "status": str},
# commands/orchestrate.md writes checkpoint with phase (quoted string) +
# completed_phases + active_agents + variety + pending_phases + safe_to_retry.
# Only `phase` is universally required; the rest vary per checkpoint context.
"checkpoint": {"phase": str},
# commands/orchestrate.md + comPACT.md write agent_dispatch with agent,
# task_id, phase (all quoted strings) + scope (list).
"agent_dispatch": {"agent": str, "task_id": str, "phase": str},
# hooks/agent_handoff_emitter.py writes agent_handoff with agent, task_id,
# task_subject (all strings) and handoff (dict from task metadata).
# All four are load-bearing for the secretary.
"agent_handoff": {
"agent": str,
"task_id": str,
"task_subject": str,
"handoff": dict,
},
# hooks/missed_wake_scan.py writes missed_wake (the #903 deferred
# missed-wake alarm) with task_id, agent (the idle teammate / task owner),
# and since (the intentional_wait timestamp — also the re-arm discriminator
# for the per-(team,task_id,since) marker). All three are load-bearing:
# which task is stuck, who is waiting, and since when.
"missed_wake": {
"task_id": str,
"agent": str,
"since": str,
},
# commands/orchestrate.md writes s2_state_seeded with worktree (quoted
# string), agents (JSON list), and boundaries (JSON object → dict).
# No hook-based writer; CLI-only event.
"s2_state_seeded": {"worktree": str, "agents": list, "boundaries": dict},
# commands/orchestrate.md + comPACT.md write commit with sha, message,
# phase (all quoted strings).
"commit": {"sha": str, "message": str, "phase": str},
# commands/peer-review.md writes review_dispatch with pr_number (unquoted
# int), pr_url (quoted string), reviewers (JSON list).
"review_dispatch": {"pr_number": int, "pr_url": str, "reviewers": list},
# commands/peer-review.md writes review_finding with severity, finding,
# reviewer, task_id (all quoted strings).
"review_finding": {"severity": str, "finding": str, "reviewer": str},
# commands/peer-review.md writes remediation with cycle (unquoted int),
# items (JSON list), fixer (quoted string).
"remediation": {"cycle": int, "items": list, "fixer": str},
# commands/peer-review.md writes pr_ready with pr_number (unquoted int),
# pr_url (quoted string), commits (unquoted int).
"pr_ready": {"pr_number": int, "pr_url": str, "commits": int},
# commands/pause.md writes session_paused with pr_number (unquoted int),
# pr_url/branch/worktree_path (quoted strings),
# consolidation_completed (unquoted bool), team_name (quoted string).
"session_paused": {
"pr_number": int,
"pr_url": str,
"branch": str,
"worktree_path": str,
"consolidation_completed": bool,
},
# hooks/session_end.py writes session_end with NO required fields — one
# writer passes an optional `warning` (line 119), the other passes
# nothing (line 316). commands/wrap-up.md CLI also writes session_end
# with no --data. Baseline v/type/ts validation is the only requirement.
"session_end": {},
# hooks/session_end.py writes cleanup_summary after the teams/tasks
# reaper runs (#412 Fix B). No required fields — the event is a counts
# audit trail and every field is optional by design. The empty-dict
# entry is structurally necessary: _validate_event_schema short-circuits
# on unknown types and skips the _OPTIONAL_FIELDS_BY_TYPE loop, so a
# type must be registered here to activate optional-field type checks.
"cleanup_summary": {},
# commands/wrap-up.md + pause.md write session_consolidated after the
# secretary's memory-consolidation Pass 2 completes (#453 Fix B). No
# required fields — the event's mere existence is the detector signal
# consumed by session_end.check_unpaused_pr. Empty-dict registration
# activates the _OPTIONAL_FIELDS_BY_TYPE enforcement below (same pattern
# as session_end and cleanup_summary).
"session_consolidated": {},
# The lead-frame command emit sites (plan-mode.md, peer-review.md, and
# orchestrate.md's PREPARE/ARCHITECT/CODE phase-output validation) write
# artifact_paths via the CLI write path — a path-only, GC-durable pointer
# to each phase's on-disk artifact(s). It outlives `git worktree remove`
# because the journal lives under ~/.claude/pact-sessions/, outside the
# worktree; only the pointed-at file is worktree-ephemeral. The secretary
# resolves these events at harvest and distills the artifact substance into
# pact-memory. `workflow` is the lowercase phase/workflow tag (one of
# plan-mode / prepare / architect / peer-review / code-auditor) and the
# dedup/precedence axis; `feature` is the slug scoping the event to one arc;
# `paths` is the PLURAL full-enumeration list of worktree-absolute artifact
# paths (a phase may write >1 file).
#
# Validator-depth caveat: `paths` is typed `list`, so the schema check
# enforces only isinstance(value, list) — it does NOT descend into the list
# to require each element be a non-empty str (the per-field empty-string
# guard applies to `str` fields only). Per-element path validity is the
# WRITER's responsibility: the emit sites drop empty/invalid entries and
# drop the whole emit when the glob found nothing (an empty paths list
# passes isinstance but is meaningless — that "missing artifact" case is the
# task_lifecycle_gate backstop's job, NOT a zero-length event).
"artifact_paths": {"workflow": str, "feature": str, "paths": list},
}
# Per-type optional fields, with expected Python type. Fields listed here
# are NOT required — an event missing them still passes validation — but
# when they ARE present, the validator enforces type. This is the schema
# contract counterpart to runtime clamps (e.g. the `_VALID_SOURCES` clamp
# in session_init.py): if a future writer bypasses the normalization
# path and emits the wrong type directly to `make_event`, the event is
# rejected at validate time rather than landing on disk.
# Same type-symmetry rules as _REQUIRED_FIELDS_BY_TYPE: `int` fields
# reject `bool` because bool subclasses int.
#
# When adding a new optional field, add it here and add a matching
# happy-path + wrong-type case to TestValidateOptionalFieldTypes in
# test_session_journal.py.
_OPTIONAL_FIELDS_BY_TYPE: dict[str, dict[str, type]] = {
# hooks/session_init.py writes session_start with an optional `source`
# drawn from stdin. The session_init normalization path clamps non-str
# inputs to "unknown" before the journal write; this schema contract
# catches any future writer that bypasses that path.
"session_start": {"source": str},
# hooks/session_end.py writes session_end with an optional `warning`
# string when check_unpaused_pr detects an open PR that was NOT
# paused (no memory consolidation). The empty-dict registration in
# _REQUIRED_FIELDS_BY_TYPE above ("session_end": {}) is what
# ACTIVATES this optional check — _validate_event_schema
# short-circuits on unknown event types and would otherwise skip
# the optional loop. Symmetric with the cleanup_summary registration
# shipped in the same PR (#412 Fix B).
"session_end": {"warning": str},
# hooks/session_end.py writes cleanup_summary after the teams/tasks
# reaper runs (#412 Fix B). Counts-only payload; no identifying names
# (audit surface area minimization). `teams_ran`/`tasks_ran`
# discriminate "reaper executed and found nothing" (True, 0/0) from
# "reaper short-circuited at callsite" (False, 0/0) per side; without
# them the two states are indistinguishable in the journal. Cycle-8
# split these from the older single `reaper_ran` bool and the single
# `ttl_days` int into per-reaper fields so an auditor can tell WHICH
# side short-circuited and which TTL applied on either side
# (currently both default to 30 days; split future-proofs against
# TTL divergence).
"cleanup_summary": {
"teams_reaped": int,
"teams_skipped": int,
"tasks_reaped": int,
"tasks_skipped": int,
"teams_ttl_days": int,
"tasks_ttl_days": int,
"teams_ran": bool,
"tasks_ran": bool,
},
# commands/wrap-up.md + pause.md write session_consolidated after the
# secretary's consolidation Pass 2 completes (#453 Fix B). The existence
# of the event is the signal consumed by session_end.check_unpaused_pr;
# these fields are audit trail for session_resume summaries and future
# observability. `pass` distinguishes which consolidation pass ran
# (1 or 2); the two count fields are advisory and may be 0 when the
# secretary cannot produce exact numbers.
"session_consolidated": {
"pass": int,
"task_count": int,
"memories_saved": int,
},
# hooks/missed_wake_scan.py writes missed_wake with optional task_subject
# (human-readable task label) and reason (the intentional_wait reason —
# always "awaiting_lead_completion" for this alarm, recorded for journal
# readers). The required-fields registration above ("missed_wake": {...})
# is what ACTIVATES this optional check — _validate_event_schema
# short-circuits on unknown types and would otherwise skip the optional loop.
"missed_wake": {
"task_subject": str,
"reason": str,
},
# hooks/task_lifecycle_gate.py writes teachback_ack with an optional concern
# string — the teammate's variety_acknowledgment.concern, present only when
# rationale_articulates_this_dispatch is "no"/"concern" (per pact-variety.md;
# a "yes" ack legitimately omits it). The required-fields registration above
# ("teachback_ack": {...}) is what ACTIVATES this optional check —
# _validate_event_schema short-circuits on unknown types and would otherwise
# skip the optional loop (same activation pattern as session_end /
# missed_wake).
"teachback_ack": {
"concern": str,
},
# commands/peer-review.md writes remediation with an optional task_id —
# the fixer's Task-B task id. The Q5 coverage denominator
# (variety_divergence.count_task_b_dispatch_sites) uses it to dedup a
# comPACT/orchestrate-dispatched remediation that ALSO emits
# agent_dispatch for the same task_id, so the site is counted once.
# Optional because a remediation may omit it; an id-less remediation is
# counted as a distinct site (fail-safe — never undercounts the
# denominator). The required-fields registration above
# ("remediation": {...}) activates this optional check.
"remediation": {
"task_id": str,
},
# The lead-frame emit sites write artifact_paths with an optional `task_id`
# — the phase task id the lead completed when emitting (provenance /
# cross-link). Absent for plan-mode/peer-review syntheses that have no phase
# task; present (and a non-empty str) for the PREPARE/ARCHITECT/CODE phase
# emits. The required-fields registration above ("artifact_paths": {...})
# is what ACTIVATES this optional check — _validate_event_schema
# short-circuits on unknown types and would otherwise skip the optional loop
# (same activation pattern as session_end / missed_wake / teachback_ack).
"artifact_paths": {
"task_id": str,
},
}
# --- Write API ---
def make_event(event_type: str, **fields: Any) -> dict[str, Any]:
"""
Construct a journal event dict with common fields pre-filled.
Sets v=1 and ts=current UTC time. Caller provides type-specific fields.
A caller-supplied `ts` (in **fields) is honored — it is only auto-set
when the caller does not provide one. This lets test fixtures and
backfill tooling stamp deterministic timestamps without round-tripping
through the journal.
Args:
event_type: Event type string (e.g., "agent_handoff", "session_start")
**fields: Type-specific fields to include in the event. May include
an explicit `ts` to override the auto-set timestamp.
Returns:
Complete event dict ready for append_event()
"""
event: dict[str, Any] = {
"v": _SCHEMA_VERSION,
"type": event_type,
}
event.update(fields)
# Use setdefault so a caller-supplied ts in **fields is preserved.
# Without setdefault, the previous unconditional assignment silently
# discarded any caller ts and contradicted the docstring.
event.setdefault(
"ts", datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
)
return event
def _validate_event_schema(event: dict[str, Any]) -> tuple[bool, str]:
"""
Validate that an event dict has the required schema fields.
Baseline (all event types):
- 'v' is an int (and NOT a bool — Python bool is a subclass of int,
so it must be rejected explicitly)
- 'type' is a non-empty str (whitespace-only is rejected)
Per-type (only for types in _REQUIRED_FIELDS_BY_TYPE):
- Every required field is present and not None.
- Every required field has the expected Python type (isinstance check).
`int` fields reject `bool` explicitly because bool is an int subclass;
a writer passing `pr_number=True` would otherwise slip through.
- `str` fields additionally reject empty and whitespace-only values —
a blank `phase` or `agent` is functionally indistinguishable from
missing for every downstream consumer, and the baseline `type` check
already uses the same semantics for consistency.
- Unknown event types (e.g. free-form "test" used in unit tests) pass
per-type validation by design — the whitelist is opt-in enforcement
for known types. The trade-off: a typo in a production
`make_event("…")` call site silently bypasses per-type checks. The
TestValidateEventSchemaPerType suite catches that at test time.
Optional fields (for types in _OPTIONAL_FIELDS_BY_TYPE):
- Absent fields pass (the field is optional by definition).
- Present fields must match the declared type, applying the same
bool-in-int + empty-str rules as required fields. This is the
schema-level counterpart to runtime clamping paths such as the
`source` isinstance guard in session_init.py — a future writer
that bypasses the clamp and emits the wrong type directly to
`make_event` is rejected at validate time.
This is the bulwark that prevents BugF1: a malformed `phase_transition`
event (missing `phase` field, or `phase=""`, or `phase=42`) from any
writer causes `append_event` or the CLI write path to return False
BEFORE the bad line reaches disk, so `_build_journal_resume` in the
next session never has to deal with it. The defensive consumer is
still a backstop for anything that slips past this (e.g. events from
prior schema versions).
Returns:
A `(ok, reason)` tuple. `ok` is True only when every check passes;
`reason` is a short human-readable string. On success `reason` is
"ok". On failure `reason` identifies the first failing check so
callers (notably the CLI write path) can surface a precise error
to stderr instead of a generic "invalid event schema" line.
"""
v = event.get("v")
if not isinstance(v, int) or isinstance(v, bool):
return False, "v must be int"
event_type = event.get("type")
if not isinstance(event_type, str) or not event_type.strip():
return False, "type must be non-empty str"
required = _REQUIRED_FIELDS_BY_TYPE.get(event_type)
if required is None:
# Unknown event type — opt-in enforcement, pass through.
return True, "ok"
for field, expected_type in required.items():
if field not in event or event[field] is None:
return (
False,
f"missing required field '{field}' for type '{event_type}'",
)
value = event[field]
# int fields must reject bool even though bool subclasses int —
# symmetric with the baseline v check above.
if expected_type is int and isinstance(value, bool):
return (
False,
f"field '{field}' for type '{event_type}' must be int, "
f"got bool",
)
if not isinstance(value, expected_type):
return (
False,
f"field '{field}' for type '{event_type}' must be "
f"{expected_type.__name__}, got {type(value).__name__}",
)
# Fix B (RG2): str fields additionally reject empty or
# whitespace-only values — a blank phase/agent/task_id would
# pass the isinstance check but break every downstream consumer.
if expected_type is str and not value.strip():
return (
False,
f"field '{field}' for type '{event_type}' must be "
f"non-empty string",
)
# artifact_paths element-level guard (belt-and-suspenders): the generic
# per-field check above is SHALLOW — it confirms `paths` is a `list` but
# does not descend into its elements. The locked design makes per-element
# validity the writer's responsibility (the emit sites drop empty/invalid
# paths and skip an empty glob); this schema-layer check augments that with
# a defensive backstop so a writer that bypasses the emit-site discipline
# cannot land a `paths` list holding a non-str or empty/whitespace-only
# element on disk, where a downstream reader would treat it as a real path.
# Scoped to artifact_paths ONLY — other list-typed required fields
# (s2_state_seeded.agents, review_dispatch.reviewers, remediation.items)
# keep their existing shallow contract and are untouched.
if event_type == "artifact_paths":
for element in event["paths"]:
if not isinstance(element, str) or not element.strip():
return (
False,
"field 'paths' for type 'artifact_paths' must contain "
"only non-empty strings",
)
# Per-type optional field checks. Absent fields pass (that's what
# "optional" means); present fields must match the declared type.
# Symmetric with required-field checks: rejects bool in int fields,
# rejects empty/whitespace-only str. Event types with no optional
# declarations (the common case) get a no-op empty dict from .get()
# and skip the loop entirely.
optional = _OPTIONAL_FIELDS_BY_TYPE.get(event_type, {})
for field, expected_type in optional.items():
if field not in event or event[field] is None:
continue # Absent optional field — pass through.
value = event[field]
if expected_type is int and isinstance(value, bool):
return (
False,
f"optional field '{field}' for type '{event_type}' must "
f"be int, got bool",
)
if not isinstance(value, expected_type):
return (
False,
f"optional field '{field}' for type '{event_type}' must "
f"be {expected_type.__name__}, got {type(value).__name__}",
)
if expected_type is str and not value.strip():
return (
False,
f"optional field '{field}' for type '{event_type}' must "
f"be non-empty string",
)
return True, "ok"
def append_event(event: dict[str, Any]) -> bool:
"""
Append a single event to the current session's journal.
Path is derived implicitly via pact_context.get_session_dir().
Creates the session directory if it doesn't exist (mkdir -p, 0o700).
Serializes event to JSON, appends newline, writes atomically via
O_WRONLY | O_APPEND | O_CREAT with 0o600 permissions.
Args:
event: Event dict. Must include 'v' (int) and 'type' (non-empty str).
'ts' is auto-set if missing. Invalid events cause a silent
return False (fail-open).
Returns:
True if write succeeded, False on any error (fail-open).
"""
try:
# Validate required schema fields (shared with CLI write path).
# In-process API is fail-open: the caller gets a bool and the
# reason is intentionally discarded — hooks never surface per-type
# validator messages to end users. The CLI write path below has a
# symmetric call site that DOES print the reason.
ok, _reason = _validate_event_schema(event)
if not ok:
return False
# Auto-set timestamp if missing
if "ts" not in event:
event["ts"] = datetime.now(timezone.utc).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
# Derive path from session context (implicit — current session)
journal = _journal_path()
if journal is None:
# AdvF2 Approach 4: warn (but do not fail) when the implicit API
# is invoked before pact_context.init(). The return value still
# honors the existing fail-open contract — the warning is purely
# additive so a missing init() in a hook surfaces as a visible
# signal during development instead of a silent no-op in
# production. The is_initialized() check pinpoints the missing
# init root cause; if pact_context IS initialized but the path is
# still unavailable, that's a different failure mode (e.g.
# missing session_id) and we leave the existing silent fail-open
# in place to avoid noise.
if not _pact_context_is_initialized():
print(
"session_journal: append_event called before "
"pact_context.init() — returning False (this may "
"indicate a hook missing session_id)",
file=sys.stderr,
)
return False
# Ensure directory exists (mkdir -p with 0o700)
journal.parent.mkdir(parents=True, exist_ok=True, mode=0o700)
# Serialize and write atomically via O_APPEND
entry = json.dumps(event, separators=(",", ":")) + "\n"
if not _atomic_write(journal, entry.encode("utf-8")):
print(
"session_journal: append_event failed: write error",
file=sys.stderr,
)
return False
return True
except Exception as e:
print(
f"session_journal: append_event failed: {e}",
file=sys.stderr,
)
return False
# --- Read API ---
def read_events(
event_type: str | None = None,
since: str | None = None,
) -> list[dict[str, Any]]:
"""
Read events from the current session's journal, optionally filtered by type.
Path is derived implicitly via pact_context.get_session_dir().
Reads the full journal file, parses each line as JSON, and returns
events matching the filter. Malformed lines are silently skipped
(each event is self-contained — one bad line doesn't affect others).
Args:
event_type: If provided, only return events with this type.
If None, return all events.
Returns:
List of event dicts, in chronological order (oldest first).
Empty list if journal doesn't exist or on any error.
"""
try:
journal = _journal_path()
if journal is None:
# AdvF2 Approach 4: see append_event for rationale. Warns only
# when the path is unavailable AND pact_context was never
# initialized — the canonical "hook forgot to call init()" bug.
if not _pact_context_is_initialized():
print(
"session_journal: read_events called before "
"pact_context.init() — returning [] (this may indicate "
"a hook missing session_id)",
file=sys.stderr,
)
return []
return _read_events_at(journal, event_type, since)
except Exception:
return []
def read_events_from(
session_dir: str,
event_type: str | None = None,
since: str | None = None,
) -> list[dict[str, Any]]:
"""
Read events from a specific session's journal (explicit path).
Used for cross-session reads (resume, CLI) where the caller knows
the session directory path.
Args:
session_dir: Absolute path to the session directory.
event_type: If provided, only return events with this type.
If None, return all events.
Returns:
List of event dicts, in chronological order (oldest first).
Empty list if journal doesn't exist or on any error.
"""
if not session_dir:
# AdvF2 Approach 4 (parity with implicit API): emit a stderr
# warning before the silent fallback so an unset/empty
# session_dir at the call site surfaces as a visible signal
# rather than a mute empty result. The empty list is preserved
# so callers see the same return contract.
print(
"session_journal: read_events_from called with empty session_dir",
file=sys.stderr,
)
return []
journal = _journal_path_from(session_dir)
return _read_events_at(journal, event_type, since)
def resolve_latest_artifacts(
events: list[dict[str, Any]],
feature: str,
) -> dict[str, list[str]]:
"""Resolve the superseded artifact path-list per workflow for one feature.
Pure (no I/O): the caller supplies already-read `artifact_paths` events
(e.g. `read_events_from(session_dir, "artifact_paths")`) and the feature
slug; this returns one entry per workflow that emitted artifacts for that
feature, valued by that workflow's latest path-list.
Supersede semantics: filter to `e["feature"] ==
feature`; group by `e["workflow"]`; within each group keep the
latest-`ts` event. Each `artifact_paths` event carries the COMPLETE
path-list for its `(workflow, feature)` (a full enumeration per emit,
not a delta), so the latest event is self-sufficient — paths are NEVER
merged across events. A phase re-run that regenerates its doc in place
therefore supersedes the prior emit instead of duplicating it.
Tie-break = LAST-wins: when two events for the same `(workflow, feature)`
carry an equal `ts`, the one iterated later (the more-recently-written in
journal order) wins — see `_ts_supersedes`. `make_event` stamps `ts` at
second granularity, so a same-second double-emit is resolved to the last
write, which is the authoritative complete snapshot.
Defensive: non-dict entries and events missing `workflow`/`feature`/
`paths` are skipped (parity with the `_read_events_at` `isinstance(...,
dict)` guard), and any non-string element inside a surviving event's
`paths` list is dropped from the emitted list (the same isinstance-guard
discipline applied at element granularity, so a malformed path entry can
never flow through to the JSON output). Timestamp handling is FAIL-OPEN
(see `_ts_supersedes`): a
missing/unparseable `ts` on a candidate does not let it supersede a
well-formed incumbent, and an unresolved incumbent ts is replaced by any
well-formed candidate. A pair of parseable-but-incomparable timestamps
(e.g. one tz-aware, one tz-naive) is caught at the comparison and keeps
the incumbent rather than raising — the resolution never crashes on a
malformed `ts`.
Args:
events: Candidate journal events (typically all `artifact_paths`
events for the session). Any list of dicts is accepted; the
feature/type filtering happens here.
feature: The feature slug to resolve (matched against `e["feature"]`).
Returns:
`{workflow: paths}` — one key per workflow with a surviving event,
valued by that workflow's superseded (latest-`ts`, last-wins on a
tie) complete path-list, with any non-string element filtered out.
Empty dict if no event matches.
"""
latest_by_workflow: dict[str, dict[str, Any]] = {}
for event in events:
if not isinstance(event, dict):
continue
if event.get("feature") != feature:
continue
workflow = event.get("workflow")
paths = event.get("paths")
if not isinstance(workflow, str) or not isinstance(paths, list):
continue
prior = latest_by_workflow.get(workflow)
if prior is None or _ts_supersedes(event.get("ts"), prior.get("ts")):
latest_by_workflow[workflow] = event
return {
workflow: [p for p in event["paths"] if isinstance(p, str)]
for workflow, event in latest_by_workflow.items()
}
def _ts_supersedes(candidate_ts: Any, incumbent_ts: Any) -> bool:
"""Return True if a later-iterated `candidate_ts` should supersede the
incumbent — i.e. the candidate is newer than OR EQUAL TO the incumbent.
Used by `resolve_latest_artifacts` to pick the surviving event per
workflow. Timestamps are PARSED (via `_parse_ts`), never lexically
string-compared — see `_parse_ts` for the `Z` vs `+00:00` rationale.
Tie-break = LAST-wins (`>=`, not `>`): on an equal `ts`, the candidate
(iterated later, hence the more-recently-written event in journal order)
supersedes. Each `artifact_paths` emit is a COMPLETE snapshot, so the
last write for a `(workflow, feature)` is the authoritative one even when
two emits collide in the same wall-clock second (`make_event` stamps `ts`
at second granularity).
Fail-open comparison (matches the in-house `_ts_ge` pattern): the parse
AND the comparison are guarded. A missing/unparseable `candidate_ts`
returns False, so a malformed candidate never supersedes a well-formed
incumbent. A missing/unparseable `incumbent_ts` returns True, so a
well-formed candidate replaces a malformed incumbent.
Naive/aware coercion: if a parsed value is tz-NAIVE (only possible from a
corrupted/externally-merged journal — `make_event` always stamps aware
`...Z`), it is assumed UTC and coerced to tz-aware before the comparison,
so a naive-vs-aware pair compares by actual INSTANT (the later instant
wins) instead of raising `TypeError`. The comparison stays wrapped in a
`try/except TypeError` that fail-opens (returns False, keeps the incumbent)
for any residual uncomparable pair, so the resolution never crashes.
"""
try:
candidate = _parse_ts(candidate_ts)
except (ValueError, TypeError):
return False
try:
incumbent = _parse_ts(incumbent_ts)
except (ValueError, TypeError):
return True
# Coerce a tz-naive value to tz-aware UTC so a naive-vs-aware pair compares
# by instant rather than raising TypeError (which the outer guard would turn
# into a fail-open keep-stale). Assuming UTC is the safe interpretation; a
# naive ts only arises from a corrupted journal (make_event always stamps Z).
candidate = candidate if candidate.tzinfo is not None else candidate.replace(tzinfo=timezone.utc)
incumbent = incumbent if incumbent.tzinfo is not None else incumbent.replace(tzinfo=timezone.utc)
try:
return candidate >= incumbent
except TypeError:
return False
def _normalize_trailing_z(value: Any) -> str:
"""Rewrite a SINGLE trailing `Z` UTC designator to `+00:00`, leaving any
interior `Z` intact.
The anchor is TRAILING-ONLY (`str.endswith`, no `re` dependency): an
interior `Z` is not a valid ISO-8601 field, so leaving it intact lets the
downstream `fromisoformat` reject the whole string rather than a blanket
`.replace("Z", "+00:00")` rewriting it mid-string. On `_parse_ts`'s
return/raise the trailing-only and replace-all forms are observationally
identical (any interior `Z` is unparseable either way); the anchor matters
at the STRING layer, which this helper isolates and makes testable.
"""
s = str(value)
return s[:-1] + "+00:00" if s.endswith("Z") else s
def _parse_ts(value: Any) -> datetime:
"""Parse an ISO-8601 timestamp, normalizing a trailing `Z` to `+00:00`
(via `_normalize_trailing_z`).
`make_event` stamps `ts` as `...Z` while `canonical_since()` emits
`...+00:00`; normalizing lets the two compare as equal-instant
datetimes. A lexical string compare would be WRONG — `'+'` 0x2B sorts
before `'Z'` 0x5A, so a `+00:00` ts would sort before an equal-instant
`Z` ts. Raises ValueError/TypeError on missing/malformed input; callers
decide the fail-open policy.
"""
return datetime.fromisoformat(_normalize_trailing_z(value))
def _ts_ge(event_ts: Any, since: str | None) -> bool:
"""Return True if `event_ts >= since`, compared as parsed datetimes.
The arc-scope filter (`--since`) MUST parse the timestamps (via
`_parse_ts`), not string-compare them — see `_parse_ts` for the
format-mismatch rationale.
Fail-open: when `since` is falsy the event is included (no filtering);
when either timestamp is missing or unparseable the event is INCLUDED
(returns True) so a malformed/absent ts is never silently dropped from
a scoped read.
"""
if not since:
return True
try:
return _parse_ts(event_ts) >= _parse_ts(since)
except (ValueError, TypeError):
return True
def _read_events_at(
journal: Path,
event_type: str | None = None,
since: str | None = None,
) -> list[dict[str, Any]]:
"""Shared read implementation for both implicit and explicit APIs.
Reads with `errors="replace"` so a single invalid byte sequence
(e.g., from a botched write or a truncated multibyte character)
substitutes U+FFFD for the bad bytes instead of raising
UnicodeDecodeError. A bad line corrupts at most its own line; every
other event in the file is still returned. Two per-line hazards are
isolated: (1) a line that fails to parse as JSON is dropped by the
`except (json.JSONDecodeError, ValueError)` below; (2) a line that
parses as valid JSON but is NOT a dict (e.g. `[1,2,3]`, `"str"`,
`42`, `null`) is dropped by the `isinstance(event, dict)` guard —
without that guard, `.get()` on a non-dict value raises
AttributeError (not in the except tuple), which would propagate to
the outer `except Exception` and drop the WHOLE file, hiding every
otherwise-valid event behind one bad line.
`since`: when set, only events whose `ts` is >= `since` (inclusive,
parsed via `_ts_ge`) are returned — the arc-scope filter. None/empty
returns the whole journal (single-arc behavior unchanged).
"""
try:
if not journal.exists():
return []
events: list[dict[str, Any]] = []
for line in journal.read_text(
encoding="utf-8", errors="replace"
).splitlines():
line = line.strip()
if not line:
continue
try:
event = json.loads(line)
# A line can be valid JSON yet NOT a dict (e.g. `[1,2,3]`,
# `"str"`, `42`, `null`). `.get()` on such a value raises
# AttributeError — which is NOT in the except tuple below, so it
# would propagate to the outer `except Exception` and drop the
# WHOLE file's events (every event hidden behind one bad line).
# Skip a non-dict line exactly like a malformed one so it
# corrupts at most itself, preserving the per-line isolation the
# docstring promises.
if not isinstance(event, dict):
continue
if event_type and event.get("type") != event_type:
continue
if not _ts_ge(event.get("ts"), since):
continue
events.append(event)
except (json.JSONDecodeError, ValueError):
continue # Skip malformed lines
return events
except Exception:
return []
def read_last_event(
event_type: str,
since: str | None = None,
) -> dict[str, Any] | None:
"""
Read the most recent event of a given type from the current session's journal.
Path is derived implicitly via pact_context.get_session_dir().
Scans lines in reverse for efficiency — returns as soon as the
first (most recent) match is found.
Args:
event_type: Event type to search for.
Returns:
The last matching event dict, or None if not found.
"""
try:
journal = _journal_path()
if journal is None:
# AdvF2 Approach 4: see append_event for rationale. Warns only
# when the path is unavailable AND pact_context was never
# initialized — the canonical "hook forgot to call init()" bug.
if not _pact_context_is_initialized():
print(
"session_journal: read_last_event called before "
"pact_context.init() — returning None (this may "
"indicate a hook missing session_id)",
file=sys.stderr,
)
return None
return _read_last_event_at(journal, event_type, since)
except Exception:
return None
def read_last_event_from(
session_dir: str,
event_type: str,
since: str | None = None,
) -> dict[str, Any] | None:
"""
Read the most recent event of a given type from a specific session's journal.
Used for cross-session reads (resume, CLI) where the caller knows
the session directory path.
Args:
session_dir: Absolute path to the session directory.
event_type: Event type to search for.
Returns:
The last matching event dict, or None if not found.
"""
if not session_dir:
# AdvF2 Approach 4 (parity with implicit API): emit a stderr
# warning before the silent fallback so an unset/empty
# session_dir at the call site surfaces as a visible signal
# rather than a mute None result. The None is preserved so
# callers see the same return contract.
print(
"session_journal: read_last_event_from called with "
"empty session_dir",
file=sys.stderr,
)
return None
journal = _journal_path_from(session_dir)
return _read_last_event_at(journal, event_type, since)
def _scan_lines_for_event(
lines: list[str],
event_type: str,
since: str | None = None,
) -> dict[str, Any] | None:
"""Reverse-iterate decoded lines, returning the first matching event.
Shared by tail-window and full-slurp scan paths. Skips blank lines and
silently drops malformed JSON (symmetric with the pre-optimization
contract: corrupted lines never poison the scan). A line that parses as
valid JSON but is NOT a dict (e.g. `[1,2,3]`, `"str"`, `42`, `null`) is
skipped by the `isinstance(event, dict)` guard — parity with
`_read_events_at`: without it, `.get()` on a non-dict value raises
AttributeError (not in the except tuple), which would propagate to the
caller's outer `except Exception` and abort the whole reverse scan,
making `read_last_event*` return None (e.g. `session_end` would conclude
the session was never paused).
`since`: when set, an event matching `event_type` whose `ts` is < `since`
(parsed via `_ts_ge`) is skipped — the reverse scan then returns the
most recent matching event at/after `since`, or None.
"""
for line in reversed(lines):
line = line.strip()
if not line:
continue
try:
event = json.loads(line)
if not isinstance(event, dict):
continue
if event.get("type") == event_type and _ts_ge(
event.get("ts"), since