|
| 1 | +"""Internal policy for hierarchy-level frame map-reduce execution. |
| 2 | +
|
| 3 | +Users provide compute resources. CodeEntropy keeps scheduling choices such as |
| 4 | +chunk size and in-flight task limits internal so the public configuration remains |
| 5 | +stable and simple. |
| 6 | +""" |
| 7 | + |
| 8 | +from __future__ import annotations |
| 9 | + |
| 10 | +import math |
| 11 | +from dataclasses import dataclass |
| 12 | +from typing import Any |
| 13 | + |
| 14 | + |
@dataclass(frozen=True)
class ExecutionPolicy:
    """Internal policy for scalable, deterministic frame execution.

    The policy turns the compute resources requested by the user into two
    scheduling decisions: how many frames go into one chunk, and how many
    chunk tasks may be active at the same time.

    Attributes:
        target_frame_chunks_per_worker: Multiplied by the worker count to get
            the number of chunks the frames are ideally split into.
        min_frame_chunk_size: Lower bound applied to the computed chunk size.
        max_frame_chunk_size: Upper bound applied to the computed chunk size.
        max_frame_in_flight_multiplier: Multiplied by the worker count to get
            the cap on simultaneously active frame-chunk tasks.
    """

    target_frame_chunks_per_worker: int = 2
    min_frame_chunk_size: int = 1
    max_frame_chunk_size: int = 32
    max_frame_in_flight_multiplier: int = 1

    def requested_worker_count(self, shared_data: dict[str, Any]) -> int:
        """Return the requested worker-process count.

        An explicit ``dask_workers`` value takes precedence over the HPC
        settings. When neither is configured a single worker is assumed.

        Args:
            shared_data: Shared workflow data that may contain ``args`` with
                local Dask or HPC worker settings.

        Returns:
            The requested worker count, clamped to at least one.
        """
        # ``args`` may be absent (None); getattr with a default keeps every
        # lookup below safe in that case.
        args = shared_data.get("args")

        dask_workers = getattr(args, "dask_workers", None)
        if dask_workers is not None:
            return max(1, int(dask_workers))

        if bool(getattr(args, "hpc", False)):
            # ``or 1`` maps unset/zero values (None, 0) to one before int().
            hpc_nodes = max(1, int(getattr(args, "hpc_nodes", 1) or 1))
            hpc_processes = max(1, int(getattr(args, "hpc_processes", 1) or 1))
            return hpc_nodes * hpc_processes

        return 1

    def frame_chunk_size(self, shared_data: dict[str, Any], *, n_frames: int) -> int:
        """Choose a deterministic frame chunk size.

        The frames are divided over ``workers * target_frame_chunks_per_worker``
        chunks (rounding up), and the result is clamped to the policy bounds.

        Args:
            shared_data: Shared workflow data used to infer requested worker
                count.
            n_frames: Number of selected frames for the current run. Values
                below one are treated as one.

        Returns:
            The frame chunk size clamped between the policy minimum and
            maximum. If the bounds are inverted, the minimum wins.
        """
        n_frames = max(1, int(n_frames))
        workers = self.requested_worker_count(shared_data)
        # Guard against a zero/negative per-worker target so the division
        # below always has a positive divisor.
        target_chunks = max(1, workers * int(self.target_frame_chunks_per_worker))
        chunk_size = math.ceil(n_frames / target_chunks)

        return max(
            int(self.min_frame_chunk_size),
            min(int(self.max_frame_chunk_size), int(chunk_size)),
        )

    def max_frame_in_flight_tasks(
        self,
        shared_data: dict[str, Any],
        *,
        n_chunks: int,
    ) -> int:
        """Choose the maximum number of active frame-chunk tasks.

        Args:
            shared_data: Shared workflow data used to infer requested worker
                count.
            n_chunks: Number of frame chunks available for submission.

        Returns:
            The number of frame-chunk tasks allowed to be active at once:
            never more than ``n_chunks`` or the worker-scaled limit, and
            never less than one.
        """
        workers = self.requested_worker_count(shared_data)
        max_in_flight = workers * int(self.max_frame_in_flight_multiplier)
        return max(1, min(int(n_chunks), int(max_in_flight)))
0 commit comments