Skip to content

Commit ddd8c4c

Browse files
committed
Merge resource state results into managing minion and add tests
For state functions (state.apply, state.highstate, state.sls, etc.) run against a wildcard glob, resource jobs are now executed inline by the managing minion and their results folded into a single response block, producing one Summary section instead of one per resource. Key changes: - salt/utils/minions.py: add _MERGE_RESOURCE_FUNS; check_minions accepts fun= and skips resource augmentation for merge-mode functions so the master does not wait for separate per-resource responses - salt/master.py: pass fun= to check_minions at publish time - salt/minion.py: add _MERGE_RESOURCE_FUNS and _prefix_resource_state_key (refactored to @staticmethod); _handle_payload skips separate resource dispatch for merge-mode functions; _thread_return executes resources inline and merges prefixed state keys into the managing minion's ret, reporting Result: False for unsupported functions and missing loaders - salt/modules/sshresource_state.py: highstate() returns a no_|-states state dict (Result: False) when the top file has no match; _exec_state_pkg recovers valid state dicts from SSHCommandExecutionError instead of re-raising and losing the result Tests: - test_minions_resources.py: merge-fun skips augmentation, non-merge and no-fun still augment, compound targets unaffected - test_minion_resources.py: _MERGE_RESOURCE_FUNS membership and sync invariant, _prefix_resource_state_key correctness and fallback, merge block no-loader/unsupported/dict/string branches - test_sshresource_state.py (new): empty-chunks no-top-file return, SSHCommandExecutionError recovery and re-raise cases, normal envelope path
1 parent bd79674 commit ddd8c4c

7 files changed

Lines changed: 1223 additions & 186 deletions

File tree

salt/master.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1654,20 +1654,21 @@ def _mine_flush(self, load):
16541654

16551655
def _register_resources(self, load):
16561656
"""
1657-
Store the resource list for a minion in the ``minion_resources`` cache
1658-
bank. Called by the minion on startup via ``cmd: "_register_resources"``
1657+
Update the flat resource index for a minion and persist it to the
1658+
cache. Called by the minion on startup via ``cmd: "_register_resources"``
16591659
so that the master knows which resource IDs each minion manages.
16601660
1661-
This allows :meth:`salt.utils.minions.CkMinions._augment_with_resources`
1662-
to include resource IDs when expanding glob / non-compound targets, making
1663-
``salt '*' test.ping`` return results for both the minion and its resources.
1661+
Uses :func:`salt.utils.minions._update_resource_index` which atomically
1662+
updates both the in-process index (so this worker sees the change
1663+
immediately) and the single flat ``resource_index`` cache file (so
1664+
other workers pick it up within their TTL window).
16641665
"""
16651666
load = self.__verify_load(load, ("id", "resources"))
16661667
if load is False:
16671668
return {}
16681669
if self.opts.get("minion_data_cache", True):
1669-
self.masterapi.cache.store(
1670-
"minion_resources", load["id"], load["resources"]
1670+
salt.utils.minions._update_resource_index(
1671+
self.masterapi.cache, load["id"], load["resources"]
16711672
)
16721673
log.debug(
16731674
"Registered resources for minion '%s': %s",
@@ -2366,7 +2367,10 @@ async def publish(self, clear_load):
23662367
delimiter = extra.get("delimiter", DEFAULT_TARGET_DELIM)
23672368

23682369
_res = self.ckminions.check_minions(
2369-
clear_load["tgt"], clear_load.get("tgt_type", "glob"), delimiter
2370+
clear_load["tgt"],
2371+
clear_load.get("tgt_type", "glob"),
2372+
delimiter,
2373+
fun=clear_load.get("fun"),
23702374
)
23712375
minions = _res.get("minions", list())
23722376
missing = _res.get("missing", list())

salt/minion.py

Lines changed: 167 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2868,6 +2868,129 @@ def _thread_return(cls, minion_instance, opts, data):
28682868
ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
28692869
ret["out"] = "nested"
28702870

2871+
# -------------------------------------------------------------------
2872+
# Merge-mode: for state functions the managing minion runs each
2873+
# resource's function inline here and folds the results into its own
2874+
# state dict. The operator then sees ONE combined block + ONE Summary
2875+
# section instead of a separate block per resource.
2876+
# -------------------------------------------------------------------
2877+
if (
2878+
not data.get("resource_target")
2879+
and data.get("fun") in cls._MERGE_RESOURCE_FUNS
2880+
and data.get("resource_targets")
2881+
and isinstance(ret.get("return"), dict)
2882+
):
2883+
import salt.loader.context as _loader_ctx # noqa: PLC0415
2884+
2885+
_prefix_state_key = minion_instance._prefix_resource_state_key
2886+
2887+
run_num_base = (
2888+
max(
2889+
(
2890+
v.get("__run_num__", 0)
2891+
for v in ret["return"].values()
2892+
if isinstance(v, dict)
2893+
),
2894+
default=0,
2895+
)
2896+
+ 1
2897+
)
2898+
2899+
for resource in data["resource_targets"]:
2900+
rid = resource["id"]
2901+
rtype = resource["type"]
2902+
resource_loader = getattr(
2903+
minion_instance, "resource_loaders", {}
2904+
).get(rtype)
2905+
if resource_loader is None:
2906+
ret["return"][f"no_|-{rid}_|-{rid}_|-None"] = {
2907+
"result": False,
2908+
"comment": (
2909+
f"No resource loader for type '{rtype}'. "
2910+
"Ensure the resource module exists."
2911+
),
2912+
"name": rid,
2913+
"changes": {},
2914+
"__run_num__": run_num_base,
2915+
}
2916+
run_num_base += 1
2917+
if ret.get("retcode") == salt.defaults.exitcodes.EX_OK:
2918+
ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
2919+
continue
2920+
2921+
if function_name not in resource_loader:
2922+
# Function not implemented for this resource type —
2923+
# same message the separate-job path would return.
2924+
resource_return = (
2925+
f"Function '{function_name}' is not supported for "
2926+
f"resource type '{rtype}'. Implement it in a "
2927+
f"'{rtype}resource_*' execution module."
2928+
)
2929+
else:
2930+
token = _loader_ctx.resource_ctxvar.set(resource)
2931+
try:
2932+
resource_return = minion_instance._execute_job_function(
2933+
function_name,
2934+
function_args,
2935+
executors,
2936+
opts,
2937+
data,
2938+
functions=resource_loader,
2939+
)
2940+
except Exception as exc: # pylint: disable=broad-except
2941+
log.error(
2942+
"Inline resource execution for '%s' raised: %s",
2943+
rid,
2944+
exc,
2945+
exc_info=True,
2946+
)
2947+
resource_return = (
2948+
f"ERROR running {function_name} for '{rid}': {exc}"
2949+
)
2950+
finally:
2951+
_loader_ctx.resource_ctxvar.reset(token)
2952+
2953+
if isinstance(resource_return, dict):
2954+
for state_id, state_val in resource_return.items():
2955+
if isinstance(state_val, dict):
2956+
entry = dict(state_val)
2957+
entry["__run_num__"] = run_num_base
2958+
else:
2959+
entry = {
2960+
"result": True,
2961+
"comment": str(state_val),
2962+
"name": f"[{rid}]",
2963+
"changes": {},
2964+
"__run_num__": run_num_base,
2965+
}
2966+
run_num_base += 1
2967+
ret["return"][_prefix_state_key(state_id, rid)] = entry
2968+
r_retcode = resource_loader.pack["__context__"].get(
2969+
"retcode", 0
2970+
)
2971+
if (
2972+
r_retcode
2973+
and ret.get("retcode") == salt.defaults.exitcodes.EX_OK
2974+
):
2975+
ret["retcode"] = r_retcode
2976+
else:
2977+
# String result means the resource couldn't fulfill the
2978+
# operation (e.g. "not supported" for dummy resources).
2979+
# Mark as False — the state was NOT applied, so reporting
2980+
# True would silently mask unactioned resources.
2981+
ret["return"][f"no_|-{rid}_|-{rid}_|-None"] = {
2982+
"result": False,
2983+
"comment": str(resource_return),
2984+
"name": rid,
2985+
"changes": {},
2986+
"__run_num__": run_num_base,
2987+
}
2988+
run_num_base += 1
2989+
if ret.get("retcode") == salt.defaults.exitcodes.EX_OK:
2990+
ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
2991+
2992+
ret["success"] = ret.get("retcode") == salt.defaults.exitcodes.EX_OK
2993+
28712994
if isinstance(ret["return"], dict) and ret["return"].get("__no_return__"):
28722995
# This is used to suppress the return for queued jobs
28732996
# The job will be executed later and will return then
@@ -4379,14 +4502,20 @@ async def _handle_payload(self, payload):
43794502
if load.get("minion_is_target", True):
43804503
await self._handle_decoded_payload(load)
43814504

4382-
for resource in load.get("resource_targets", []):
4383-
resource_load = dict(load)
4384-
resource_load["resource_target"] = resource
4385-
# Flag so _handle_decoded_payload_impl can bypass JID
4386-
# deduplication — resource jobs share the parent JID by
4387-
# design but are independent executions.
4388-
resource_load["resource_job"] = True
4389-
await self._handle_decoded_payload(resource_load)
4505+
# For merge-mode functions (state.apply etc.) resources are
4506+
# executed inline inside _thread_return and folded into the
4507+
# managing minion's own response. Dispatching them as
4508+
# separate jobs would send duplicate responses the master is
4509+
# no longer waiting for.
4510+
if load.get("fun") not in self._MERGE_RESOURCE_FUNS:
4511+
for resource in load.get("resource_targets", []):
4512+
resource_load = dict(load)
4513+
resource_load["resource_target"] = resource
4514+
# Flag so _handle_decoded_payload_impl can bypass JID
4515+
# deduplication — resource jobs share the parent JID by
4516+
# design but are independent executions.
4517+
resource_load["resource_job"] = True
4518+
await self._handle_decoded_payload(resource_load)
43904519

43914520
elif self.opts["zmq_filtering"]:
43924521
# In the filtering enabled case, we'd like to know when minion sees something it shouldn't
@@ -4473,6 +4602,36 @@ def _is_pure_resource_target(self, load):
44734602
}
44744603
)
44754604

4605+
# Functions where resource results are merged into the managing minion's
4606+
# own response rather than dispatched as independent jobs. This produces
4607+
# ONE combined block + Summary section per managing minion instead of
4608+
# separate blocks per resource, matching how any other minion looks.
4609+
_MERGE_RESOURCE_FUNS = frozenset(
4610+
{
4611+
"state.apply",
4612+
"state.highstate",
4613+
"state.sls",
4614+
"state.sls_id",
4615+
"state.single",
4616+
}
4617+
)
4618+
4619+
@staticmethod
4620+
def _prefix_resource_state_key(sid, rid):
4621+
"""Re-label the ID/name components of a state result key with rid.
4622+
4623+
Key format: {module}_|-{id}_|-{name}_|-{function}
4624+
Only comps[1] and comps[2] (id and name) are prefixed so the
4625+
highstate formatter still reads {comps[0]}.{comps[3]} correctly,
4626+
preserving ``Function: pkg.installed`` while showing ``ID: node1 curl``.
4627+
"""
4628+
parts = sid.split("_|-", 3)
4629+
if len(parts) == 4:
4630+
parts[1] = f"{rid} {parts[1]}"
4631+
parts[2] = f"{rid} {parts[2]}"
4632+
return "_|-".join(parts)
4633+
return f"no_|-{rid}_|-{rid}_|-None"
4634+
44764635
def _resolve_resource_targets(self, load):
44774636
"""
44784637
Return the list of per-resource dicts ``{"id": ..., "type": ...}`` that

salt/modules/sshresource_state.py

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,24 @@ def highstate(test=None, **kwargs):
281281
if not isinstance(chunk, dict):
282282
return chunks_or_errors
283283

284+
if not chunks_or_errors:
285+
# Top file has no match for this resource ID — no SSH round-trip needed.
286+
# Return a state dict using the same key format salt uses for a regular
287+
# minion's "No Top file" entry so the merged output is consistent.
288+
rid = _resource_id()
289+
return {
290+
"no_|-states_|-states_|-None": {
291+
"result": False,
292+
"comment": (
293+
f"No Top file or master_tops data matches found for"
294+
f" resource '{rid}'."
295+
),
296+
"name": "states",
297+
"changes": {},
298+
"__run_num__": 0,
299+
}
300+
}
301+
284302
file_refs = salt.client.ssh.state.lowstate_file_refs(
285303
chunks_or_errors,
286304
_merge_extra_filerefs(
@@ -436,16 +454,28 @@ def _exec_state_pkg(opts, trans_tar, test):
436454
except OSError:
437455
pass
438456

439-
# parse_ret returns data["local"] = {"jid": ..., "return": {states}, "retcode": N}
440-
# The minion dispatcher expects the state dict directly (not the envelope).
441-
envelope = salt.client.ssh.wrapper.parse_ret(stdout, stderr, retcode)
457+
# parse_ret raises SSHCommandExecutionError on any non-zero retcode, even
458+
# when the remote ran states and produced a valid result dict (e.g. some
459+
# states failed → retcode 2). Catch that case and surface the result dict
460+
# normally so operators see the full state tree rather than raw JSON.
461+
try:
462+
envelope = salt.client.ssh.wrapper.parse_ret(stdout, stderr, retcode)
463+
except salt.client.ssh.wrapper.SSHCommandExecutionError as exc:
464+
local = (exc.parsed or {}).get("local", {})
465+
if isinstance(local.get("return"), dict):
466+
ret = local["return"]
467+
__context__["retcode"] = local.get( # pylint: disable=undefined-variable
468+
"retcode", salt.defaults.exitcodes.EX_STATE_FAILURE
469+
)
470+
return ret
471+
raise
472+
442473
if isinstance(envelope, dict) and "return" in envelope:
443474
ret = envelope["return"]
444-
# Propagate non-zero retcode into context so caller can signal failure.
445475
remote_retcode = envelope.get("retcode", 0)
446476
if remote_retcode:
447-
__context__["retcode"] = (
448-
remote_retcode # pylint: disable=undefined-variable
477+
__context__["retcode"] = ( # pylint: disable=undefined-variable
478+
remote_retcode
449479
)
450480
return ret
451481
return envelope

0 commit comments

Comments
 (0)