1+ import unittest
2+ import tempfile
3+ from pathlib import Path
4+
5+ from basyx .aas import model
6+ from basyx .aas import adapter
7+
8+
9+ class LoadDirectoryTest (unittest .TestCase ):
10+ def test_reading_all_files (self ):
11+ # ----- Arange ----
12+ # Create an AAS per file type
13+ json_aas = model .AssetAdministrationShell (
14+ id_ = "http://example.org/JSON_AAS" ,
15+ asset_information = model .AssetInformation (
16+ global_asset_id = "http://example.org/JSON_Asset"
17+ )
18+ )
19+
20+ xml_aas = model .AssetAdministrationShell (
21+ id_ = "http://example.org/XML_AAS" ,
22+ asset_information = model .AssetInformation (
23+ global_asset_id = "http://example.org/XML_Asset"
24+ )
25+ )
26+
27+ aasx_aas = model .AssetAdministrationShell (
28+ id_ = "http://example.org/aasx_AAS" ,
29+ asset_information = model .AssetInformation (
30+ global_asset_id = "http://example.org/aasx_Asset"
31+ )
32+ )
33+
34+ # load test.pdf to save into aasx
35+ file_container = adapter .aasx .DictSupplementaryFileContainer ()
36+ with open (Path (__file__ ).parent / "test.pdf" , "rb" ) as pdf :
37+ resulting_file_name = file_container .add_file (
38+ "/aasx/suppl/file.pdf" , pdf , "application/json" )
39+
40+ # create submodel for aasx_aas that refers to pdf
41+ sm_with_file = model .Submodel (
42+ id_ = "http://example.org/tmp_Submodel" ,
43+ submodel_element = {
44+ model .File (
45+ id_short = "SampleFile" ,
46+ content_type = "application/json" ,
47+ value = resulting_file_name
48+ )
49+ }
50+ )
51+ aasx_aas .submodel .add (model .ModelReference .from_referable (sm_with_file ))
52+
53+ with tempfile .TemporaryDirectory () as temp_dir :
54+ temp_dir_path = Path (temp_dir )
55+
56+ # save to json file
57+ adapter .json .write_aas_json_file (temp_dir_path / "testAAS.json" ,
58+ model .DictIdentifiableStore ([json_aas ]))
59+ # save to xml file
60+ adapter .xml .write_aas_xml_file (temp_dir_path / "testAAS.xml" ,
61+ model .DictIdentifiableStore ([xml_aas ]))
62+ # save to aasx file
63+ with adapter .aasx .AASXWriter (temp_dir_path / "testAAS.aasx" ) as writer :
64+ writer .write_aas (
65+ aas_ids = ["http://example.org/aasx_AAS" ],
66+ object_store = model .DictIdentifiableStore ([aasx_aas , sm_with_file ]),
67+ file_store = file_container
68+ )
69+
70+
71+ # ---- Act ----
72+ new_object_store , new_file_store = adapter .load_directory (temp_dir_path )
73+
74+ # ---- Assert -----
75+ # check for all three AAS
76+ self .assertIn ("http://example.org/JSON_AAS" , new_object_store )
77+ self .assertIn ("http://example.org/XML_AAS" , new_object_store )
78+ self .assertIn ("http://example.org/aasx_AAS" , new_object_store )
79+
80+ # check pdf is loaded
81+ self .assertIn (resulting_file_name , new_file_store )
82+
83+
84+ def test_skipping_other_files (self ):
85+ with tempfile .TemporaryDirectory () as tmp_dir :
86+ # ---- Arange ----
87+ tmp_dir_path = Path (tmp_dir )
88+
89+ # create empty file
90+ open (tmp_dir_path / "test.txt" , "a" ).close ()
91+
92+ # create directory
93+ (tmp_dir_path / "empty" ).mkdir ()
94+
95+ # ---- Act ----
96+ # assert no exception is occurring
97+ object_store , file_store = adapter .load_directory (tmp_dir_path )
98+
99+ # ---- Assert ----
100+ # check stores are empty
101+ self .assertEqual (len (object_store ), 0 )
102+ self .assertEqual (len (file_store ), 0 )
0 commit comments