Skip to content

Commit 7354453

Browse files
committed
docs: add Google-style docstrings and update auto-docs in relation to branch #358
1 parent c42d3af commit 7354453

17 files changed

Lines changed: 480 additions & 231 deletions

CodeEntropy/levels/execution/__init__.py

Whitespace-only changes.

CodeEntropy/levels/execution/chunks.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,18 @@ def chunk_frame_indices(
77
frame_indices: list[int],
88
chunk_size: int,
99
) -> list[tuple[int, ...]]:
10-
"""Split selected frame indices into deterministic fixed-size chunks."""
10+
"""Split frame indices into deterministic fixed-size chunks.
11+
12+
Args:
13+
frame_indices: Ordered selected frame indices to split.
14+
chunk_size: Maximum number of frame indices per chunk.
15+
16+
Returns:
17+
A list of frame-index tuples preserving input order.
18+
19+
Raises:
20+
ValueError: If ``chunk_size`` is less than one.
21+
"""
1122
if chunk_size < 1:
1223
raise ValueError("chunk_size must be >= 1")
1324

CodeEntropy/levels/execution/policy.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,15 @@ class ExecutionPolicy:
2222
max_frame_in_flight_multiplier: int = 1
2323

2424
def requested_worker_count(self, shared_data: dict[str, Any]) -> int:
25-
"""Return the worker-process count requested by the current run."""
25+
"""Return the requested worker-process count.
26+
27+
Args:
28+
shared_data: Shared workflow data that may contain ``args`` with local Dask
29+
or HPC worker settings.
30+
31+
Returns:
32+
The requested worker count, clamped to at least one.
33+
"""
2634
args = shared_data.get("args")
2735

2836
dask_workers = getattr(args, "dask_workers", None)
@@ -37,7 +45,15 @@ def requested_worker_count(self, shared_data: dict[str, Any]) -> int:
3745
return 1
3846

3947
def frame_chunk_size(self, shared_data: dict[str, Any], *, n_frames: int) -> int:
40-
"""Choose a deterministic frame chunk size for the current run."""
48+
"""Choose a deterministic frame chunk size.
49+
50+
Args:
51+
shared_data: Shared workflow data used to infer requested worker count.
52+
n_frames: Number of selected frames for the current run.
53+
54+
Returns:
55+
The frame chunk size clamped between the policy minimum and maximum.
56+
"""
4157
n_frames = max(1, int(n_frames))
4258
workers = self.requested_worker_count(shared_data)
4359
target_chunks = max(1, workers * int(self.target_frame_chunks_per_worker))
@@ -54,7 +70,15 @@ def max_frame_in_flight_tasks(
5470
*,
5571
n_chunks: int,
5672
) -> int:
57-
"""Choose how many frame chunks may be active at once."""
73+
"""Choose the maximum number of active frame-chunk tasks.
74+
75+
Args:
76+
shared_data: Shared workflow data used to infer requested worker count.
77+
n_chunks: Number of frame chunks available for submission.
78+
79+
Returns:
80+
The number of tasks allowed in flight: at least one and at most ``n_chunks``.
81+
"""
5882
workers = self.requested_worker_count(shared_data)
5983
max_in_flight = workers * int(self.max_frame_in_flight_multiplier)
6084
return max(1, min(int(n_chunks), int(max_in_flight)))

CodeEntropy/levels/execution/reducers.py

Lines changed: 107 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,30 @@
88

99

1010
def stable_keys(mapping: dict[Any, Any]) -> list[Any]:
11-
"""Return mapping keys in a deterministic order across Python processes."""
11+
"""Return mapping keys in deterministic order.
12+
13+
Args:
14+
mapping: Mapping whose keys should be ordered independently of process hash
15+
randomisation.
16+
17+
Returns:
18+
A list of keys sorted by key type name and representation.
19+
"""
1220
return sorted(mapping.keys(), key=lambda key: (type(key).__name__, repr(key)))
1321

1422

1523
def merge_means(old_mean: Any, old_n: int, new_mean: Any, new_n: int) -> Any:
16-
"""Merge two means with their sample counts."""
24+
"""Merge two running means using their sample counts.
25+
26+
Args:
27+
old_mean: Existing mean value, or ``None`` if no samples have been seen.
28+
old_n: Number of samples represented by ``old_mean``.
29+
new_mean: New mean value to merge.
30+
new_n: Number of samples represented by ``new_mean``.
31+
32+
Returns:
33+
The merged mean. If ``new_n`` is zero or negative, ``old_mean`` is returned.
34+
"""
1735
if new_n <= 0:
1836
return old_mean
1937
if old_mean is None or old_n <= 0:
@@ -23,7 +41,16 @@ def merge_means(old_mean: Any, old_n: int, new_mean: Any, new_n: int) -> Any:
2341

2442

2543
def incremental_mean(old: Any, new: Any, n: int) -> Any:
26-
"""Compute an incremental mean."""
44+
"""Update a running mean with one new sample.
45+
46+
Args:
47+
old: Existing running mean, or ``None`` for the first sample.
48+
new: New sample to incorporate.
49+
n: One-based sample count after adding ``new``.
50+
51+
Returns:
52+
The updated running mean; for the first sample, ``new`` (copied if possible).
53+
"""
2754
if old is None:
2855
return new.copy() if hasattr(new, "copy") else new
2956
return old + (new - old) / float(n)
@@ -34,7 +61,12 @@ class NeighborReducer:
3461

3562
@staticmethod
3663
def initialise(shared_data: dict[str, Any]) -> None:
37-
"""Initialise parent-side neighbour reduction state."""
64+
"""Initialise parent-side neighbour accumulators.
65+
66+
Args:
67+
shared_data: Shared workflow data containing ``groups``. The method writes
68+
``neighbor_totals`` and ``neighbor_samples``.
69+
"""
3870
shared_data["neighbor_totals"] = {
3971
group_id: 0 for group_id in shared_data["groups"].keys()
4072
}
@@ -47,7 +79,13 @@ def reduce_frame_output(
4779
shared_data: dict[str, Any],
4880
frame_neighbors: dict[int, tuple[int, int]] | None,
4981
) -> None:
50-
"""Reduce one frame's neighbour-count payload."""
82+
"""Merge one frame's neighbour-count payload.
83+
84+
Args:
85+
shared_data: Shared workflow data containing neighbour total/sample
86+
accumulators.
87+
frame_neighbors: Optional mapping of group id to ``(count, sample_count)``.
88+
"""
5189
if frame_neighbors is None:
5290
return
5391

@@ -64,7 +102,13 @@ def merge_chunk_partial(
64102
neighbor_totals: dict[int, int],
65103
neighbor_samples: dict[int, int],
66104
) -> None:
67-
"""Merge chunk-level neighbour totals/samples into parent accumulators."""
105+
"""Merge chunk-level neighbour totals and samples.
106+
107+
Args:
108+
shared_data: Shared workflow data containing neighbour accumulators.
109+
neighbor_totals: Mapping of group id to additive neighbour totals.
110+
neighbor_samples: Mapping of group id to additive sample counts.
111+
"""
68112
totals = shared_data.get("neighbor_totals")
69113
samples = shared_data.get("neighbor_samples")
70114
if totals is None or samples is None:
@@ -79,7 +123,13 @@ def merge_chunk_partial(
79123

80124
@staticmethod
81125
def finalise(shared_data: dict[str, Any]) -> None:
82-
"""Convert reduced neighbour totals into average neighbour counts."""
126+
"""Compute average neighbour counts from reduced totals.
127+
128+
Args:
129+
shared_data: Shared workflow data containing ``groups``,
130+
``neighbor_totals``, and ``neighbor_samples``. The method writes
131+
``neighbors``.
132+
"""
83133
neighbors = {}
84134
for group_id in stable_keys(shared_data["groups"]):
85135
sample_count = shared_data["neighbor_samples"].get(group_id, 0)
@@ -100,7 +150,13 @@ def reduce_frame_output(
100150
shared_data: dict[str, Any],
101151
frame_out: dict[str, Any],
102152
) -> None:
103-
"""Reduce one frame's covariance outputs into shared running means."""
153+
"""Reduce one frame covariance payload into parent accumulators.
154+
155+
Args:
156+
shared_data: Shared workflow data containing covariance accumulators.
157+
frame_out: Frame covariance payload with force, torque, and optional
158+
force-torque sections.
159+
"""
104160
self._reduce_force_and_torque(shared_data, frame_out)
105161
self._reduce_forcetorque(shared_data, frame_out)
106162

@@ -109,7 +165,12 @@ def merge_chunk_partial(
109165
shared_data: dict[str, Any],
110166
partial: CovarianceChunkPartial,
111167
) -> None:
112-
"""Merge a compact worker covariance partial into parent accumulators."""
168+
"""Merge a worker covariance partial into parent accumulators.
169+
170+
Args:
171+
shared_data: Shared workflow data containing covariance accumulators.
172+
partial: Compact covariance partial returned by a worker frame chunk.
173+
"""
113174
self._merge_force_and_torque_partial(shared_data, partial)
114175
self._merge_forcetorque_partial(shared_data, partial)
115176

@@ -118,7 +179,14 @@ def reduce_frame_map_output(
118179
shared_data: dict[str, Any],
119180
frame_out: dict[str, Any],
120181
) -> None:
121-
"""Reduce one serial frame's complete MAP output into parent shared_data."""
182+
"""Reduce a complete serial MAP output.
183+
184+
Args:
185+
shared_data: Shared workflow data containing covariance and neighbour
186+
accumulators.
187+
frame_out: MAP output containing optional ``covariance`` and ``neighbors``
188+
entries.
189+
"""
122190
covariance = frame_out.get("covariance")
123191
if covariance is not None:
124192
self.reduce_frame_output(shared_data, covariance)
@@ -132,6 +200,13 @@ def _merge_force_and_torque_partial(
132200
shared_data: dict[str, Any],
133201
partial: CovarianceChunkPartial,
134202
) -> None:
203+
"""Merge chunk force and torque means into parent accumulators.
204+
205+
Args:
206+
shared_data: Shared workflow data containing force/torque accumulators,
207+
frame counts, and ``group_id_to_index``.
208+
partial: Worker covariance partial with force, torque, and count mappings.
209+
"""
135210
f_cov = shared_data["force_covariances"]
136211
t_cov = shared_data["torque_covariances"]
137212
counts = shared_data["frame_counts"]
@@ -183,6 +258,13 @@ def _merge_forcetorque_partial(
183258
shared_data: dict[str, Any],
184259
partial: CovarianceChunkPartial,
185260
) -> None:
261+
"""Merge chunk force-torque block means into parent accumulators.
262+
263+
Args:
264+
shared_data: Shared workflow data containing force-torque accumulators,
265+
force-torque counts, and ``group_id_to_index``.
266+
partial: Worker covariance partial with force-torque matrices and counts.
267+
"""
186268
ft_cov = shared_data["forcetorque_covariances"]
187269
ft_counts = shared_data["forcetorque_counts"]
188270
gid2i = shared_data["group_id_to_index"]
@@ -210,6 +292,13 @@ def _reduce_force_and_torque(
210292
shared_data: dict[str, Any],
211293
frame_out: dict[str, Any],
212294
) -> None:
295+
"""Reduce frame force and torque matrices into running means.
296+
297+
Args:
298+
shared_data: Shared workflow data containing force/torque accumulators,
299+
frame counts, and ``group_id_to_index``.
300+
frame_out: Frame covariance payload with ``force`` and ``torque`` sections.
301+
"""
213302
f_cov = shared_data["force_covariances"]
214303
t_cov = shared_data["torque_covariances"]
215304
counts = shared_data["frame_counts"]
@@ -266,6 +355,14 @@ def _reduce_forcetorque(
266355
shared_data: dict[str, Any],
267356
frame_out: dict[str, Any],
268357
) -> None:
358+
"""Reduce frame force-torque matrices into running means.
359+
360+
Args:
361+
shared_data: Shared workflow data containing force-torque accumulators,
362+
force-torque counts, and ``group_id_to_index``.
363+
frame_out: Frame covariance payload that may contain a ``forcetorque``
364+
section.
365+
"""
269366
if "forcetorque" not in frame_out:
270367
return
271368

CodeEntropy/levels/execution/scheduler.py

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,14 @@ def __init__(
3535
policy: ExecutionPolicy,
3636
universe_operations: Any | None = None,
3737
) -> None:
38+
"""Initialise the frame scheduler.
39+
40+
Args:
41+
frame_dag: Built or buildable frame-local DAG used for serial execution.
42+
policy: Internal execution policy for chunking and in-flight limits.
43+
universe_operations: Optional universe-operation adapter forwarded to worker
44+
frame graphs.
45+
"""
3846
self._frame_dag = frame_dag
3947
self._policy = policy
4048
self._universe_operations = universe_operations
@@ -47,7 +55,17 @@ def execute(
4755
frame_indices: list[int],
4856
progress: _RichProgressSink | None = None,
4957
) -> None:
50-
"""Execute frame-local MAP work and reduce it into ``shared_data``."""
58+
"""Execute frame-local MAP work and reduce outputs.
59+
60+
Args:
61+
shared_data: Shared workflow data containing serial or Dask execution
62+
inputs.
63+
frame_indices: Ordered selected frame indices to execute.
64+
progress: Optional progress sink used for reporting frame-stage progress.
65+
66+
Raises:
67+
RuntimeError: If Dask execution is requested but unavailable or incomplete.
68+
"""
5169
task: TaskID | None = None
5270
if progress is not None:
5371
task = progress.add_task(
@@ -84,7 +102,14 @@ def _run_serial(
84102
progress: _RichProgressSink | None,
85103
task: TaskID | None,
86104
) -> None:
87-
"""Execute frame-local MAP work serially and reduce immediately."""
105+
"""Execute frame MAP work serially.
106+
107+
Args:
108+
shared_data: Shared workflow data mutated by parent-side reducers.
109+
frame_indices: Ordered frame indices to process.
110+
progress: Optional progress sink.
111+
task: Optional progress task identifier.
112+
"""
88113
neighbor_helper = Neighbors()
89114

90115
for frame_index in frame_indices:
@@ -111,7 +136,21 @@ def _run_dask(
111136
progress: _RichProgressSink | None,
112137
task: TaskID | None,
113138
) -> None:
114-
"""Execute frame-chunk MAP tasks with bounded deterministic reduction."""
139+
"""Execute frame MAP work using bounded Dask futures.
140+
141+
Args:
142+
shared_data: Shared workflow data mutated by parent-side reducers.
143+
frame_indices: Ordered frame indices to process.
144+
client: Dask distributed client-like object.
145+
progress: Optional progress sink.
146+
task: Optional progress task identifier.
147+
148+
Raises:
149+
RuntimeError: If ``dask.distributed`` is unavailable or the number of
150+
reduced frames does not match the selected frame count.
151+
Exception: Propagates worker or Dask client errors after cancelling active
152+
futures.
153+
"""
115154
try:
116155
from distributed import wait
117156
except ImportError as exc:
@@ -139,6 +178,12 @@ def _run_dask(
139178
pending_results: dict[int, FrameChunkResult] = {}
140179

141180
def submit_next() -> bool:
181+
"""Submit the next frame-chunk task if one is available.
182+
183+
Returns:
184+
``True`` if a task was submitted, otherwise ``False`` when all tasks
185+
have already been submitted.
186+
"""
142187
nonlocal submitted
143188
try:
144189
frame_task = next(frame_task_iter)
@@ -157,6 +202,11 @@ def submit_next() -> bool:
157202
return True
158203

159204
def reduce_ready_results() -> None:
205+
"""Reduce completed frame chunks in deterministic chunk-index order.
206+
207+
Mutates enclosing scheduler state by consuming pending results, advancing
208+
the next expected reduce index, and updating the completed-frame count.
209+
"""
160210
nonlocal completed, next_reduce_index
161211
while next_reduce_index in pending_results:
162212
chunk_result = pending_results.pop(next_reduce_index)
@@ -232,7 +282,15 @@ def _make_frame_chunk_tasks(
232282
shared_data: dict[str, Any],
233283
frame_indices: list[int],
234284
) -> list[FrameChunkTask]:
235-
"""Build explicit frame-chunk MAP tasks."""
285+
"""Build frame-chunk task descriptors.
286+
287+
Args:
288+
shared_data: Shared workflow data used by the execution policy.
289+
frame_indices: Ordered selected frame indices to split into chunks.
290+
291+
Returns:
292+
A list of ``FrameChunkTask`` descriptors with deterministic chunk indices.
293+
"""
236294
chunk_size = self._policy.frame_chunk_size(
237295
shared_data,
238296
n_frames=len(frame_indices),

0 commit comments

Comments
 (0)