Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 25 additions & 19 deletions server/app/model/provider.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import json
from pathlib import Path
from typing import IO, Dict, Iterable, Iterator, Union

from basyx.aas import model
from basyx.aas.model import provider as sdk_provider

import app.adapter as adapter
from app.model import descriptor

PathOrIO = Union[Path, IO]
Expand Down Expand Up @@ -51,29 +51,35 @@ def __iter__(self) -> Iterator[_DESCRIPTOR_TYPE]:
return iter(self._backend.values())


_DESCRIPTOR_KEY_TO_CLS = (
("assetAdministrationShellDescriptors", descriptor.AssetAdministrationShellDescriptor),
("submodelDescriptors", descriptor.SubmodelDescriptor),
)


def load_directory(directory: Union[Path, str]) -> DictDescriptorStore:
"""
Create a new :class:`~basyx.aas.model.provider.DictIdentifiableStore` and use it to load Asset Administration Shell
and Submodel files in ``AASX``, ``JSON`` and ``XML`` format from a given directory into memory. Additionally, load
all embedded supplementary files into a new :class:`~basyx.aas.adapter.aasx.DictSupplementaryFileContainer`.

:param directory: :class:`~pathlib.Path` or ``str`` pointing to the directory containing all Asset Administration
Shell and Submodel files to load
:return: Tuple consisting of a :class:`~basyx.aas.model.provider.DictIdentifiableStore` and a
:class:`~basyx.aas.adapter.aasx.DictSupplementaryFileContainer` containing all loaded data
"""
Load AAS/Submodel descriptor JSON files from a directory into a :class:`DictDescriptorStore`.

dict_descriptor_store: DictDescriptorStore = DictDescriptorStore()
:param directory: Path to the directory containing JSON descriptor files
:return: Populated :class:`DictDescriptorStore`
"""
from app.adapter import ServerAASFromJsonDecoder
Comment thread
zrgt marked this conversation as resolved.
Outdated

store = DictDescriptorStore()
directory = Path(directory)

for file in directory.iterdir():
if not file.is_file():
if not file.is_file() or file.suffix.lower() != ".json":
continue

suffix = file.suffix.lower()
if suffix == ".json":
with open(file) as f:
adapter.read_server_aas_json_file_into(dict_descriptor_store, f)
Comment thread
zrgt marked this conversation as resolved.

return dict_descriptor_store
with open(file) as f:
data = json.load(f, cls=ServerAASFromJsonDecoder)
for key, cls in _DESCRIPTOR_KEY_TO_CLS:
Comment thread
zrgt marked this conversation as resolved.
Outdated
for item in data.get(key, []):
if isinstance(item, cls):
try:
store.add(item)
except KeyError:
pass

return store
Loading