Skip to content

Commit 44d4edf

Browse files
committed
Add support for building CEL content
Add a new build-script along with a new output type that builds the CEL rules into the yaml that can be loaded by Compliance Operator.
1 parent 9a52ff0 commit 44d4edf

5 files changed

Lines changed: 396 additions & 3 deletions

File tree

build-scripts/build_cel_content.py

Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
#!/usr/bin/python3
2+
3+
"""
4+
Build CEL content YAML file for compliance scanning.
5+
6+
This module generates a CEL content file containing rules that use
7+
the Common Expression Language (CEL) scanner instead of OVAL checks.
8+
"""
9+
10+
import argparse
11+
import logging
12+
import os
13+
import sys
14+
import yaml
15+
16+
import ssg.build_yaml
17+
import ssg.products
18+
import ssg.utils
19+
20+
MESSAGE_FORMAT = "%(levelname)s: %(message)s"
21+
22+
23+
def parse_args():
24+
parser = argparse.ArgumentParser(
25+
description="Generates CEL content YAML file from resolved rules"
26+
)
27+
parser.add_argument(
28+
"--resolved-rules-dir", required=True,
29+
help="Directory containing resolved rule YAML files. "
30+
"e.g.: ~/scap-security-guide/build/rhel9/rules"
31+
)
32+
parser.add_argument(
33+
"--profiles-dir", required=True,
34+
help="Directory containing resolved profile YAML files. "
35+
"e.g.: ~/scap-security-guide/build/ocp4/profiles"
36+
)
37+
parser.add_argument(
38+
"--product-yaml", required=True,
39+
help="YAML file with information about the product we are building. "
40+
"e.g.: ~/scap-security-guide/build/ocp4/product.yml"
41+
)
42+
parser.add_argument(
43+
"--output", required=True,
44+
help="Output CEL content YAML file. "
45+
"e.g.: ~/scap-security-guide/build/ocp4/ssg-ocp4-cel-content.yaml"
46+
)
47+
parser.add_argument(
48+
"--log",
49+
action="store",
50+
default="WARNING",
51+
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
52+
help="write debug information to the log up to the LOG_LEVEL.",
53+
)
54+
return parser.parse_args()
55+
56+
57+
def setup_logging(log_level_str):
58+
numeric_level = getattr(logging, log_level_str.upper(), None)
59+
if not isinstance(numeric_level, int):
60+
raise ValueError("Invalid log level: {}".format(log_level_str))
61+
logging.basicConfig(format=MESSAGE_FORMAT, level=numeric_level)
62+
63+
64+
def load_cel_rules(rules_dir):
65+
"""
66+
Load all rules that use CEL scanner.
67+
68+
Args:
69+
rules_dir: Directory containing resolved rule JSON files
70+
71+
Returns:
72+
dict: Dictionary of rule_id -> rule object for CEL rules
73+
74+
Raises:
75+
ValueError: If a CEL rule is missing required fields
76+
"""
77+
cel_rules = {}
78+
79+
if not os.path.isdir(rules_dir):
80+
return cel_rules
81+
82+
for rule_file in os.listdir(rules_dir):
83+
rule_path = os.path.join(rules_dir, rule_file)
84+
try:
85+
rule = ssg.build_yaml.Rule.from_compiled_json(rule_path)
86+
87+
# Check if this is a CEL rule
88+
if hasattr(rule, 'scannerType') and rule.scannerType == 'CEL':
89+
# Validate required CEL fields
90+
rule_name = rule_id_to_name(rule.id_)
91+
92+
if not hasattr(rule, 'expression') or not rule.expression:
93+
raise ValueError(
94+
f"CEL rule '{rule_name}' in {rule_file} has no expression"
95+
)
96+
97+
if not hasattr(rule, 'inputs') or not rule.inputs:
98+
raise ValueError(
99+
f"CEL rule '{rule_name}' in {rule_file} has no inputs"
100+
)
101+
102+
cel_rules[rule.id_] = rule
103+
except ssg.build_yaml.DocumentationNotComplete:
104+
# Skip documentation-incomplete rules in non-debug builds
105+
continue
106+
except ValueError:
107+
# Re-raise validation errors
108+
raise
109+
except Exception as e:
110+
logging.warning("Failed to load rule from %s: %s", rule_file, e)
111+
continue
112+
113+
return cel_rules
114+
115+
116+
def load_profiles(profiles_dir, cel_rule_ids):
117+
"""
118+
Load profiles that have scannerType: CEL.
119+
120+
Args:
121+
profiles_dir: Directory containing profile YAML files
122+
cel_rule_ids: Set of CEL rule IDs
123+
124+
Returns:
125+
list: List of CEL profile objects
126+
127+
Raises:
128+
ValueError: If a CEL profile is missing required fields
129+
"""
130+
profiles = []
131+
132+
if not os.path.isdir(profiles_dir):
133+
return profiles
134+
135+
for profile_file in os.listdir(profiles_dir):
136+
profile_path = os.path.join(profiles_dir, profile_file)
137+
try:
138+
profile = ssg.build_yaml.Profile.from_compiled_json(profile_path)
139+
140+
# Only load profiles with scannerType: CEL
141+
if hasattr(profile, 'scannerType') and profile.scannerType == 'CEL':
142+
# Validate required CEL profile fields
143+
profile_name = rule_id_to_name(profile.id_)
144+
145+
if not hasattr(profile, 'selected') or not profile.selected:
146+
raise ValueError(
147+
f"CEL profile '{profile_name}' in {profile_file} has no rules"
148+
)
149+
150+
profiles.append(profile)
151+
except ValueError:
152+
# Re-raise validation errors
153+
raise
154+
except Exception as e:
155+
logging.warning("Failed to load profile from %s: %s", profile_file, e)
156+
continue
157+
158+
return profiles
159+
160+
161+
def rule_id_to_name(rule_id):
162+
"""Convert rule_id with underscores to name with hyphens."""
163+
return rule_id.replace('_', '-')
164+
165+
166+
def extract_controls_from_references(references):
167+
"""
168+
Extract controls from references dict, keeping original keys.
169+
170+
Args:
171+
references: Dictionary of references like {"cis@ocp4": ["1.2.3"], "nist": ["AC-6"]}
172+
173+
Returns:
174+
dict: Controls dictionary grouped by framework
175+
"""
176+
if not references:
177+
return {}
178+
179+
controls = {}
180+
for ref_key, ref_values in references.items():
181+
# Keep the original key format (e.g., "cis@ocp4", "nist")
182+
if isinstance(ref_values, list):
183+
controls[ref_key] = ref_values
184+
elif isinstance(ref_values, str):
185+
controls[ref_key] = [ref_values]
186+
187+
return controls
188+
189+
190+
def rule_to_cel_dict(rule):
191+
"""
192+
Convert a Rule object to CEL content dictionary format.
193+
194+
Args:
195+
rule: Rule object
196+
197+
Returns:
198+
dict: Rule in CEL content format
199+
"""
200+
cel_rule = {
201+
'id': rule.id_, # Keep underscores for id
202+
'name': rule_id_to_name(rule.id_), # Convert to hyphens for name
203+
'title': rule.title,
204+
'description': rule.description,
205+
'rationale': rule.rationale,
206+
'severity': rule.severity,
207+
'checkType': rule.checkType if hasattr(rule, 'checkType') and rule.checkType else 'Platform',
208+
}
209+
210+
# Add instructions from ocil field
211+
if hasattr(rule, 'ocil') and rule.ocil:
212+
cel_rule['instructions'] = rule.ocil
213+
214+
# Add failureReason if present
215+
if hasattr(rule, 'failureReason') and rule.failureReason:
216+
cel_rule['failureReason'] = rule.failureReason
217+
218+
# Add CEL expression
219+
if hasattr(rule, 'expression') and rule.expression:
220+
cel_rule['expression'] = rule.expression
221+
222+
# Add inputs
223+
if hasattr(rule, 'inputs') and rule.inputs:
224+
cel_rule['inputs'] = rule.inputs
225+
226+
# Add controls from references
227+
controls = extract_controls_from_references(rule.references)
228+
if controls:
229+
cel_rule['controls'] = controls
230+
231+
return cel_rule
232+
233+
234+
def profile_to_cel_dict(profile, cel_rule_ids):
235+
"""
236+
Convert a Profile object to CEL content dictionary format.
237+
238+
Args:
239+
profile: Profile object
240+
cel_rule_ids: Set of CEL rule IDs to include
241+
242+
Returns:
243+
dict: Profile in CEL content format
244+
"""
245+
# Filter selected rules to only include CEL rules
246+
profile_cel_rules = [rule_id_to_name(rid) for rid in profile.selected if rid in cel_rule_ids]
247+
248+
if not profile_cel_rules:
249+
return None
250+
251+
cel_profile = {
252+
'id': profile.id_,
253+
'name': rule_id_to_name(profile.id_),
254+
'title': profile.title,
255+
'description': profile.description,
256+
'productType': 'Platform', # Default for OCP4
257+
'rules': sorted(profile_cel_rules)
258+
}
259+
260+
return cel_profile
261+
262+
263+
def generate_cel_content(cel_rules, profiles):
264+
"""
265+
Generate the complete CEL content structure.
266+
267+
Args:
268+
cel_rules: Dictionary of CEL rules
269+
profiles: List of profiles containing CEL rules
270+
271+
Returns:
272+
dict: Complete CEL content structure
273+
274+
Raises:
275+
ValueError: If duplicate rule names found or profile references unknown rules
276+
"""
277+
cel_rule_ids = set(cel_rules.keys())
278+
279+
# Generate rules section and check for duplicates
280+
cel_rules_list = []
281+
rule_names_seen = set()
282+
for rule_id in sorted(cel_rules.keys()):
283+
rule = cel_rules[rule_id]
284+
cel_rule = rule_to_cel_dict(rule)
285+
286+
# Check for duplicate rule names
287+
rule_name = cel_rule['name']
288+
if rule_name in rule_names_seen:
289+
raise ValueError(f"duplicate rule name: {rule_name}")
290+
rule_names_seen.add(rule_name)
291+
292+
cel_rules_list.append(cel_rule)
293+
294+
# Generate profiles section and validate rule references
295+
cel_profiles = []
296+
for profile in profiles:
297+
# First validate that all selected rules exist in CEL rules
298+
profile_name = rule_id_to_name(profile.id_)
299+
for rule_id in profile.selected:
300+
if rule_id not in cel_rule_ids:
301+
rule_name = rule_id_to_name(rule_id)
302+
raise ValueError(
303+
f"profile '{profile_name}' references unknown rule '{rule_name}'"
304+
)
305+
306+
cel_profile = profile_to_cel_dict(profile, cel_rule_ids)
307+
if cel_profile:
308+
cel_profiles.append(cel_profile)
309+
310+
# Build the complete structure
311+
content = {
312+
'profiles': cel_profiles,
313+
'rules': cel_rules_list
314+
}
315+
316+
return content
317+
318+
319+
def main():
320+
args = parse_args()
321+
setup_logging(args.log)
322+
323+
# Load CEL rules
324+
cel_rules = load_cel_rules(args.resolved_rules_dir)
325+
326+
if not cel_rules:
327+
content = {'profiles': [], 'rules': []}
328+
else:
329+
# Load profiles
330+
profiles = load_profiles(args.profiles_dir, set(cel_rules.keys()))
331+
332+
# Generate CEL content
333+
content = generate_cel_content(cel_rules, profiles)
334+
335+
# Write output YAML
336+
os.makedirs(os.path.dirname(args.output), exist_ok=True)
337+
338+
with open(args.output, 'w') as f:
339+
yaml.dump(content, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
340+
341+
342+
if __name__ == "__main__":
343+
main()

cmake/SSGCommon.cmake

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,20 @@ macro(ssg_build_sds PRODUCT)
528528
endif()
529529
endmacro()
530530

531+
# Build CEL content YAML for products that support CEL scanning
532+
macro(ssg_build_cel_content PRODUCT)
533+
add_custom_command(
534+
OUTPUT "${CMAKE_BINARY_DIR}/${PRODUCT}-cel-content.yaml"
535+
COMMAND env "PYTHONPATH=$ENV{PYTHONPATH}" "${Python_EXECUTABLE}" "${SSG_BUILD_SCRIPTS}/build_cel_content.py" --resolved-rules-dir "${CMAKE_CURRENT_BINARY_DIR}/rules" --profiles-dir "${CMAKE_CURRENT_BINARY_DIR}/profiles" --product-yaml "${CMAKE_CURRENT_BINARY_DIR}/product.yml" --output "${CMAKE_BINARY_DIR}/${PRODUCT}-cel-content.yaml"
536+
DEPENDS ${PRODUCT}-compile-all "${CMAKE_CURRENT_BINARY_DIR}/ssg_build_compile_all-${PRODUCT}"
537+
COMMENT "[${PRODUCT}-content] generating CEL content YAML"
538+
)
539+
add_custom_target(
540+
generate-${PRODUCT}-cel-content.yaml
541+
DEPENDS "${CMAKE_BINARY_DIR}/${PRODUCT}-cel-content.yaml"
542+
)
543+
endmacro()
544+
531545
# Build per-product HTML guides to see the status of various profiles and
532546
# rules in the generated XCCDF guides.
533547
macro(ssg_build_html_guides PRODUCT)
@@ -740,6 +754,11 @@ macro(ssg_build_product PRODUCT)
740754
ssg_build_xml_final(${PRODUCT} ocil)
741755
ssg_build_sds(${PRODUCT})
742756

757+
# Build CEL content if enabled for this product
758+
if(PRODUCT_CEL_ENABLED)
759+
ssg_build_cel_content(${PRODUCT})
760+
endif()
761+
743762
define_validate_product("${PRODUCT}")
744763
if("${VALIDATE_PRODUCT}" OR "${FORCE_VALIDATE_EVERYTHING}")
745764
add_test(
@@ -764,6 +783,15 @@ macro(ssg_build_product PRODUCT)
764783

765784
add_dependencies(zipfile generate-ssg-${PRODUCT}-ds.xml)
766785

786+
# Add CEL content to dependencies if enabled
787+
if(PRODUCT_CEL_ENABLED)
788+
add_dependencies(
789+
${PRODUCT}-content
790+
generate-${PRODUCT}-cel-content.yaml
791+
)
792+
add_dependencies(zipfile generate-${PRODUCT}-cel-content.yaml)
793+
endif()
794+
767795
if("${PRODUCT_ANSIBLE_REMEDIATION_ENABLED}" AND SSG_ANSIBLE_PLAYBOOKS_ENABLED)
768796
ssg_build_profile_playbooks(${PRODUCT})
769797
add_custom_target(

products/ocp4/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@ endif()
55

66
set(PRODUCT "ocp4")
77
set(PRODUCT_REMEDIATION_LANGUAGES "ignition;kubernetes")
8+
set(PRODUCT_CEL_ENABLED TRUE)
89

910
ssg_build_product(${PRODUCT})

0 commit comments

Comments
 (0)