forked from egraphs-good/egglog-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun_report.py
More file actions
121 lines (102 loc) · 4.39 KB
/
Copy pathrun_report.py
File metadata and controls
121 lines (102 loc) · 4.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import timedelta
from . import bindings
from .declarations import CommandDecl, Declarations
from .egraph_state import EGraphState
from .pretty import pretty_decl
def _format_rule_key(decls: Declarations, key: CommandDecl) -> str:
    """Return the string that ``pretty_decl`` produces for a rule key.

    Args:
        decls: Declarations handed to ``pretty_decl`` as its first argument.
        key: The rule-key declaration to render.

    Returns:
        Whatever ``pretty_decl(decls, key)`` returns.
    """
    rendered = pretty_decl(decls, key)
    return rendered
@dataclass
class RuleReport:
    """Python-side copy of the fields of a ``bindings.RuleReport``."""

    plan: bindings.Plan | None
    search_and_apply_time: timedelta
    num_matches: int

    @classmethod
    def _from_bindings(cls, report: bindings.RuleReport) -> RuleReport:
        """Create a ``RuleReport`` by copying the three fields off *report*.

        Args:
            report: Object exposing ``plan``, ``search_and_apply_time`` and
                ``num_matches`` attributes.

        Returns:
            A new instance holding the same three values.
        """
        # Positional order mirrors the field declaration order above.
        return cls(report.plan, report.search_and_apply_time, report.num_matches)
@dataclass
class RuleSetReport:
    """Python-side copy of a ``bindings.RuleSetReport``.

    ``rule_reports`` is keyed by the result of translating each binding-level
    string key, and every value is a list of ``RuleReport`` objects.
    """

    changed: bool
    rule_reports: dict[CommandDecl, list[RuleReport]]
    search_and_apply_time: timedelta
    merge_time: timedelta
    # Only consulted by ``__repr__`` to pretty-print rule keys. It is optional:
    # the default is ``None``, in which case keys are shown with plain ``repr``.
    _decls: Declarations | None = field(repr=False, default=None)

    @classmethod
    def _from_bindings(
        cls, report: bindings.RuleSetReport, translate_key: Callable[[str], CommandDecl], decls: Declarations
    ) -> RuleSetReport:
        """Convert a bindings-level rule-set report.

        Args:
            report: Object exposing ``changed``, ``rule_reports``,
                ``search_and_apply_time`` and ``merge_time``.
            translate_key: Maps each string key of ``report.rule_reports`` to
                the key used in the resulting ``rule_reports`` dict.
            decls: Stored on the instance for use by ``__repr__``.

        Returns:
            A new ``RuleSetReport`` with translated keys and converted
            per-rule reports.
        """
        converted = {
            translate_key(raw_key): [RuleReport._from_bindings(entry) for entry in entries]
            for raw_key, entries in report.rule_reports.items()
        }
        return cls(
            changed=report.changed,
            rule_reports=converted,
            search_and_apply_time=report.search_and_apply_time,
            merge_time=report.merge_time,
            _decls=decls,
        )

    def __repr__(self) -> str:
        """Return a repr whose rule keys are rendered as strings.

        With declarations available, keys go through ``_format_rule_key``.
        Without them (``_decls is None``) keys fall back to ``repr(key)`` so
        that ``repr()`` never depends on declarations being present.
        """
        decls = self._decls
        if decls is None:
            shown = {repr(key): reports for key, reports in self.rule_reports.items()}
        else:
            shown = {_format_rule_key(decls, key): reports for key, reports in self.rule_reports.items()}
        return (
            f"RuleSetReport(changed={self.changed}, "
            f"rule_reports={shown}, "
            f"search_and_apply_time={self.search_and_apply_time}, "
            f"merge_time={self.merge_time})"
        )
@dataclass
class IterationReport:
    """Python-side copy of a ``bindings.IterationReport``."""

    rule_set_report: RuleSetReport
    rebuild_time: timedelta

    @classmethod
    def _from_bindings(
        cls, report: bindings.IterationReport, translate_key: Callable[[str], CommandDecl], decls: Declarations
    ) -> IterationReport:
        """Convert a bindings-level iteration report.

        Args:
            report: Object exposing ``rule_set_report`` and ``rebuild_time``.
            translate_key: Forwarded to ``RuleSetReport._from_bindings``.
            decls: Forwarded to ``RuleSetReport._from_bindings``.

        Returns:
            A new ``IterationReport`` wrapping the converted rule-set report.
        """
        # Convert the nested report first, then pair it with the rebuild time.
        nested = RuleSetReport._from_bindings(report.rule_set_report, translate_key, decls)
        return cls(nested, report.rebuild_time)
@dataclass
class RunReport:
    """Python-friendly wrapper around bindings.RunReport.

    The two ``*_per_rule`` mappings are keyed by translated rule keys; the
    three ``*_per_ruleset`` mappings are taken from the bindings report as-is.
    """

    iterations: list[IterationReport]
    updated: bool
    search_and_apply_time_per_rule: dict[CommandDecl, timedelta]
    num_matches_per_rule: dict[CommandDecl, int]
    search_and_apply_time_per_ruleset: dict[str, timedelta]
    merge_time_per_ruleset: dict[str, timedelta]
    rebuild_time_per_ruleset: dict[str, timedelta]
    # Only consulted by ``__repr__`` to pretty-print rule keys. It is optional:
    # the default is ``None``, in which case keys are shown with plain ``repr``.
    _decls: Declarations | None = field(repr=False, default=None)

    def _rule_key_label(self, key: CommandDecl) -> str:
        """Return the string shown for *key* inside ``__repr__``."""
        if self._decls is None:
            # No declarations to pretty-print with; fall back to the raw repr.
            return repr(key)
        return _format_rule_key(self._decls, key)

    def __repr__(self) -> str:
        """Return a repr whose per-rule dict keys are rendered as strings.

        Keys are pretty-printed through ``_format_rule_key`` when declarations
        are available and shown via ``repr(key)`` otherwise, so ``repr()``
        never depends on declarations being present.
        """
        time_per_rule = {self._rule_key_label(k): v for k, v in self.search_and_apply_time_per_rule.items()}
        matches_per_rule = {self._rule_key_label(k): v for k, v in self.num_matches_per_rule.items()}
        return (
            f"RunReport(iterations={self.iterations}, "
            f"updated={self.updated}, "
            f"search_and_apply_time_per_rule={time_per_rule}, "
            f"num_matches_per_rule={matches_per_rule}, "
            f"search_and_apply_time_per_ruleset={self.search_and_apply_time_per_ruleset}, "
            f"merge_time_per_ruleset={self.merge_time_per_ruleset}, "
            f"rebuild_time_per_ruleset={self.rebuild_time_per_ruleset})"
        )

    @classmethod
    def _from_bindings(cls, report: bindings.RunReport, state: EGraphState) -> RunReport:
        """Convert a bindings-level run report.

        Args:
            report: Object exposing the same attribute names as this class's
                public fields.
            state: Supplies ``translate_rule_key`` (applied to every per-rule
                key) and ``__egg_decls__`` (stored for ``__repr__``).

        Returns:
            A new ``RunReport`` with converted iterations and translated
            per-rule keys.
        """
        return cls(
            iterations=[
                IterationReport._from_bindings(it, state.translate_rule_key, state.__egg_decls__)
                for it in report.iterations
            ],
            updated=report.updated,
            search_and_apply_time_per_rule={
                state.translate_rule_key(k): v for k, v in report.search_and_apply_time_per_rule.items()
            },
            num_matches_per_rule={state.translate_rule_key(k): v for k, v in report.num_matches_per_rule.items()},
            search_and_apply_time_per_ruleset=report.search_and_apply_time_per_ruleset,
            merge_time_per_ruleset=report.merge_time_per_ruleset,
            rebuild_time_per_ruleset=report.rebuild_time_per_ruleset,
            _decls=state.__egg_decls__,
        )