Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 28 additions & 30 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -203,37 +203,35 @@ jobs:
chmod +x ./etc/scripts/set_copyright_year.sh
./etc/scripts/set_copyright_year.sh --check

# Todo: reset when the compliance-tool was updated (#485)
compliance-tool-test:
# This job runs the unittests on the python versions specified down at the matrix
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10", "3.12"]
defaults:
run:
working-directory: ./compliance_tool

# compliance-tool-test:
# # This job runs the unittests on the python versions specified down at the matrix
# runs-on: ubuntu-latest
# strategy:
# matrix:
# python-version: ["3.10", "3.12"]
# defaults:
# run:
# working-directory: ./compliance_tool
#
# steps:
# - uses: actions/checkout@v4
# - name: Set up Python ${{ matrix.python-version }}
# uses: actions/setup-python@v5
# with:
# python-version: ${{ matrix.python-version }}
# - name: Install Python dependencies
# # install the local sdk in editable mode so it does not get overwritten
# run: |
# python -m pip install --upgrade pip
# python -m pip install ../sdk
# python -m pip install .[dev]
# - name: Test with coverage + unittest
# run: |
# python -m coverage run --source=aas_compliance_tool -m unittest
# - name: Report test coverage
# if: ${{ always() }}
# run: |
# python -m coverage report -m
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install Python dependencies
# install the local sdk in editable mode so it does not get overwritten
run: |
python -m pip install --upgrade pip
python -m pip install ../sdk
python -m pip install .[dev]
- name: Test with coverage + unittest
run: |
python -m coverage run --source=aas_compliance_tool -m unittest
- name: Report test coverage
if: ${{ always() }}
run: |
python -m coverage report -m

compliance-tool-static-analysis:
# This job runs static code analysis, namely pycodestyle and mypy
Expand Down
11 changes: 5 additions & 6 deletions compliance_tool/aas_compliance_tool/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,19 @@ def parse_cli_arguments() -> argparse.ArgumentParser:
'Asset Administration Shell" specification of Plattform Industrie 4.0. \n\n'
'This tool has five features: \n'
'1. create a xml or json file or an AASX file using xml or json files with example aas elements\n'
'2. check if a given xml or json file is compliant with the official json or xml aas schema and '
'is deserializable\n'
'2. check if a given xml or json file is deserializable and therefore compliant with the schema\n'
'3. check if the data in a given xml, json or aasx file is the same as the example data\n'
'4. check if two given xml, json or aasx files contain the same aas elements in any order\n\n'
'As a first argument, the feature must be specified (create, schema, deserialization, example, '
'files) or in short (c, s, d, e or f).\n'
'As a first argument, the feature must be specified (create, deserialization, example, '
'files) or in short (c, d, e or f).\n'
'Depending the chosen feature, different additional arguments must be specified:\n'
'create or c: path to the file which shall be created (file_1)\n'
'deseriable or d: file to be checked (file_1)\n'
'example or e: file to be checked (file_1)\n'
'file_compare or f: files to compare (file_1, file_2)\n,'
'In any case, it must be specified whether the (given or created) files are json (--json) or '
'xml (--xml).\n'
'All features except "schema" support reading/writing AASX packages instead of plain XML or JSON '
'All features support reading/writing AASX packages instead of plain XML or JSON '
'files via the --aasx option.\n\n'
'Additionally, the tool offers some extra features for more convenient usage:\n\n'
'a. Different levels of verbosity:\n'
Expand All @@ -63,7 +62,7 @@ def parse_cli_arguments() -> argparse.ArgumentParser:
' With -l or --logfile, a path to the file where the logfiles shall be created can be specified.',
formatter_class=argparse.RawTextHelpFormatter)

parser.add_argument('action', choices=['create', 'c', 'schema', 's', 'deserialization', 'd', 'example', 'e',
parser.add_argument('action', choices=['create', 'c', 'deserialization', 'd', 'example', 'e',
'files', 'f'],
help='c or create: creates a file with example data\n'
'd or deserialization: checks if a given file is compliance with the official schema and '
Expand Down
116 changes: 74 additions & 42 deletions compliance_tool/aas_compliance_tool/compliance_check_aasx.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"""
import datetime
import logging
from typing import Optional, Tuple
from typing import Optional, Tuple, cast
import io
from lxml import etree # type: ignore

Expand Down Expand Up @@ -118,6 +118,8 @@ def check_aas_example(file_path: str, state_manager: ComplianceToolStateManager,
state_manager.set_step_status(Status.NOT_EXECUTED)
state_manager.add_step('Check if core properties are equal')
state_manager.set_step_status(Status.NOT_EXECUTED)
state_manager.add_step('Check if supplementary files are equal')
state_manager.set_step_status(Status.NOT_EXECUTED)
return

checker = AASDataChecker(raise_immediately=False, **kwargs)
Expand All @@ -130,6 +132,8 @@ def check_aas_example(file_path: str, state_manager: ComplianceToolStateManager,
if state_manager.status in (Status.FAILED, Status.NOT_EXECUTED):
state_manager.add_step('Check if core properties are equal')
state_manager.set_step_status(Status.NOT_EXECUTED)
state_manager.add_step('Check if supplementary files are equal')
state_manager.set_step_status(Status.NOT_EXECUTED)
return

state_manager.add_step('Check if core properties are equal')
Expand All @@ -145,60 +149,46 @@ def check_aas_example(file_path: str, state_manager: ComplianceToolStateManager,
cp.title = "Test Title"

checker2 = DataChecker(raise_immediately=False)
try:
assert isinstance(cp_new.created, datetime.datetime)
checker2.check(isinstance(cp_new.created, datetime.datetime), "core property created must be of type datetime",
created=type(cp_new.created))
duration = cp_new.created - cp.created
if checker2.check(isinstance(cp_new.created, datetime.datetime), "core property created must be of type datetime",
created=type(cp_new.created)):
duration = cast(datetime.datetime, cp_new.created) - cp.created
checker2.check(duration.microseconds < 20, "created must be {}".format(cp.created), created=cp_new.created)
except AssertionError:
checker2.check(isinstance(cp_new.created, datetime.datetime), "core property created must be of type datetime",
created=type(cp_new.created))

checker2.check(cp_new.creator == cp.creator, "creator must be {}".format(cp.creator), creator=cp_new.creator)
checker2.check(cp_new.description == cp.description, "description must be {}".format(cp.description),
description=cp_new.description)
checker2.check(cp_new.lastModifiedBy == cp.lastModifiedBy, "lastModifiedBy must be {}".format(cp.lastModifiedBy),
lastModifiedBy=cp_new.lastModifiedBy)
try:
assert isinstance(cp_new.modified, datetime.datetime)
checker2.check(isinstance(cp_new.modified, datetime.datetime), "modified bust be of type datetime",
modified=type(cp_new.modified))
duration = cp_new.modified - cp.modified

if checker2.check(isinstance(cp_new.modified, datetime.datetime), "modified must be of type datetime",
modified=type(cp_new.modified)):
duration = cast(datetime.datetime, cp_new.modified) - cp.modified
checker2.check(duration.microseconds < 20, "modified must be {}".format(cp.modified), modified=cp_new.modified)
except AssertionError:
checker2.check(isinstance(cp_new.modified, datetime.datetime), "modified bust be of type datetime",
modified=type(cp_new.modified))

checker2.check(cp_new.revision == cp.revision, "revision must be {}".format(cp.revision), revision=cp_new.revision)
checker2.check(cp_new.version == cp.version, "version must be {}".format(cp.version), version=cp_new.version)
checker2.check(cp_new.title == cp.title, "title must be {}".format(cp.title), title=cp_new.title)

state_manager.add_log_records_from_data_checker(checker2)

# Check if file in file object is the same
state_manager.add_step('Check if supplementary files are equal')
file_checker = DataChecker(raise_immediately=False)

list_of_id_shorts = ["ExampleSubmodelCollection", "ExampleFile"]
identifiable = example_data.get_item("https://example.org/Test_Submodel")
for id_short in list_of_id_shorts:
identifiable = identifiable.get_referable(id_short)
obj2 = identifiable_store.get_item("https://example.org/Test_Submodel")
for id_short in list_of_id_shorts:
obj2 = obj2.get_referable(id_short)
try:
sha_file = files.get_sha256(identifiable.value)
except KeyError as error:
state_manager.add_log_records_from_data_checker(checker2)
logger.error(error)
state_manager.set_step_status(Status.FAILED)
return
file_name = identifiable.value
if file_checker.check(file_name in files, f"Supplementary File {file_name} must exist"):
test_file_checksum = 'b18229b24a4ee92c6c2b6bc6a8018563b17472f1150d35d5a5945afeb447ed44'
file_checker.check(
files.get_sha256(file_name).hex() == test_file_checksum,
f"Supplementary File {file_name} checksum must be '{test_file_checksum}'.",
value=files.get_sha256(file_name)
)

checker2.check(
sha_file == files.get_sha256(obj2.value),
"File of {} must be {}.".format(identifiable.value, obj2.value),
value=obj2.value
)
state_manager.add_log_records_from_data_checker(checker2)
if state_manager.status in (Status.FAILED, Status.NOT_EXECUTED):
state_manager.set_step_status(Status.FAILED)
else:
state_manager.set_step_status(Status.SUCCESS)
state_manager.add_log_records_from_data_checker(file_checker)


def check_aasx_files_equivalence(file_path_1: str, file_path_2: str, state_manager: ComplianceToolStateManager,
Expand All @@ -224,11 +214,13 @@ def check_aasx_files_equivalence(file_path_1: str, file_path_2: str, state_manag

identifiable_store_2, files_2, cp_2 = check_deserialization(file_path_2, state_manager, 'second')

if state_manager.status is Status.FAILED:
if state_manager.status >= Status.FAILED:
state_manager.add_step('Check if data in files are equal')
state_manager.set_step_status(Status.NOT_EXECUTED)
state_manager.add_step('Check if core properties are equal')
state_manager.set_step_status(Status.NOT_EXECUTED)
state_manager.add_step('Check if supplementary files are equal')
state_manager.set_step_status(Status.NOT_EXECUTED)
return

checker = AASDataChecker(raise_immediately=False, **kwargs)
Expand All @@ -240,22 +232,62 @@ def check_aasx_files_equivalence(file_path_1: str, file_path_2: str, state_manag
logger.error(error)
state_manager.add_step('Check if core properties are equal')
state_manager.set_step_status(Status.NOT_EXECUTED)
state_manager.add_step('Check if supplementary files are equal')
state_manager.set_step_status(Status.NOT_EXECUTED)
return

state_manager.add_log_records_from_data_checker(checker)

if state_manager.status is Status.FAILED:
if state_manager.status >= Status.FAILED:
state_manager.add_step('Check if core properties are equal')
state_manager.set_step_status(Status.NOT_EXECUTED)
state_manager.add_step('Check if supplementary files are equal')
state_manager.set_step_status(Status.NOT_EXECUTED)
return

state_manager.add_step('Check if core properties are equal')
checker2 = DataChecker(raise_immediately=False)
assert (isinstance(cp_1.created, datetime.datetime))
assert (isinstance(cp_2.created, datetime.datetime))
duration = cp_1.created - cp_2.created
checker2.check(isinstance(cp_1.created, datetime.datetime),
"core property created of first file must be of type datetime",
created=type(cp_1.created))
checker2.check(isinstance(cp_2.created, datetime.datetime),
"core property created of second file must be of type datetime",
created=type(cp_2.created))

if any(True for _ in checker2.failed_checks):
state_manager.add_log_records_from_data_checker(checker2)
return

duration = cast(datetime.datetime, cp_1.created) - cast(datetime.datetime, cp_2.created)
checker2.check(duration.microseconds < 20, "created must be {}".format(cp_1.created), value=cp_2.created)
checker2.check(cp_1.creator == cp_2.creator, "creator must be {}".format(cp_1.creator), value=cp_2.creator)
checker2.check(cp_1.lastModifiedBy == cp_2.lastModifiedBy, "lastModifiedBy must be {}".format(cp_1.lastModifiedBy),
value=cp_2.lastModifiedBy)
checker2.check(cp_1.revision == cp_2.revision, "revision must be {}".format(cp_2.revision), revision=cp_1.revision)
checker2.check(cp_1.version == cp_2.version, "version must be {}".format(cp_2.version), version=cp_1.version)
checker2.check(cp_1.title == cp_2.title, "title must be {}".format(cp_2.title), title=cp_1.title)
state_manager.add_log_records_from_data_checker(checker2)

state_manager.add_step('Check if supplementary files are equal')

file_checker = DataChecker(raise_immediately=False)
for file_name in files_1:
both_contain = file_checker.check(file_name in files_2,
"second file must contain supplementary file {}".format(file_name))
if both_contain:
expected_type = files_1.get_content_type(file_name)
file_checker.check(expected_type == files_2.get_content_type(file_name),
f"second file must contain supplementary file {file_name}"
" with content-type {expected_type}",
content_type=files_2.get_content_type(file_name))
expected_checksum = files_1.get_sha256(file_name)
file_checker.check(expected_checksum == files_2.get_sha256(file_name),
f"second file must contain supplementary file {file_name}"
f" with sha256 {expected_checksum.hex()}",
checksum=files_2.get_sha256(file_name).hex())

for file_name in files_2:
file_checker.check(file_name in files_1,
"first file must contain supplementary file {}".format(file_name))

state_manager.add_log_records_from_data_checker(file_checker)
4 changes: 2 additions & 2 deletions compliance_tool/aas_compliance_tool/state_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2025 the Eclipse BaSyx Authors
# Copyright (c) 2026 the Eclipse BaSyx Authors
#
# This program and the accompanying materials are made available under the terms of the MIT License, available in
# the LICENSE file of this project.
Expand Down Expand Up @@ -26,7 +26,7 @@ class Status(enum.IntEnum):
:cvar NOT_EXECUTED:
"""
SUCCESS = 0
SUCCESS_WITH_WARNINGS = 1
SUCCESS_WITH_WARNINGS = 1 # never used
FAILED = 2
NOT_EXECUTED = 3

Expand Down
28 changes: 0 additions & 28 deletions compliance_tool/test/__init__.py
Original file line number Diff line number Diff line change
@@ -1,28 +0,0 @@
import os
import zipfile

AASX_FILES = ("test_demo_full_example_json_aasx",
"test_demo_full_example_xml_aasx",
"test_demo_full_example_xml_wrong_attribute_aasx",
"test_empty_aasx")


def _zip_directory(directory_path, zip_file_path):
"""Zip a directory recursively."""
with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, _, files in os.walk(directory_path):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, directory_path)
zipf.write(file_path, arcname=arcname)


def generate_aasx_files():
"""Zip dirs and create test AASX files."""
script_dir = os.path.dirname(__file__)
for i in AASX_FILES:
_zip_directory(os.path.join(script_dir, "files", i),
os.path.join(script_dir, "files", i.rstrip("_aasx") + ".aasx"))


generate_aasx_files()
57 changes: 57 additions & 0 deletions compliance_tool/test/_test_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import io
from typing import Literal, Type, Optional
import datetime
import logging

import pyecma376_2

from basyx.aas.examples.data import create_example_aas_binding, TEST_PDF_FILE


def create_example_aas_core_properties() -> pyecma376_2.OPCCoreProperties:
"""Create core properties similar to the example AASX file."""

cp = pyecma376_2.OPCCoreProperties()
cp.created = datetime.datetime(2020, 1, 1, 0, 0, 0)
cp.creator = "Eclipse BaSyx Python Testing Framework"
cp.description = "Test_Description"
cp.lastModifiedBy = "Eclipse BaSyx Python Testing Framework Compliance Tool"
cp.modified = datetime.datetime(2020, 1, 1, 0, 0, 1)
cp.revision = "1.0"
cp.version = "2.0.1"
cp.title = "Test Title"
return cp


def create_read_into_mock(file: Literal['TestFile', 'TestFileWrong', None]):
""""Creates side effect function for the AASXReader.read_into mock"""

def fill_stores(store, file_store, **kwargs) -> None:
for item in create_example_aas_binding():
store.add(item)

if file == 'TestFile':
with open(TEST_PDF_FILE, 'rb') as f:
file_store.add_file("/TestFile.pdf", f, "application/pdf")
elif file == 'TestFileWrong':
file_store.add_file("/TestFile.pdf", io.BytesIO(b"dummy"), "application/pdf")
return fill_stores


def create_mock_effect(
module: str,
level: Literal['error', 'warning', 'info', 'debug'],
error_cls: Type[Exception] = ValueError,
error_msg: Optional[str] = None
):
"""Create mock function, that raises or logs error (based on `failsafe` argument)"""

error_msg = error_msg or f"Test {level}!"

def mock_error(*args, **kwargs):
if kwargs.get('failsafe', True):
getattr(logging.getLogger(module), level)(error_msg)
else:
raise error_cls(error_msg)

return mock_error
Loading
Loading