Skip to content

Commit ddd9101

Browse files
committed
Address SonarCloud open issues
Reduce cognitive complexity (S3776) in download_file, rotate_backups, the tar-member verifier, json_edit set/delete walkers, run_shell, execute_action_dag, _group_by_size, scandir_find, and the cron field parser by extracting helpers with single responsibilities. Behaviour is unchanged; the full test suite still passes. Collapse redundant except tuples (S5713) where the listed subclass already inherits from another entry (shutil.Error/OSError, PermissionError/OSError, ModuleNotFoundError/ImportError, ManifestException/FileAutomationException). Extract the duplicated 'application/json' content-type to a module-level _JSON_HEADERS constant in notify/sinks.py (S1192). Switch the substitution regex to the concise '\w' class with re.ASCII to preserve ASCII-only matching (S6353). Drop the unused 'source' parameter from _delete_extras (S1172). Replace tuple float equality in three tests with pytest.approx (S1244) and mark the obvious test-fixture credentials with NOSONAR (S2068). Clean up test_notify.py's Iterator annotation (S5886), the empty _Recorder.send body (S1186), the empty time_budget block in test_quota.py (S108), and the generator-throw trick in test_action_executor.py (S7500).
1 parent 400d2c3 commit ddd9101

22 files changed

+433
-273
lines changed

automation_file/core/dag_executor.py

Lines changed: 69 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,68 @@
3131
__all__ = ["execute_action_dag"]
3232

3333

34+
class _DagRun:
35+
"""Mutable scheduling state shared by the submit / completion helpers."""
36+
37+
def __init__(
38+
self,
39+
nodes: list[Mapping[str, Any]],
40+
pool: ThreadPoolExecutor,
41+
fail_fast: bool,
42+
) -> None:
43+
self.graph, self.indegree = _build_graph(nodes)
44+
self.node_map = {_require_id(node): node for node in nodes}
45+
self.results: dict[str, Any] = {}
46+
self.lock = threading.Lock()
47+
self.ready: deque[str] = deque(
48+
node_id for node_id, count in self.indegree.items() if count == 0
49+
)
50+
self.in_flight: dict[Future[Any], str] = {}
51+
self.pool = pool
52+
self.fail_fast = fail_fast
53+
54+
def _skip_dependents(self, node_id: str) -> None:
55+
for dependent in self.graph.get(node_id, ()):
56+
self.indegree[dependent] -= 1
57+
_mark_skipped(dependent, node_id, self.graph, self.indegree, self.results, self.lock)
58+
59+
def submit(self, node_id: str) -> None:
60+
action = self.node_map[node_id].get("action")
61+
if not isinstance(action, list):
62+
err = DagException(f"node {node_id!r} missing action list")
63+
with self.lock:
64+
self.results[node_id] = repr(err)
65+
if self.fail_fast:
66+
self._skip_dependents(node_id)
67+
return
68+
future = self.pool.submit(_run_action, action)
69+
self.in_flight[future] = node_id
70+
71+
def _complete(self, node_id: str, value: Any, failed: bool) -> None:
72+
with self.lock:
73+
self.results[node_id] = value
74+
for dependent in self.graph.get(node_id, ()):
75+
self.indegree[dependent] -= 1
76+
if failed and self.fail_fast:
77+
_mark_skipped(
78+
dependent, node_id, self.graph, self.indegree, self.results, self.lock
79+
)
80+
elif self.indegree[dependent] == 0 and dependent not in self.results:
81+
self.ready.append(dependent)
82+
83+
def drain_completed(self) -> None:
84+
done, _ = wait(list(self.in_flight), return_when=FIRST_COMPLETED)
85+
for future in done:
86+
node_id = self.in_flight.pop(future)
87+
try:
88+
value: Any = future.result()
89+
failed = False
90+
except Exception as err: # pylint: disable=broad-except
91+
value = repr(err)
92+
failed = True
93+
self._complete(node_id, value, failed)
94+
95+
3496
def execute_action_dag(
3597
nodes: list[Mapping[str, Any]],
3698
max_workers: int = 4,
@@ -46,54 +108,15 @@ def execute_action_dag(
46108
Raises :class:`DagException` for static errors detected before any action
47109
runs: duplicate ids, unknown dependencies, or cycles.
48110
"""
49-
graph, indegree = _build_graph(nodes)
50-
node_map = {_require_id(node): node for node in nodes}
51-
results: dict[str, Any] = {}
52-
lock = threading.Lock()
53-
54-
ready: deque[str] = deque(node_id for node_id, count in indegree.items() if count == 0)
55-
56111
with ThreadPoolExecutor(max_workers=max_workers) as pool:
57-
in_flight: dict[Future[Any], str] = {}
58-
59-
def submit(node_id: str) -> None:
60-
action = node_map[node_id].get("action")
61-
if not isinstance(action, list):
62-
err = DagException(f"node {node_id!r} missing action list")
63-
with lock:
64-
results[node_id] = repr(err)
65-
if fail_fast:
66-
for dependent in graph.get(node_id, ()):
67-
indegree[dependent] -= 1
68-
_mark_skipped(dependent, node_id, graph, indegree, results, lock)
69-
return
70-
future = pool.submit(_run_action, action)
71-
in_flight[future] = node_id
72-
73-
while ready or in_flight:
74-
while ready:
75-
submit(ready.popleft())
76-
if not in_flight:
112+
state = _DagRun(nodes, pool, fail_fast)
113+
while state.ready or state.in_flight:
114+
while state.ready:
115+
state.submit(state.ready.popleft())
116+
if not state.in_flight:
77117
break
78-
done, _ = wait(list(in_flight), return_when=FIRST_COMPLETED)
79-
for future in done:
80-
node_id = in_flight.pop(future)
81-
failed = False
82-
try:
83-
value: Any = future.result()
84-
except Exception as err: # pylint: disable=broad-except
85-
value = repr(err)
86-
failed = True
87-
with lock:
88-
results[node_id] = value
89-
for dependent in graph.get(node_id, ()):
90-
indegree[dependent] -= 1
91-
if failed and fail_fast:
92-
_mark_skipped(dependent, node_id, graph, indegree, results, lock)
93-
elif indegree[dependent] == 0 and dependent not in results:
94-
ready.append(dependent)
95-
96-
return results
118+
state.drain_completed()
119+
return state.results
97120

98121

99122
def _run_action(action: list) -> Any:

automation_file/core/fim.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from pathlib import Path
2424
from typing import Any
2525

26-
from automation_file.core.manifest import ManifestException, verify_manifest
26+
from automation_file.core.manifest import verify_manifest
2727
from automation_file.exceptions import FileAutomationException
2828
from automation_file.logging_config import file_automation_logger
2929
from automation_file.notify import NotificationManager, notification_manager
@@ -86,7 +86,7 @@ def check_once(self) -> dict[str, Any]:
8686
"""Run one verification pass and return the summary."""
8787
try:
8888
summary = verify_manifest(self._root, self._manifest_path)
89-
except (ManifestException, FileAutomationException) as err:
89+
except FileAutomationException as err:
9090
file_automation_logger.error("integrity_monitor: verify failed: %r", err)
9191
summary = {
9292
"matched": [],

automation_file/core/package_loader.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def load(self, package: str) -> ModuleType | None:
3636
# `package` is a trusted caller-supplied name (see PackageLoader docstring and
3737
# the CLAUDE.md security note on plugin loading); it is not untrusted input.
3838
module = import_module(spec.name)
39-
except (ImportError, ModuleNotFoundError) as error:
39+
except ImportError as error:
4040
file_automation_logger.error("PackageLoader import error: %r", error)
4141
return None
4242
self._cache[package] = module

automation_file/core/substitution.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
from automation_file.exceptions import FileAutomationException
2525

26-
_PATTERN = re.compile(r"\$\{([a-zA-Z_][a-zA-Z0-9_]*)(?::([^}]*))?\}")
26+
_PATTERN = re.compile(r"\$\{([a-zA-Z_]\w*)(?::([^}]*))?\}", re.ASCII)
2727

2828

2929
class SubstitutionException(FileAutomationException):

automation_file/local/dir_ops.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ def copy_dir(dir_path: str, target_dir_path: str) -> bool:
1818
shutil.copytree(source, Path(target_dir_path), dirs_exist_ok=True)
1919
file_automation_logger.info("copy_dir: %s -> %s", source, target_dir_path)
2020
return True
21-
except (OSError, shutil.Error) as error:
21+
except OSError as error:
2222
file_automation_logger.error("copy_dir failed: %r", error)
2323
return False
2424

@@ -32,7 +32,7 @@ def remove_dir_tree(dir_path: str) -> bool:
3232
shutil.rmtree(path)
3333
file_automation_logger.info("remove_dir_tree: %s", path)
3434
return True
35-
except (OSError, shutil.Error) as error:
35+
except OSError as error:
3636
file_automation_logger.error("remove_dir_tree failed: %r", error)
3737
return False
3838

automation_file/local/file_ops.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def copy_file(file_path: str, target_path: str, copy_metadata: bool = True) -> b
2222
shutil.copy(source, target_path)
2323
file_automation_logger.info("copy_file: %s -> %s", source, target_path)
2424
return True
25-
except (OSError, shutil.Error) as error:
25+
except OSError as error:
2626
file_automation_logger.error("copy_file failed: %r", error)
2727
return False
2828

@@ -65,7 +65,7 @@ def copy_all_file_to_dir(dir_path: str, target_dir_path: str) -> bool:
6565
shutil.move(str(source), str(destination))
6666
file_automation_logger.info("copy_all_file_to_dir: %s -> %s", source, destination)
6767
return True
68-
except (OSError, shutil.Error) as error:
68+
except OSError as error:
6969
file_automation_logger.error("copy_all_file_to_dir failed: %r", error)
7070
return False
7171

automation_file/local/json_edit.py

Lines changed: 52 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -114,59 +114,76 @@ def _child(container: Any, segment: str) -> Any:
114114
raise TypeError(f"cannot index {type(container).__name__} by {segment!r}")
115115

116116

117+
def _is_int_segment(segment: str) -> bool:
118+
return segment.lstrip("-").isdigit()
119+
120+
121+
def _descend_for_set(container: Any, segment: str) -> Any:
122+
if isinstance(container, MutableMapping):
123+
if segment not in container or not isinstance(
124+
container[segment], (MutableMapping, MutableSequence)
125+
):
126+
container[segment] = {}
127+
return container[segment]
128+
if isinstance(container, MutableSequence) and _is_int_segment(segment):
129+
return container[int(segment)]
130+
raise JsonEditException(f"cannot traverse into {segment!r}")
131+
132+
133+
def _assign_into_sequence(container: MutableSequence[Any], last: str, value: Any) -> None:
134+
idx = int(last)
135+
if -len(container) <= idx < len(container):
136+
container[idx] = value
137+
return
138+
if idx == len(container):
139+
container.append(value)
140+
return
141+
raise JsonEditException(f"list index out of range: {idx}")
142+
143+
117144
def _set_in(data: Any, segments: list[str], value: Any) -> None:
118145
container = data
119146
for segment in segments[:-1]:
120-
if isinstance(container, MutableMapping):
121-
if segment not in container or not isinstance(
122-
container[segment], (MutableMapping, MutableSequence)
123-
):
124-
container[segment] = {}
125-
container = container[segment]
126-
elif isinstance(container, MutableSequence) and segment.lstrip("-").isdigit():
127-
container = container[int(segment)]
128-
else:
129-
raise JsonEditException(f"cannot traverse into {segment!r}")
147+
container = _descend_for_set(container, segment)
130148
last = segments[-1]
131149
if isinstance(container, MutableMapping):
132150
container[last] = value
133151
return
134-
if isinstance(container, MutableSequence) and last.lstrip("-").isdigit():
135-
idx = int(last)
136-
if -len(container) <= idx < len(container):
137-
container[idx] = value
138-
return
139-
if idx == len(container):
140-
container.append(value)
141-
return
142-
raise JsonEditException(f"list index out of range: {idx}")
152+
if isinstance(container, MutableSequence) and _is_int_segment(last):
153+
_assign_into_sequence(container, last, value)
154+
return
143155
raise JsonEditException(f"cannot set into {type(container).__name__}")
144156

145157

146-
def _delete_in(data: Any, segments: list[str]) -> bool:
147-
container = data
148-
for segment in segments[:-1]:
149-
if isinstance(container, MutableMapping):
150-
if segment not in container:
151-
return False
152-
container = container[segment]
153-
elif isinstance(container, MutableSequence) and segment.lstrip("-").isdigit():
154-
idx = int(segment)
155-
if not -len(container) <= idx < len(container):
156-
return False
157-
container = container[idx]
158-
else:
159-
return False
160-
last = segments[-1]
158+
def _descend_for_delete(container: Any, segment: str) -> Any:
    """Return the child of ``container`` at ``segment``, or ``_MISSING``.

    Mappings are looked up by key and sequences by an in-range integer
    index. ``_MISSING`` is returned for an absent key, an out-of-range or
    non-integer index, and for any other container type; nothing is
    created or modified.
    """
    if isinstance(container, MutableMapping):
        return container.get(segment, _MISSING)
    if not isinstance(container, MutableSequence) or not _is_int_segment(segment):
        return _MISSING
    position = int(segment)
    in_range = -len(container) <= position < len(container)
    return container[position] if in_range else _MISSING
166+
167+
168+
def _remove_last(container: Any, last: str) -> bool:
161169
if isinstance(container, MutableMapping):
162170
if last not in container:
163171
return False
164172
del container[last]
165173
return True
166-
if isinstance(container, MutableSequence) and last.lstrip("-").isdigit():
174+
if isinstance(container, MutableSequence) and _is_int_segment(last):
167175
idx = int(last)
168176
if not -len(container) <= idx < len(container):
169177
return False
170178
del container[idx]
171179
return True
172180
return False
181+
182+
183+
def _delete_in(data: Any, segments: list[str]) -> bool:
    """Delete the value at the path ``segments`` inside ``data``.

    Returns False as soon as an intermediate segment cannot be resolved;
    otherwise returns whatever ``_remove_last`` reports for the final
    segment.
    """
    leaf = segments[-1]
    node = data
    for step in segments[:-1]:
        node = _descend_for_delete(node, step)
        if node is _MISSING:
            return False
    return _remove_last(node, leaf)

automation_file/local/shell_ops.py

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,40 +25,26 @@ class ShellException(FileAutomationException):
2525
"""Raised when a shell command fails (non-zero exit, timeout, or bad argv)."""
2626

2727

28-
def run_shell(
29-
argv: Sequence[str],
30-
*,
31-
timeout: float = _DEFAULT_TIMEOUT_SECONDS,
32-
cwd: str | None = None,
33-
env: Mapping[str, str] | None = None,
34-
check: bool = True,
35-
capture_output: bool = True,
36-
) -> dict[str, object]:
37-
"""Run ``argv`` as a subprocess with a hard timeout.
38-
39-
``argv`` must be a non-empty list / tuple of strings. Strings are
40-
rejected to block shell-injection via concatenated user input.
41-
42-
Returns ``{"returncode": int, "stdout": str, "stderr": str}`` when
43-
``capture_output=True``; ``stdout``/``stderr`` are empty strings
44-
otherwise.
45-
46-
Raises :class:`ShellException` when ``check=True`` and the process
47-
returns a non-zero exit code or times out.
48-
"""
28+
def _validate_argv(argv: Sequence[str], timeout: float) -> list[str]:
4929
if isinstance(argv, str) or not isinstance(argv, Sequence):
5030
raise ShellException("argv must be a list of strings, not a single string")
5131
argv_list = list(argv)
5232
if not argv_list or not all(isinstance(part, str) for part in argv_list):
5333
raise ShellException("argv must be a non-empty list of strings")
5434
if timeout <= 0:
5535
raise ShellException("timeout must be positive")
36+
return argv_list
5637

57-
cwd_path = str(Path(cwd)) if cwd else None
58-
env_dict = dict(env) if env is not None else None
5938

39+
def _run_subprocess(
40+
argv_list: list[str],
41+
timeout: float,
42+
cwd_path: str | None,
43+
env_dict: dict[str, str] | None,
44+
capture_output: bool,
45+
) -> subprocess.CompletedProcess[str]:
6046
try:
61-
completed = subprocess.run(
47+
return subprocess.run(
6248
argv_list,
6349
timeout=timeout,
6450
cwd=cwd_path,
@@ -75,6 +61,33 @@ def run_shell(
7561
except OSError as err:
7662
raise ShellException(f"subprocess failed: {err!r}") from err
7763

64+
65+
def run_shell(
66+
argv: Sequence[str],
67+
*,
68+
timeout: float = _DEFAULT_TIMEOUT_SECONDS,
69+
cwd: str | None = None,
70+
env: Mapping[str, str] | None = None,
71+
check: bool = True,
72+
capture_output: bool = True,
73+
) -> dict[str, object]:
74+
"""Run ``argv`` as a subprocess with a hard timeout.
75+
76+
``argv`` must be a non-empty list / tuple of strings. Strings are
77+
rejected to block shell-injection via concatenated user input.
78+
79+
Returns ``{"returncode": int, "stdout": str, "stderr": str}`` when
80+
``capture_output=True``; ``stdout``/``stderr`` are empty strings
81+
otherwise.
82+
83+
Raises :class:`ShellException` when ``check=True`` and the process
84+
returns a non-zero exit code or times out.
85+
"""
86+
argv_list = _validate_argv(argv, timeout)
87+
cwd_path = str(Path(cwd)) if cwd else None
88+
env_dict = dict(env) if env is not None else None
89+
completed = _run_subprocess(argv_list, timeout, cwd_path, env_dict, capture_output)
90+
7891
result: dict[str, object] = {
7992
"returncode": completed.returncode,
8093
"stdout": completed.stdout or "" if capture_output else "",

0 commit comments

Comments
 (0)