Skip to content

Commit 93e8984

Browse files
nemarjancursoragent
authored andcommitted
[env_op_images] Add unit tests for verify_pulled_report_crio
Add focused unit coverage for CRI-O pulled report verification, including successful enrichment, cross-node evidence accounting, and failure when no logs are provided. Reuse shared test utilities with a local fallback so tests run in both collection-style and local environments. Co-authored-by: Cursor <cursoragent@cursor.com> Signed-off-by: nemarjan <nemarjan@redhat.com>
1 parent 03e101e commit 93e8984

4 files changed

Lines changed: 318 additions & 1 deletion

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../../../../../../../../plugins/modules/verify_pulled_report_crio.py
File renamed without changes.

roles/env_op_images/tasks/verify_pulled_report_crio.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@
106106

107107
- name: Enrich pulled report with CRI-O evidence
108108
when: _verify_crio_log_files.matched | int > 0
109-
verify_pulled_report_crio:
109+
cifmw.general.verify_pulled_report_crio:
110110
report_path: "{{ cifmw_env_op_images_pulled_report_path }}"
111111
log_dir: "{{ cifmw_env_op_images_crio_logs_dir }}"
112112
output_path: "{{ cifmw_env_op_images_verified_report_path }}"
Lines changed: 316 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
# Copyright: (c) 2025, Red Hat
2+
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
# not use this file except in compliance with the License. You may obtain
5+
# a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
from __future__ import absolute_import, division, print_function
9+
10+
import os
11+
import tempfile
12+
13+
import yaml
14+
15+
from ansible_collections.cifmw.general.tests.unit.utils import (
16+
AnsibleExitJson,
17+
AnsibleFailJson,
18+
ModuleBaseTestCase,
19+
set_module_args,
20+
)
21+
from ansible_collections.cifmw.general.plugins.modules import (
22+
verify_pulled_report_crio,
23+
)
24+
25+
26+
class TestVerifyPulledReportCrio(ModuleBaseTestCase):
27+
def test_enriches_report_and_counts_cross_node(self):
28+
"""
29+
GIVEN a pulled-images report with two digest entries across two nodes
30+
and CRI-O logs showing each image pulled on its own node
31+
WHEN the module processes the report against the logs
32+
THEN it enriches every image with log evidence, identifies trusted
33+
mirrors, and reports zero cross-node entries
34+
"""
35+
report_data = {
36+
"summary": {
37+
"mirror_rules": [
38+
{"mirror": "mirror.registry.example:5000/ns"},
39+
{"mirror": "other.example/ns"},
40+
]
41+
},
42+
"images": [
43+
{
44+
"image_id": (
45+
"quay.io/demo/app@sha256:"
46+
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
47+
),
48+
"node": "node-a",
49+
},
50+
{
51+
"image_id": (
52+
"quay.io/demo/other@sha256:"
53+
"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
54+
),
55+
"node": "node-b",
56+
},
57+
{"image_id": "no-digest-here", "node": "node-a"},
58+
],
59+
}
60+
61+
with tempfile.TemporaryDirectory() as td:
62+
report_path = os.path.join(td, "pulled_images_report.yaml")
63+
output_path = os.path.join(td, "verified.yaml")
64+
log_a = os.path.join(td, "node-a.crio.log")
65+
log_b = os.path.join(td, "node-b.crio.log")
66+
67+
with open(report_path, "w") as f:
68+
yaml.safe_dump(
69+
report_data, f, default_flow_style=False, sort_keys=False
70+
)
71+
72+
with open(log_a, "w") as f:
73+
f.write(
74+
'level=info msg="Pulled image: '
75+
"mirror.registry.example:5000/ns/app@sha256:"
76+
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
77+
'"\n'
78+
)
79+
80+
with open(log_b, "w") as f:
81+
f.write(
82+
'level=info msg="Pulled image: '
83+
"quay.io/demo/other@sha256:"
84+
"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
85+
'"\n'
86+
)
87+
88+
set_module_args(
89+
dict(
90+
report_path=report_path,
91+
output_path=output_path,
92+
log_paths=[log_a, log_b],
93+
)
94+
)
95+
96+
with self.assertRaises(AnsibleExitJson) as rst:
97+
verify_pulled_report_crio.run_module()
98+
99+
result = rst.exception.args[0]
100+
self.assertTrue(result["changed"])
101+
self.assertEqual(result["log_files"], 2)
102+
self.assertEqual(result["entries_with_digest"], 2)
103+
self.assertEqual(result["cross_node_entries"], 0)
104+
self.assertIn("node-a", result["nodes_with_evidence"])
105+
self.assertIn("node-b", result["nodes_with_evidence"])
106+
self.assertIn("mirror.registry.example:5000", result["trusted_mirrors"])
107+
108+
with open(output_path, "r") as f:
109+
enriched = yaml.safe_load(f)
110+
111+
img0 = enriched["images"][0]
112+
self.assertEqual(img0["log_evidence_node"], "node-a")
113+
self.assertEqual(
114+
img0["log_evidence_uri"],
115+
"mirror.registry.example:5000/ns/app",
116+
)
117+
self.assertEqual(img0["node_verified_image_origin"], "mirror")
118+
119+
img1 = enriched["images"][1]
120+
self.assertEqual(img1["log_evidence_node"], "node-b")
121+
self.assertEqual(img1["log_evidence_uri"], "quay.io/demo/other")
122+
self.assertEqual(img1["node_verified_image_origin"], "source")
123+
124+
def test_cross_node_match_increments_counter(self):
125+
"""
126+
GIVEN a pulled-images report listing an image on node-a
127+
and a CRI-O log that records the same digest on node-b
128+
WHEN the module processes the report against the logs
129+
THEN the cross_node_entries counter is incremented and the
130+
evidence node is set to the log's node (node-b)
131+
"""
132+
report_data = {
133+
"summary": {"mirror_rules": [{"mirror": "mirror.example/ns"}]},
134+
"images": [
135+
{
136+
"image_id": (
137+
"quay.io/demo/app@sha256:"
138+
"cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"
139+
),
140+
"node": "node-a",
141+
}
142+
],
143+
}
144+
145+
with tempfile.TemporaryDirectory() as td:
146+
report_path = os.path.join(td, "pulled_images_report.yaml")
147+
output_path = os.path.join(td, "verified.yaml")
148+
log_b = os.path.join(td, "node-b.crio.log")
149+
150+
with open(report_path, "w") as f:
151+
yaml.safe_dump(
152+
report_data, f, default_flow_style=False, sort_keys=False
153+
)
154+
155+
with open(log_b, "w") as f:
156+
f.write(
157+
'level=info msg="Pulled image: '
158+
"mirror.example/ns/app@sha256:"
159+
"cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"
160+
'"\n'
161+
)
162+
163+
set_module_args(
164+
dict(
165+
report_path=report_path,
166+
output_path=output_path,
167+
log_paths=[log_b],
168+
)
169+
)
170+
171+
with self.assertRaises(AnsibleExitJson) as rst:
172+
verify_pulled_report_crio.run_module()
173+
174+
result = rst.exception.args[0]
175+
self.assertEqual(result["entries_with_digest"], 1)
176+
self.assertEqual(result["cross_node_entries"], 1)
177+
178+
with open(output_path, "r") as f:
179+
enriched = yaml.safe_load(f)
180+
181+
img0 = enriched["images"][0]
182+
self.assertEqual(img0["log_evidence_node"], "node-b")
183+
self.assertEqual(img0["node_verified_image_origin"], "mirror")
184+
185+
def test_fails_when_no_log_files(self):
186+
"""
187+
GIVEN an empty list of CRI-O log paths
188+
WHEN the module is invoked
189+
THEN it fails with an error indicating no log files were provided
190+
"""
191+
set_module_args(
192+
dict(
193+
report_path="/tmp/in.yaml",
194+
output_path="/tmp/out.yaml",
195+
log_paths=[],
196+
)
197+
)
198+
199+
with self.assertRaises(AnsibleFailJson) as rst:
200+
verify_pulled_report_crio.run_module()
201+
202+
self.assertIn("No CRI-O log files", rst.exception.args[0]["msg"])
203+
204+
def test_fails_when_log_file_unreadable(self):
205+
"""
206+
GIVEN a log_paths entry that points to a non-existent file
207+
WHEN the module tries to open it
208+
THEN it fails with an error mentioning the file path
209+
"""
210+
with tempfile.TemporaryDirectory() as td:
211+
report_path = os.path.join(td, "report.yaml")
212+
with open(report_path, "w") as f:
213+
yaml.safe_dump(
214+
{"summary": {}, "images": []},
215+
f,
216+
default_flow_style=False,
217+
)
218+
219+
missing_log = os.path.join(td, "ghost.crio.log")
220+
set_module_args(
221+
dict(
222+
report_path=report_path,
223+
output_path=os.path.join(td, "out.yaml"),
224+
log_paths=[missing_log],
225+
)
226+
)
227+
228+
with self.assertRaises(AnsibleFailJson) as rst:
229+
verify_pulled_report_crio.run_module()
230+
231+
self.assertIn("Cannot read CRI-O log file", rst.exception.args[0]["msg"])
232+
self.assertIn("ghost.crio.log", rst.exception.args[0]["msg"])
233+
234+
def test_fails_when_report_unreadable(self):
235+
"""
236+
GIVEN a report_path that does not exist on disk
237+
WHEN the module tries to open it
238+
THEN it fails with an error mentioning the report path
239+
"""
240+
with tempfile.TemporaryDirectory() as td:
241+
log_path = os.path.join(td, "node-a.crio.log")
242+
with open(log_path, "w") as f:
243+
f.write("")
244+
245+
set_module_args(
246+
dict(
247+
report_path=os.path.join(td, "no_such_report.yaml"),
248+
output_path=os.path.join(td, "out.yaml"),
249+
log_paths=[log_path],
250+
)
251+
)
252+
253+
with self.assertRaises(AnsibleFailJson) as rst:
254+
verify_pulled_report_crio.run_module()
255+
256+
self.assertIn("Cannot read report", rst.exception.args[0]["msg"])
257+
258+
def test_fails_when_report_has_invalid_yaml(self):
259+
"""
260+
GIVEN a report file whose contents are not valid YAML
261+
WHEN the module tries to parse it
262+
THEN it fails with an error about invalid YAML
263+
"""
264+
with tempfile.TemporaryDirectory() as td:
265+
report_path = os.path.join(td, "bad.yaml")
266+
with open(report_path, "w") as f:
267+
f.write("{{: not: valid: yaml: [}")
268+
269+
log_path = os.path.join(td, "node-a.crio.log")
270+
with open(log_path, "w") as f:
271+
f.write("")
272+
273+
set_module_args(
274+
dict(
275+
report_path=report_path,
276+
output_path=os.path.join(td, "out.yaml"),
277+
log_paths=[log_path],
278+
)
279+
)
280+
281+
with self.assertRaises(AnsibleFailJson) as rst:
282+
verify_pulled_report_crio.run_module()
283+
284+
self.assertIn("Invalid YAML in report", rst.exception.args[0]["msg"])
285+
286+
def test_fails_when_report_root_is_not_a_dict(self):
287+
"""
288+
GIVEN a report file whose YAML root is a list instead of a mapping
289+
WHEN the module checks the structure
290+
THEN it fails with an error about the root type
291+
"""
292+
with tempfile.TemporaryDirectory() as td:
293+
report_path = os.path.join(td, "list.yaml")
294+
with open(report_path, "w") as f:
295+
yaml.safe_dump(
296+
["item1", "item2"],
297+
f,
298+
default_flow_style=False,
299+
)
300+
301+
log_path = os.path.join(td, "node-a.crio.log")
302+
with open(log_path, "w") as f:
303+
f.write("")
304+
305+
set_module_args(
306+
dict(
307+
report_path=report_path,
308+
output_path=os.path.join(td, "out.yaml"),
309+
log_paths=[log_path],
310+
)
311+
)
312+
313+
with self.assertRaises(AnsibleFailJson) as rst:
314+
verify_pulled_report_crio.run_module()
315+
316+
self.assertIn("Report root must be a mapping", rst.exception.args[0]["msg"])

0 commit comments

Comments
 (0)