diff --git a/src/seclang_parser/errors.py b/src/seclang_parser/errors.py index 8c019b9..57b24a8 100644 --- a/src/seclang_parser/errors.py +++ b/src/seclang_parser/errors.py @@ -43,3 +43,18 @@ def _underline_error(self, recognizer, offending_symbol, line, column): for _ in range(start, stop + 1): output += "^" return output + + +class CustomErrorListener(ErrorListener): + """Error listener that collects errors without raising exceptions (for testing).""" + + def __init__(self): + super().__init__() + self.errors = [] + + def syntaxError(self, recognizer, offending_symbol, line, column, msg, e): + if offending_symbol is None: + error = f"Recognition error at line {line}, column {column}: {msg}" + else: + error = f"Syntax error at line {line}, column {column}: {offending_symbol}" + self.errors.append(error) diff --git a/src/seclang_parser/listener.py b/src/seclang_parser/listener.py new file mode 100644 index 0000000..5ee761b --- /dev/null +++ b/src/seclang_parser/listener.py @@ -0,0 +1,121 @@ +# Copyright 2025 OWASP CRS Project +# SPDX-License-Identifier: Apache-2.0 + +"""Test listener for collecting parser results.""" + +from dataclasses import dataclass, field + +from seclang_parser.SecLangParserListener import SecLangParserListener +from seclang_parser.SecLangParser import SecLangParser + + +@dataclass +class ParserResult: + """Stores results from parsing for test validation.""" + + variables: list[str] = field(default_factory=list) + negated_var_count: int = 0 + collection_length_count: int = 0 + collections: list[str] = field(default_factory=list) + collection_args: list[str] = field(default_factory=list) + operator_list: list[str] = field(default_factory=list) + operator_value_list: list[str] = field(default_factory=list) + negated_operator_count: int = 0 + directive_list: list[str] = field(default_factory=list) + directive_values: list[str] = field(default_factory=list) + range_events: list[str] = field(default_factory=list) + range_start_events: list[int] = field(default_factory=list) + range_end_events: list[int] = field(default_factory=list) + setvar_collections: list[str] = field(default_factory=list) + setvar_names: list[str] = field(default_factory=list) + setvar_operations: list[str] = field(default_factory=list) + + +class TreeShapeListener(SecLangParserListener): + """Listener that collects parsing results for test validation.""" + + def __init__(self): + super().__init__() + self.results = ParserResult() + + def enterInt_range(self, ctx: SecLangParser.Int_rangeContext): + self.results.range_events.append(ctx.getText()) + + def enterRange_start(self, ctx: SecLangParser.Range_startContext): + self.results.range_start_events.append(int(ctx.getText())) + + def enterRange_end(self, ctx: SecLangParser.Range_endContext): + self.results.range_end_events.append(int(ctx.getText())) + + def enterRemove_rule_by_id(self, ctx: SecLangParser.Remove_rule_by_idContext): + self.results.directive_list.append(ctx.getText()) + + def enterRemove_rule_by_id_int(self, ctx: SecLangParser.Remove_rule_by_id_intContext): + self.results.directive_values.append(ctx.getText()) + + def enterRemove_rule_by_id_int_range( + self, ctx: SecLangParser.Remove_rule_by_id_int_rangeContext + ): + self.results.directive_values.append(ctx.getText()) + + def enterRemove_rule_by_msg(self, ctx: SecLangParser.Remove_rule_by_msgContext): + self.results.directive_list.append(ctx.getText()) + + def enterRemove_rule_by_tag(self, ctx: SecLangParser.Remove_rule_by_tagContext): + self.results.directive_list.append(ctx.getText()) + + def enterValues(self, ctx: SecLangParser.ValuesContext): + self.results.directive_values.append(ctx.getText()) + + def enterUpdate_target_by_id(self, ctx: SecLangParser.Update_target_by_idContext): + self.results.directive_list.append(ctx.getText()) + + def enterUpdate_target_by_msg(self, ctx: SecLangParser.Update_target_by_msgContext): + self.results.directive_list.append(ctx.getText()) + + def enterUpdate_target_by_tag(self, ctx: SecLangParser.Update_target_by_tagContext): + self.results.directive_list.append(ctx.getText()) + + def enterUpdate_target_rules_values( + self, ctx: SecLangParser.Update_target_rules_valuesContext + ): + self.results.directive_values.append(ctx.getText()) + + def enterVariable_enum(self, ctx: SecLangParser.Variable_enumContext): + self.results.variables.append(ctx.getText()) + + def enterCollection_enum(self, ctx: SecLangParser.Collection_enumContext): + self.results.collections.append(ctx.getText()) + + def enterCollection_value(self, ctx: SecLangParser.Collection_valueContext): + self.results.collection_args.append(ctx.getText()) + + def enterVar_not(self, ctx: SecLangParser.Var_notContext): + self.results.negated_var_count += 1 + + def enterVar_count(self, ctx: SecLangParser.Var_countContext): + self.results.collection_length_count += 1 + + def enterRules_directive(self, ctx: SecLangParser.Rules_directiveContext): + self.results.directive_list.append(ctx.getText()) + + def enterOperator_name(self, ctx: SecLangParser.Operator_nameContext): + self.results.operator_list.append(ctx.getText()) + + def enterOperator_value(self, ctx: SecLangParser.Operator_valueContext): + self.results.operator_value_list.append(ctx.getText()) + + def enterOperator_not(self, ctx: SecLangParser.Operator_notContext): + self.results.negated_operator_count += 1 + + def enterCol_name(self, ctx: SecLangParser.Col_nameContext): + self.results.setvar_collections.append(ctx.getText()) + + def enterSetvar_stmt(self, ctx: SecLangParser.Setvar_stmtContext): + self.results.setvar_names.append(ctx.getText()) + + def enterAssignment(self, ctx: SecLangParser.AssignmentContext): + self.results.setvar_operations.append(ctx.getText()) + + def enterVar_assignment(self, ctx: SecLangParser.Var_assignmentContext): + self.results.directive_values.append(ctx.getText()) diff --git a/tests/test_parser.py b/tests/test_parser.py index d5955be..4e4670d 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -1,24 +1,430 @@ # Copyright 2025 OWASP CRS Project # SPDX-License-Identifier: Apache-2.0 -import sys +"""Comprehensive parser tests matching the Golang test suite.""" -from antlr4 import * +from antlr4 import FileStream, CommonTokenStream, ParseTreeWalker +import pytest from seclang_parser.SecLangLexer import SecLangLexer from seclang_parser.SecLangParser import SecLangParser -from seclang_parser.SecLangParserListener import SecLangParserListener +from seclang_parser.errors import CustomErrorListener +from seclang_parser.listener import TreeShapeListener, ParserResult -class SecLangListener(SecLangParserListener): - def enterStmt(self, ctx): - print("Entering statement:", ctx.getText()) - def enterAction(self, ctx: SecLangParser.ActionContext): - print("Entering action:", ctx.getText()) +# Test file lists matching Go implementation +CRS_TEST_FILES = [ + "testdata/crs/REQUEST-900-EXCLUSION-RULES-BEFORE-CRS.conf", + "testdata/crs/REQUEST-901-INITIALIZATION.conf", + "testdata/crs/REQUEST-905-COMMON-EXCEPTIONS.conf", + "testdata/crs/REQUEST-911-METHOD-ENFORCEMENT.conf", + "testdata/crs/REQUEST-913-SCANNER-DETECTION.conf", + "testdata/crs/REQUEST-920-PROTOCOL-ENFORCEMENT.conf", + "testdata/crs/REQUEST-921-PROTOCOL-ATTACK.conf", + "testdata/crs/REQUEST-922-MULTIPART-ATTACK.conf", + "testdata/crs/REQUEST-930-APPLICATION-ATTACK-LFI.conf", + "testdata/crs/REQUEST-931-APPLICATION-ATTACK-RFI.conf", + "testdata/crs/REQUEST-932-APPLICATION-ATTACK-RCE.conf", + "testdata/crs/REQUEST-933-APPLICATION-ATTACK-PHP.conf", + "testdata/crs/REQUEST-934-APPLICATION-ATTACK-GENERIC.conf", + "testdata/crs/REQUEST-941-APPLICATION-ATTACK-XSS.conf", + "testdata/crs/REQUEST-942-APPLICATION-ATTACK-SQLI.conf", + "testdata/crs/REQUEST-943-APPLICATION-ATTACK-SESSION-FIXATION.conf", + "testdata/crs/REQUEST-944-APPLICATION-ATTACK-JAVA.conf", + "testdata/crs/REQUEST-949-BLOCKING-EVALUATION.conf", + "testdata/crs/RESPONSE-950-DATA-LEAKAGES.conf", + "testdata/crs/RESPONSE-951-DATA-LEAKAGES-SQL.conf", + "testdata/crs/RESPONSE-952-DATA-LEAKAGES-JAVA.conf", + "testdata/crs/RESPONSE-953-DATA-LEAKAGES-PHP.conf", + "testdata/crs/RESPONSE-954-DATA-LEAKAGES-IIS.conf", + "testdata/crs/RESPONSE-955-WEB-SHELLS.conf", + "testdata/crs/RESPONSE-959-BLOCKING-EVALUATION.conf", + "testdata/crs/RESPONSE-980-CORRELATION.conf", +] - def enterComment(self, ctx: SecLangParser.CommentContext): - print("Entering comment:", ctx.getText()) +PLUGINS_TEST_FILES = [ + "testdata/plugins/wordpress-rule-exclusions-before.conf", + "testdata/plugins/wordpress-rule-exclusions-config.conf", + "testdata/plugins/drupal-rule-exclusions-before.conf", + "testdata/plugins/drupal-rule-exclusions-config.conf", + "testdata/plugins/google-oauth2-before.conf", + "testdata/plugins/google-oauth2-config.conf", + "testdata/plugins/antivirus-before.conf", + "testdata/plugins/antivirus-config.conf", + "testdata/plugins/body-decompress-before.conf", + "testdata/plugins/body-decompress-config.conf", + "testdata/plugins/body-decompress-after.conf", +] +GENERIC_TESTS = { + "testdata/REQUEST-901-INITIALIZATION.conf": { + "error_count": 0, + "comment": "Test file for REQUEST-901-INITIALIZATION.conf", + }, + "testdata/crs-setup.conf": { + "error_count": 0, + "comment": "Test file for crs-setup.conf", + }, + "testdata/test1.conf": { + "error_count": 0, + "comment": "Test SecDefaultAction", + }, + "testdata/test2.conf": { + "error_count": 0, + "comment": "Test SecAction and SecCollectionTimeout", + }, + "testdata/test3.conf": { + "error_count": 0, + "comment": "test comment and secaction", + }, + "testdata/test4.conf": { + "error_count": 0, + "comment": "test redefining SecCollectionTimeout", + }, + "testdata/test5.conf": { + "error_count": 0, + "comment": "Test comments only file", + }, + "testdata/test_01_comment.conf": { + "error_count": 0, + "comment": "Test comments only file", + }, + "testdata/test_02_seccompsignature.conf": { + "error_count": 0, + "comment": "test SecComponentSignature", + }, + "testdata/test_03_secruleengine.conf": { + "error_count": 0, + "comment": "test SecRuleEngine", + }, + "testdata/test_04_directives.conf": { + "error_count": 0, + "comment": "test directives", + }, + "testdata/test_05_secaction.conf": { + "error_count": 0, + "comment": "test SecAction", + }, + "testdata/test_06_secaction2.conf": { + "error_count": 0, + "comment": "test SecAction with and without continuation", + }, + "testdata/test_07_secaction3.conf": { + "error_count": 0, + "comment": "test SecAction with ctl", + }, + "testdata/test_08_secaction4.conf": { + "error_count": 0, + "comment": "test SecAction with ctl", + }, + "testdata/test_09_secaction_ctl_01.conf": { + "error_count": 0, + "comment": "test SecAction with ctl", + }, + "testdata/test_10_secaction_ctl_02.conf": { + "error_count": 0, + "comment": "test SecAction with ctl", + }, + "testdata/test_11_secaction_ctl_03.conf": { + "error_count": 0, + "comment": "test SecAction with ctl", + }, + "testdata/test_12_secaction_ctl_04.conf": { + "error_count": 0, + "comment": "test SecAction with ctl", + }, + "testdata/test_13_secaction_ctl_05.conf": { + "error_count": 0, + "comment": "test SecAction with ctl", + }, + "testdata/test_14_secaction_ctl_06.conf": { + "error_count": 0, + "comment": "test SecAction with ctl", + }, + "testdata/test_15_secaction_01.conf": { + "error_count": 0, + "comment": "test SecAction with ctl", + }, + "testdata/test_16_secrule_01.conf": { + "error_count": 0, + "comment": "test SecAction with ctl", + }, + "testdata/test_17_secrule_02.conf": { + "error_count": 0, + "comment": "test SecAction with ctl", + }, + "testdata/test_18_secrule_03.conf": { + "error_count": 1, + "comment": "test should fail with non-existent operator", + }, + "testdata/test_19_secrule_04.conf": { + "error_count": 0, + "comment": "test SecAction with ctl", + }, + "testdata/test_20_secrule_05.conf": { + "error_count": 0, + "comment": "", + }, + "testdata/test_21_secrule_06.conf": { + "error_count": 0, + "comment": "", + }, + "testdata/test_22_secrule_07.conf": { + "error_count": 0, + "comment": "", + }, + "testdata/test_23_secrule_08.conf": { + "error_count": 0, + "comment": "", + }, + "testdata/test_24_secrule_09.conf": { + "error_count": 0, + "comment": "", + }, + "testdata/test_25_secrule_10.conf": { + "error_count": 0, + "comment": "", + }, + "testdata/test_26_secrule_11.conf": { + "error_count": 0, + "comment": "", + }, + "testdata/test_27_secrule_12.conf": { + "error_count": 0, + "comment": "", + }, + "testdata/test_28_secrule_13.conf": { + "error_count": 0, + "comment": "", + }, + "testdata/test_29_secrule_14.conf": { + "error_count": 0, + "comment": "", + }, + "testdata/test_30_secrule_15.conf": { + "error_count": 0, + "comment": "", + }, + "testdata/test_31_secaction_ctl_07.conf": { + "error_count": 0, + "comment": "", + }, + "testdata/test_32_secrule_16.conf": { + "error_count": 0, + "comment": "", + }, + "testdata/test_33_secrule_16.conf": { + "error_count": 0, + "comment": "", + }, + "testdata/test_directive_unknown.conf": { + "error_count": 1, + "comment": "test should fail with unknown directive", + }, + "testdata/test_34_xml.conf": { + "error_count": 0, + "comment": "", + }, + "testdata/test_35_all_directives.conf": { + "error_count": 0, + "comment": "", + }, + "testdata/test_36_chain.conf": { + "error_count": 0, + "comment": "", + }, + "testdata/test_37_ugly_rules.conf": { + "error_count": 0, + "comment": "", + }, + "testdata/test_38_update_rules.conf": { + "error_count": 0, + "comment": "", + }, + "testdata/test_39_remove_rules.conf": { + "error_count": 0, + "comment": "", + }, +} -def test_listener(): - assert True +CHECK_OUTPUT_TESTS = { + "testdata/test_39_remove_rules.conf": { + "error_count": 0, + "comment": "", + "expected_result": ParserResult( + directive_list=["SecRuleRemoveByID", "SecRuleRemoveByMsg", "SecRuleRemoveByTag"], + directive_values=["1", "2", "9000-9010", "FAIL", "attack-dos"], + range_events=["9000-9010"], + range_start_events=[9000], + range_end_events=[9010], + ), + }, + "testdata/test_38_update_rules.conf": { + "error_count": 0, + "comment": "", + "expected_result": ParserResult( + variables=["REQUEST_FILENAME", "REQUEST_URI", "REQUEST_FILENAME", "REQUEST_URI"], + negated_var_count=6, + collections=["ARGS", "ARGS", "REQUEST_COOKIES", "ARGS"], + collection_args=["foo", "email", "/^appl1_.*/", "email"], + directive_list=[ + "SecRuleUpdateTargetById", + "SecRuleUpdateTargetById", + "SecRuleUpdateTargetById", + "SecRuleUpdateTargetById", + "SecRuleUpdateTargetByTag", + "SecRuleUpdateTargetByMsg", + ], + directive_values=["12345", "958895", "981172", "958895", "WASCTC/WASC-31", "System Command Injection"], + ), + }, + "testdata/test_40_var_operators.conf": { + "error_count": 0, + "comment": "", + "expected_result": ParserResult( + variables=["REQUEST_URI"], + negated_var_count=2, + collection_length_count=1, + collections=["REQUEST_HEADERS", "REQUEST_HEADERS", "TX"], + collection_args=["User-Agent", "crs_setup_version"], + operator_list=["validateByteRange", "eq"], + operator_value_list=["32,34,38,42-59,61,65-90,95,97-122", "0"], + directive_list=["SecRule", "SecRule", "SecRuleUpdateTargetById"], + directive_values=["120"], + range_events=["42-59", "65-90", "97-122"], + range_start_events=[42, 65, 97], + range_end_events=[59, 90, 122], + ), + }, + "testdata/test_41_negated_operator_0.conf": { + "error_count": 0, + "comment": "", + "expected_result": ParserResult( + collections=["ARGS", "ARGS_NAMES"], + operator_list=["rx"], + operator_value_list=["foo"], + negated_operator_count=0, + directive_list=["SecRule"], + ), + }, + "testdata/test_41_negated_operator_1.conf": { + "error_count": 0, + "comment": "", + "expected_result": ParserResult( + collections=["ARGS", "ARGS_NAMES"], + operator_list=["rx"], + operator_value_list=["foo"], + negated_operator_count=1, + directive_list=["SecRule"], + ), + }, + "testdata/test_41_negated_operator_n.conf": { + "error_count": 0, + "comment": "", + "expected_result": ParserResult( + variables=["REQBODY_PROCESSOR", "REQBODY_PROCESSOR"], + collections=["TX", "TX"], + collection_args=["enforce_bodyproc_urlencoded", "sampling_rnd100"], + operator_list=["rx", "eq", "rx", "lt"], + operator_value_list=[ + "(?:URLENCODED|MULTIPART|XML|JSON)", + "1", + "(?:URLENCODED|MULTIPART|XML|JSON)", + "%{tx.sampling_percentage}", + ], + negated_operator_count=3, + directive_list=["SecRule", "SecRule", "SecRule", "SecRule"], + ), + }, + "testdata/test_42_setvar.conf": { + "error_count": 0, + "comment": "", + "expected_result": ParserResult( + collections=["ARGS", "ARGS_NAMES", "REQUEST_HEADERS_NAMES", "REQUEST_HEADERS_NAMES"], + operator_list=["rx", "rx", "rx"], + operator_value_list=["foo", "^.*$", "^.*$"], + negated_operator_count=0, + directive_list=["SecRule", "SecRule", "SecRule"], + setvar_collections=["tx", "tx", "tx", "tx", "tx", "tx"], + setvar_names=[ + "var1", + "var2", + "var2", + "var2", + "header_name_920450_%{tx.0}", + "inbound_anomaly_score_pl1", + ], + setvar_operations=["=", "=", "=+", "=-", "=", "=+"], + directive_values=["bar", "0", "2", "1", "/%{tx.0}/", "%{tx.critical_anomaly_score}"], + ), + }, +} + + +def parse_file_for_test(file_path: str) -> tuple[TreeShapeListener, int]: + """Parse a file and return the listener and error count.""" + input_stream = FileStream(file_path, encoding="utf-8") + lexer = SecLangLexer(input_stream) + + lexer_errors = CustomErrorListener() + lexer.removeErrorListeners() + lexer.addErrorListener(lexer_errors) + + parser_errors = CustomErrorListener() + stream = CommonTokenStream(lexer) + parser = SecLangParser(stream) + parser.removeErrorListeners() + parser.addErrorListener(parser_errors) + + parser.buildParseTrees = True + tree = parser.configuration() + + listener = TreeShapeListener() + walker = ParseTreeWalker() + walker.walk(listener, tree) + + total_errors = len(lexer_errors.errors) + len(parser_errors.errors) + + if lexer_errors.errors: + print(f"Lexer {len(lexer_errors.errors)} errors found") + print(f"First error: {lexer_errors.errors[0]}") + if parser_errors.errors: + print(f"Parser {len(parser_errors.errors)} errors found") + print(f"First error: {parser_errors.errors[0]}") + + return listener, total_errors + + +@pytest.mark.parametrize("file_path,test_data", GENERIC_TESTS.items()) +def test_seclang(file_path: str, test_data: dict): + """Test generic SecLang configuration files.""" + listener, total_errors = parse_file_for_test(file_path) + assert total_errors == test_data["error_count"], ( + f"Error count mismatch for file {file_path} -> we want: {test_data['comment']}" + ) + + +@pytest.mark.parametrize("file_path,test_data", CHECK_OUTPUT_TESTS.items()) +def test_seclang_output(file_path: str, test_data: dict): + """Test SecLang files with expected output validation.""" + listener, total_errors = parse_file_for_test(file_path) + assert total_errors == test_data["error_count"], ( + f"Error count mismatch for file {file_path} -> we want: {test_data['comment']}" + ) + assert listener.results == test_data["expected_result"], ( + f"Expected result mismatch for file {file_path} -> we want: {test_data['expected_result']}" + ) + + +@pytest.mark.parametrize("file_path", CRS_TEST_FILES) +def test_crs_lang(file_path: str): + """Test CRS (Core Rule Set) configuration files.""" + listener, total_errors = parse_file_for_test(file_path) + assert total_errors == 0, f"Error count mismatch for file {file_path} -> we want no errors" + + +@pytest.mark.parametrize("file_path", PLUGINS_TEST_FILES) +def test_plugins(file_path: str): + """Test plugin configuration files.""" + listener, total_errors = parse_file_for_test(file_path) + assert total_errors == 0, f"Error count mismatch for file {file_path} -> we want no errors"