Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/comptext_v7/graph/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""Replay graph deterministic core helpers."""

from .evidence import ReplayGraphDiff, compare_edges
from .ordering import find_order_violations
from .reachability import has_path, reachable_nodes
from .topology import adjacency_map, nodes_from_edges, normalize_edges

# Names re-exported as the public API of the graph package; every entry is
# imported above from one of the sibling modules.
__all__ = [
"ReplayGraphDiff",
"adjacency_map",
"compare_edges",
"find_order_violations",
"has_path",
"nodes_from_edges",
"normalize_edges",
"reachable_nodes",
]
49 changes: 49 additions & 0 deletions src/comptext_v7/graph/evidence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""Deterministic evidence helpers for replay graph diffs."""

from __future__ import annotations

from collections.abc import Iterable
from dataclasses import dataclass

from .topology import normalize_edges, nodes_from_edges


# Alias for one graph edge: a pair of node-id strings, handed to
# normalize_edges as (source, target).
Edge = tuple[str, str]


@dataclass(frozen=True)
class ReplayGraphDiff:
    """Frozen record describing how a replay graph differs from the original.

    Every field is a tuple, so an instance is immutable and can be compared
    for equality field by field.
    """

    # Edges of the original graph that the replay graph lacks.
    missing_edges: tuple[Edge, ...]
    # Edges of the replay graph that the original graph lacks.
    added_edges: tuple[Edge, ...]
    # Node ids that occur only in the original graph's edges.
    missing_nodes: tuple[str, ...]
    # Node ids that occur only in the replay graph's edges.
    added_nodes: tuple[str, ...]


def compare_edges(
    original_edges: Iterable[Edge],
    replay_edges: Iterable[Edge],
) -> ReplayGraphDiff:
    """Compare original and replay edges and return deterministic diff evidence.

    Both inputs are passed through ``normalize_edges`` exactly once; the edge
    and node differences are then computed with set arithmetic and emitted as
    sorted tuples so the result does not depend on input order.

    Args:
        original_edges: Edges of the reference graph.
        replay_edges: Edges of the replayed graph.

    Returns:
        A ``ReplayGraphDiff`` whose ``missing_*`` fields hold what only the
        original has and whose ``added_*`` fields hold what only the replay
        has.

    Raises:
        ValueError: Propagated from ``normalize_edges`` when either input
            contains a self-loop.
    """
    original_set = set(normalize_edges(original_edges))
    replay_set = set(normalize_edges(replay_edges))

    # The edges are already normalized, so collect their endpoints directly
    # instead of calling nodes_from_edges, which would normalize and sort the
    # same data a second time.
    original_nodes = {node for edge in original_set for node in edge}
    replay_nodes = {node for edge in replay_set for node in edge}

    return ReplayGraphDiff(
        missing_edges=tuple(sorted(original_set - replay_set)),
        added_edges=tuple(sorted(replay_set - original_set)),
        missing_nodes=tuple(sorted(original_nodes - replay_nodes)),
        added_nodes=tuple(sorted(replay_nodes - original_nodes)),
    )
25 changes: 25 additions & 0 deletions src/comptext_v7/graph/ordering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""Deterministic ordering checks for replay sequences."""

from __future__ import annotations

from collections.abc import Iterable, Sequence


# Alias for one ordering constraint: a (before, after) pair of node-id
# strings, as unpacked by find_order_violations.
Edge = tuple[str, str]


def find_order_violations(
    sequence: Sequence[str],
    required_before: Iterable[Edge],
) -> tuple[Edge, ...]:
    """Return lexicographically sorted order violations.

    A constraint ``(before, after)`` is violated when the first occurrence of
    ``before`` in ``sequence`` comes later than the first occurrence of
    ``after``. Constraints that mention a node absent from ``sequence`` are
    ignored.

    Args:
        sequence: Observed node ids in replay order; ids may repeat.
        required_before: ``(before, after)`` pairs that must hold.

    Returns:
        The distinct violated pairs as a sorted tuple; empty when every
        applicable constraint holds.
    """
    # setdefault keeps the first index recorded for a node, so a node that
    # repeats later in the sequence cannot mask an earlier, valid occurrence
    # (e.g. ["A", "B", "A"] satisfies "A before B").
    first_position: dict[str, int] = {}
    for index, node in enumerate(sequence):
        first_position.setdefault(node, index)

    violations: set[Edge] = set()
    for before, after in required_before:
        before_at = first_position.get(before)
        after_at = first_position.get(after)
        if before_at is None or after_at is None:
            # A constraint on an unseen node cannot be judged either way.
            continue
        if before_at > after_at:
            violations.add((before, after))

    return tuple(sorted(violations))
39 changes: 39 additions & 0 deletions src/comptext_v7/graph/reachability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Deterministic reachability helpers for directed graphs."""

from __future__ import annotations

from collections import deque
from collections.abc import Iterable

from .topology import adjacency_map


# Alias for one directed edge: a pair of node-id strings, passed on to
# adjacency_map by the helpers below.
Edge = tuple[str, str]


def reachable_nodes(
    edges: Iterable[Edge],
    start: str,
    *,
    adjacency: dict[str, tuple[str, ...]] | None = None,
) -> tuple[str, ...]:
    """Return sorted reachable nodes from start.

    The start node is excluded unless it is reachable through a cycle.

    Args:
        edges: Directed edges of the graph. Only consulted when ``adjacency``
            is not supplied.
        start: Node id to search from; an id with no outgoing edges (or one
            absent from the graph) yields an empty result.
        adjacency: Optional prebuilt adjacency mapping (node id -> successor
            ids), e.g. the result of ``adjacency_map``. Passing it avoids
            rebuilding the map when many queries run against the same graph.

    Returns:
        The node ids reachable by following one or more edges from ``start``,
        as a sorted tuple.
    """
    if adjacency is None:
        adjacency = adjacency_map(edges)

    # The search is seeded with the successors of start rather than start
    # itself, so start only ends up in `seen` when some path leads back to it.
    pending: deque[str] = deque(adjacency.get(start, ()))
    seen: set[str] = set()

    while pending:
        node = pending.popleft()
        if node in seen:
            continue
        seen.add(node)
        pending.extend(
            neighbor for neighbor in adjacency.get(node, ()) if neighbor not in seen
        )

    # No special-casing of start is needed here: it is in `seen` exactly when
    # a cycle reaches it.
    return tuple(sorted(seen))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The conditional check node != start is redundant. If start is not in seen (no cycle), then the condition is always true for all elements in seen. If start is in seen (cycle), it is already handled by the if start in seen block on line 32. Simplifying this to return tuple(sorted(seen)) improves clarity.

Suggested change
return tuple(sorted(node for node in seen if node != start))
return tuple(sorted(seen))



def has_path(edges: Iterable[Edge], start: str, target: str) -> bool:
    """Report whether ``target`` can be reached from ``start`` along directed edges.

    The answer is a membership test on ``reachable_nodes(edges, start)``.
    """
    reachable = reachable_nodes(edges, start)
    return target in reachable
40 changes: 40 additions & 0 deletions src/comptext_v7/graph/topology.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Deterministic topology helpers for replay relation graphs."""

from __future__ import annotations

from collections.abc import Iterable


# Alias for one directed edge: a (source, target) pair of node-id strings.
Edge = tuple[str, str]


def normalize_edges(edges: Iterable[Edge]) -> tuple[Edge, ...]:
    """Deduplicate and sort edges, refusing any edge that loops onto itself.

    Args:
        edges: ``(source, target)`` pairs in any order, possibly repeated.

    Returns:
        The distinct edges as a sorted tuple of 2-tuples.

    Raises:
        ValueError: As soon as an edge whose source equals its target is
            encountered.
    """
    collected: set[Edge] = set()
    for edge in edges:
        source, target = edge
        if source != target:
            collected.add((source, target))
            continue
        raise ValueError(f"self-loop edge is not allowed: {source!r} -> {target!r}")

    ordered = list(collected)
    ordered.sort()
    return tuple(ordered)


def nodes_from_edges(edges: Iterable[Edge]) -> tuple[str, ...]:
    """Collect every node id that appears as an endpoint of some edge.

    The edges are normalized first, so a self-loop in the input raises the
    ``ValueError`` produced by ``normalize_edges``.

    Returns:
        The distinct node ids as a sorted tuple.
    """
    endpoints = {node for edge in normalize_edges(edges) for node in edge}
    ordered = list(endpoints)
    ordered.sort()
    return tuple(ordered)


def adjacency_map(edges: Iterable[Edge]) -> dict[str, tuple[str, ...]]:
    """Build successor tuples for every node mentioned in the edges.

    The edges are normalized first. Each node that appears as a source or a
    target gets a key; nodes without outgoing edges map to an empty tuple.

    Returns:
        A dict whose keys are inserted in sorted order and whose values are
        sorted tuples of successor node ids.
    """
    outgoing: dict[str, list[str]] = {}
    for source, target in normalize_edges(edges):
        if source not in outgoing:
            outgoing[source] = []
        outgoing[source].append(target)
        # Targets get a key too, so sink nodes are present in the result.
        if target not in outgoing:
            outgoing[target] = []

    result: dict[str, tuple[str, ...]] = {}
    for node in sorted(outgoing):
        result[node] = tuple(sorted(outgoing[node]))
    return result
85 changes: 85 additions & 0 deletions tests/test_replay_graph_core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from __future__ import annotations

import pytest

from src.comptext_v7.graph import (
ReplayGraphDiff,
adjacency_map,
compare_edges,
find_order_violations,
has_path,
nodes_from_edges,
normalize_edges,
reachable_nodes,
)


def test_normalize_edges_removes_duplicates_and_sorts() -> None:
    """Repeated edges collapse to one and the output is sorted."""
    unsorted_with_duplicate = [("b", "c"), ("a", "b"), ("b", "c")]
    expected = (("a", "b"), ("b", "c"))
    assert normalize_edges(unsorted_with_duplicate) == expected


def test_normalize_edges_rejects_self_loop() -> None:
    """An edge from a node to itself raises ValueError."""
    self_loop = [("n1", "n1")]
    with pytest.raises(ValueError):
        normalize_edges(self_loop)


def test_nodes_from_edges_returns_sorted_nodes() -> None:
    """Endpoints are returned once each, in sorted order."""
    result = nodes_from_edges([("b", "c"), ("a", "b")])
    assert result == ("a", "b", "c")


def test_adjacency_map_is_deterministic() -> None:
    """Each node maps to its sorted successors; sinks map to an empty tuple."""
    expected = {
        "a": ("b", "c"),
        "b": ("c",),
        "c": (),
    }
    result = adjacency_map([("b", "c"), ("a", "b"), ("a", "c")])
    assert result == expected


def test_find_order_violations_detects_reversed_and_sorts() -> None:
    """Reversed pairs are reported in sorted order; unknown nodes are skipped."""
    observed = ["c", "b", "a"]
    constraints = [("a", "b"), ("b", "c"), ("x", "a")]
    expected = (("a", "b"), ("b", "c"))
    assert find_order_violations(observed, constraints) == expected


def test_find_order_violations_ignores_missing_nodes() -> None:
    """Constraints naming a node absent from the sequence yield no violation."""
    observed = ["a", "b"]
    constraints = [("x", "b"), ("a", "y")]
    violations = find_order_violations(observed, constraints)
    assert violations == ()


def test_reachable_nodes_and_path_on_connected_graph() -> None:
    """Reachability follows edge direction on a small connected graph."""
    graph = [("a", "b"), ("b", "d"), ("a", "c")]
    from_a = reachable_nodes(graph, "a")
    assert from_a == ("b", "c", "d")
    assert has_path(graph, "a", "d") is True
    assert has_path(graph, "c", "d") is False


def test_reachable_nodes_handles_disconnected_graph() -> None:
    """Other components are not reached; an unknown start yields nothing."""
    graph = [("a", "b"), ("x", "y")]
    assert reachable_nodes(graph, "a") == ("b",)
    unknown_start = reachable_nodes(graph, "z")
    assert unknown_start == ()


def test_reachable_nodes_includes_start_when_cycle_exists() -> None:
    """A cycle through the start node makes the start reachable from itself."""
    two_cycle = [("a", "b"), ("b", "a")]
    assert reachable_nodes(two_cycle, "a") == ("a", "b")
    assert has_path(two_cycle, "a", "a") is True


def test_compare_edges_detects_edge_and_node_diffs_deterministically() -> None:
    """Edge and node differences are reported as sorted tuples."""
    original_graph = [("a", "b"), ("b", "c"), ("d", "e")]
    replay_graph = [("a", "b"), ("b", "d"), ("x", "y")]
    expected = ReplayGraphDiff(
        missing_edges=(("b", "c"), ("d", "e")),
        added_edges=(("b", "d"), ("x", "y")),
        missing_nodes=("c", "e"),
        added_nodes=("x", "y"),
    )

    result = compare_edges(original_graph, replay_graph)

    assert result == expected
    assert isinstance(result.missing_edges, tuple)
    assert isinstance(result.added_nodes, tuple)
Loading