diff --git a/sdk/basyx/aas/backend/couchdb.py b/sdk/basyx/aas/backend/couchdb.py index c2871aed..fe02345e 100644 --- a/sdk/basyx/aas/backend/couchdb.py +++ b/sdk/basyx/aas/backend/couchdb.py @@ -160,7 +160,6 @@ def get_identifiable_by_couchdb_id(self, couchdb_id: str) -> model.Identifiable: raise KeyError("No Identifiable with couchdb-id {} found in CouchDB database".format(couchdb_id)) from e raise - # Add CouchDB metadata (for later commits) to object obj = data['data'] if not isinstance(obj, model.Identifiable): raise CouchDBResponseError("The CouchDB document with id {} does not contain an identifiable AAS object." @@ -168,14 +167,10 @@ def get_identifiable_by_couchdb_id(self, couchdb_id: str) -> model.Identifiable: set_couchdb_revision("{}/{}/{}".format(self.url, self.database_name, urllib.parse.quote(couchdb_id, safe='')), data["_rev"]) - # If we still have a local replication of that object (since it is referenced from anywhere else), update that - # replication and return it. with self._object_cache_lock: if obj.id in self._object_cache: - old_obj = self._object_cache[obj.id] - old_obj.update_from(obj) - return old_obj - self._object_cache[obj.id] = obj + return self._object_cache[obj.id] + self._object_cache[obj.id] = obj return obj def get_item(self, identifier: model.Identifier) -> model.Identifiable: @@ -186,6 +181,9 @@ def get_item(self, identifier: model.Identifier) -> model.Identifiable: :raises CouchDBError: If error occur during the request to the CouchDB server (see ``_do_request()`` for details) """ + with self._object_cache_lock: + if identifier in self._object_cache: + return self._object_cache[identifier] try: return self.get_identifiable_by_couchdb_id(self._transform_id(identifier, False)) except KeyError as e: @@ -220,6 +218,37 @@ def add(self, x: model.Identifiable) -> None: with self._object_cache_lock: self._object_cache[x.id] = x + def commit(self, x: model.Identifiable) -> None: + """ + Write the current in-memory state of a stored object back to the CouchDB. + + :param x: The object to persist + :raises KeyError: If the object is not present in the store or no revision is known + :raises CouchDBConflictError: If the object was modified in the database since it was last fetched + :raises CouchDBError: If error occur during the request to the CouchDB server + (see ``_do_request()`` for details) + """ + doc_url = "{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.id)) + rev = get_couchdb_revision(doc_url) + if rev is None: + raise KeyError("No revision found for object with id {} — not fetched from this store".format(x.id)) + data = json.dumps({'data': x}, cls=json_serialization.AASToJsonEncoder) + try: + response = self._do_request( + "{}?rev={}".format(doc_url, rev), + 'PUT', + {'Content-type': 'application/json'}, + data.encode('utf-8')) + set_couchdb_revision(doc_url, response["rev"]) + except CouchDBServerError as e: + if e.code == 404: + raise KeyError("No AAS object with id {} exists in CouchDB database".format(x.id)) from e + elif e.code == 409: + raise CouchDBConflictError( + "Object with id {} has been modified in the database since it was last fetched." + .format(x.id)) from e + raise + def discard(self, x: model.Identifiable, safe_delete=False) -> None: """ Delete an :class:`~basyx.aas.model.base.Identifiable` AAS object from the CouchDB database diff --git a/sdk/basyx/aas/backend/local_file.py b/sdk/basyx/aas/backend/local_file.py index 72d5605a..df21ddfe 100644 --- a/sdk/basyx/aas/backend/local_file.py +++ b/sdk/basyx/aas/backend/local_file.py @@ -16,6 +16,7 @@ import json import os import hashlib +import tempfile import threading import warnings import weakref @@ -31,6 +32,13 @@ class LocalFileIdentifiableStore(model.AbstractObjectStore[model.Identifier, mod """ An ObjectStore implementation for :class:`~basyx.aas.model.base.Identifiable` BaSyx Python SDK objects backed by a local file based local backend + + .. warning:: + This backend is intended for development and testing only. It provides no + concurrency control across processes: concurrent writes to the same object + (e.g. under a multi-worker WSGI server) will silently overwrite each other, + with the last writer winning and no error raised. Use a dedicated database + backend for any production deployment. """ def __init__(self, directory_path: str): """ @@ -68,21 +76,16 @@ def get_identifiable_by_hash(self, hash_: str) -> model.Identifiable: :raises KeyError: If the respective file could not be found """ - # Try to get the correct file try: with open("{}/{}.json".format(self.directory_path, hash_), "r") as file: data = json.load(file, cls=json_deserialization.AASFromJsonDecoder) obj = data["data"] except FileNotFoundError as e: raise KeyError("No Identifiable with hash {} found in local file database".format(hash_)) from e - # If we still have a local replication of that object (since it is referenced from anywhere else), update that - # replication and return it. with self._object_cache_lock: if obj.id in self._object_cache: - old_obj = self._object_cache[obj.id] - old_obj.update_from(obj) - return old_obj - self._object_cache[obj.id] = obj + return self._object_cache[obj.id] + self._object_cache[obj.id] = obj return obj def get_item(self, identifier: model.Identifier) -> model.Identifiable: @@ -91,11 +94,33 @@ def get_item(self, identifier: model.Identifier) -> model.Identifiable: :raises KeyError: If the respective file could not be found """ + with self._object_cache_lock: + if identifier in self._object_cache: + return self._object_cache[identifier] try: return self.get_identifiable_by_hash(self._transform_id(identifier)) except KeyError as e: raise KeyError("No Identifiable with id {} found in local file database".format(identifier)) from e + def _write_atomic(self, x: model.Identifiable) -> None: + """ + Serialize x to a temp file in the store directory, then atomically replace the final file. + + Using os.replace() (rename(2) on POSIX) ensures readers always see a complete file — never + a partially-written one from a crash or concurrent access mid-write. + """ + final_path = "{}/{}.json".format(self.directory_path, self._transform_id(x.id)) + tmp_fd, tmp_path = tempfile.mkstemp(dir=self.directory_path, suffix=".tmp") + try: + with os.fdopen(tmp_fd, "w") as tmp_file: + json.dump({"data": x}, tmp_file, cls=json_serialization.AASToJsonEncoder, indent=4) + os.replace(tmp_path, final_path) + # Catch all `Exception`s, as well as `KeyboardInterrupt` and `SystemExit` too, so the temp + # file is never left behind even if the process is being torn down: + except BaseException: + os.unlink(tmp_path) + raise + def add(self, x: model.Identifiable) -> None: """ Add an object to the store @@ -105,10 +130,20 @@ def add(self, x: model.Identifiable) -> None: logger.debug("Adding object %s to Local File Store ...", repr(x)) if os.path.exists("{}/{}.json".format(self.directory_path, self._transform_id(x.id))): raise KeyError("Identifiable with id {} already exists in local file database".format(x.id)) - with open("{}/{}.json".format(self.directory_path, self._transform_id(x.id)), "w") as file: - json.dump({"data": x}, file, cls=json_serialization.AASToJsonEncoder, indent=4) - with self._object_cache_lock: - self._object_cache[x.id] = x + self._write_atomic(x) + with self._object_cache_lock: + self._object_cache[x.id] = x + + def commit(self, x: model.Identifiable) -> None: + """ + Write the current in-memory state of a stored object back to its file. + + :param x: The object to persist + :raises KeyError: If the object is not present in the store + """ + if not os.path.exists("{}/{}.json".format(self.directory_path, self._transform_id(x.id))): + raise KeyError("No AAS object with id {} exists in local file database".format(x.id)) + self._write_atomic(x) def discard(self, x: model.Identifiable) -> None: """ diff --git a/sdk/basyx/aas/model/provider.py b/sdk/basyx/aas/model/provider.py index c48342c6..9a91a346 100644 --- a/sdk/basyx/aas/model/provider.py +++ b/sdk/basyx/aas/model/provider.py @@ -59,6 +59,18 @@ class AbstractObjectStore(AbstractObjectProvider[_KEY, _VALUE], MutableSet[_VALU def __init__(self): pass + def commit(self, x: _VALUE) -> None: + """ + Persist an in-memory mutation of a stored object back to the underlying storage. + + Persistent backends (e.g. file-based or database-backed stores) must override this to + write the updated object back to storage. In-memory stores should override this with an + explicit no-op to make the intent clear. + + :param x: The object whose current in-memory state should be persisted + """ + raise NotImplementedError() + def update(self, other: Iterable[_VALUE]) -> None: for x in other: self.add(x) @@ -146,6 +158,9 @@ def add(self, x: _IDENTIFIABLE) -> None: .format(x.id)) self._backend[x.id] = x + def commit(self, x: _IDENTIFIABLE) -> None: + pass + def discard(self, x: _IDENTIFIABLE) -> None: if self._backend.get(x.id) is x: del self._backend[x.id] @@ -223,6 +238,9 @@ def add(self, x: _IDENTIFIABLE) -> None: else: raise KeyError(f"Identifiable object with same id {x.id} is already stored in this store") + def commit(self, x: _IDENTIFIABLE) -> None: + pass + def discard(self, x: _IDENTIFIABLE) -> None: self._backend.discard(x) diff --git a/sdk/test/backend/test_local_file.py b/sdk/test/backend/test_local_file.py index f1080240..71447f61 100644 --- a/sdk/test/backend/test_local_file.py +++ b/sdk/test/backend/test_local_file.py @@ -4,6 +4,7 @@ # the LICENSE file of this project. # # SPDX-License-Identifier: MIT +import gc import os.path import shutil @@ -134,6 +135,32 @@ def test_iter_ignores_non_json_files(self) -> None: self.assertEqual(5, len(items)) os.remove(stray) + def test_mutation_persistence(self) -> None: + submodel = model.Submodel( + id_='https://example.org/MutationTest', + submodel_element={ + model.Property(id_short='Prop', value_type=model.datatypes.String, value='before') + } + ) + self.identifiable_store.add(submodel) + + retrieved = self.identifiable_store.get_item('https://example.org/MutationTest') + assert isinstance(retrieved, model.Submodel) + prop = retrieved.get_referable(['Prop']) + assert isinstance(prop, model.Property) + prop.update_from(model.Property(id_short='Prop', value_type=model.datatypes.String, value='after')) + self.identifiable_store.commit(retrieved) + + # Drop all strong references to evict the WeakValueDictionary cache + del submodel, retrieved, prop + gc.collect() + + fresh = self.identifiable_store.get_item('https://example.org/MutationTest') + assert isinstance(fresh, model.Submodel) + fresh_prop = fresh.get_referable(['Prop']) + assert isinstance(fresh_prop, model.Property) + self.assertEqual('after', fresh_prop.value) + def test_reload_discard(self) -> None: # Load example submodel example_submodel = create_example_submodel() diff --git a/server/app/backend/local_file.py b/server/app/backend/local_file.py index e55c08e6..e71a0e98 100644 --- a/server/app/backend/local_file.py +++ b/server/app/backend/local_file.py @@ -67,20 +67,15 @@ def get_descriptor_by_hash(self, hash_: str) -> _DESCRIPTOR_TYPE: :raises KeyError: If the respective file could not be found """ - # Try to get the correct file try: with open("{}/{}.json".format(self.directory_path, hash_), "r") as file: obj = json.load(file, cls=jsonization.ServerAASFromJsonDecoder) except FileNotFoundError as e: raise KeyError("No Descriptor with hash {} found in local file database".format(hash_)) from e - # If we still have a local replication of that object (since it is referenced from anywhere else), update that - # replication and return it. with self._object_cache_lock: if obj.id in self._object_cache: - old_obj = self._object_cache[obj.id] - old_obj.update_from(obj) - return old_obj - self._object_cache[obj.id] = obj + return self._object_cache[obj.id] + self._object_cache[obj.id] = obj return obj def get_item(self, identifier: model.Identifier) -> _DESCRIPTOR_TYPE: @@ -89,6 +84,9 @@ def get_item(self, identifier: model.Identifier) -> _DESCRIPTOR_TYPE: :raises KeyError: If the respective file could not be found """ + with self._object_cache_lock: + if identifier in self._object_cache: + return self._object_cache[identifier] try: return self.get_descriptor_by_hash(self._transform_id(identifier)) except KeyError as e: @@ -113,6 +111,20 @@ def add(self, x: _DESCRIPTOR_TYPE) -> None: with self._object_cache_lock: self._object_cache[x.id] = x + def commit(self, x: _DESCRIPTOR_TYPE) -> None: + """ + Write the current in-memory state of a stored descriptor back to its file. + + :param x: The descriptor to persist + :raises KeyError: If the descriptor is not present in the store + """ + if not os.path.exists("{}/{}.json".format(self.directory_path, self._transform_id(x.id))): + raise KeyError("No AAS Descriptor object with id {} exists in local file database".format(x.id)) + with open("{}/{}.json".format(self.directory_path, self._transform_id(x.id)), "w") as file: + serialized = json.loads(json.dumps(x, cls=jsonization.ServerAASToJsonEncoder)) + serialized["modelType"] = DESCRIPTOR_TYPE_TO_STRING[type(x)] + json.dump(serialized, file, indent=4) + def discard(self, x: _DESCRIPTOR_TYPE) -> None: """ Delete an :class:`~app.model.descriptor.Descriptor` AAS object from the local file store diff --git a/server/app/interfaces/registry.py b/server/app/interfaces/registry.py index 37ab9555..274602ff 100644 --- a/server/app/interfaces/registry.py +++ b/server/app/interfaces/registry.py @@ -180,7 +180,7 @@ def post_aas_descriptor( self.object_store.add(descriptor) except KeyError as e: raise Conflict(f"AssetAdministrationShellDescriptor with Identifier {descriptor.id} already exists!") from e - descriptor.commit() + self.object_store.commit(descriptor) created_resource_url = map_adapter.build( self.get_aas_descriptor_by_id, {"aas_id": descriptor.id}, force_external=True ) @@ -202,12 +202,12 @@ def put_aas_descriptor_by_id( request, server_model.AssetAdministrationShellDescriptor, is_stripped_request(request) ) ) - descriptor.commit() + self.object_store.commit(descriptor) return response_t() except NotFound: descriptor = HTTPApiDecoder.request_body(request, server_model.AssetAdministrationShellDescriptor, False) self.object_store.add(descriptor) - descriptor.commit() + self.object_store.commit(descriptor) created_resource_url = map_adapter.build( self.get_aas_descriptor_by_id, {"aas_id": descriptor.id}, force_external=True ) @@ -247,7 +247,7 @@ def post_submodel_descriptor_through_superpath( if any(sd.id == submodel_descriptor.id for sd in aas_descriptor.submodel_descriptors): raise Conflict(f"Submodel Descriptor with Identifier {submodel_descriptor.id} already exists!") aas_descriptor.submodel_descriptors.append(submodel_descriptor) - aas_descriptor.commit() + self.object_store.commit(aas_descriptor) created_resource_url = map_adapter.build( self.get_submodel_descriptor_by_id_through_superpath, {"aas_id": aas_descriptor.id, "submodel_id": submodel_descriptor.id}, @@ -269,14 +269,14 @@ def put_submodel_descriptor_by_id_through_superpath( submodel_descriptor.update_from( HTTPApiDecoder.request_body(request, server_model.SubmodelDescriptor, is_stripped_request(request)) ) - aas_descriptor.commit() + self.object_store.commit(aas_descriptor) return response_t() except NotFound: submodel_descriptor = HTTPApiDecoder.request_body( request, server_model.SubmodelDescriptor, is_stripped_request(request) ) aas_descriptor.submodel_descriptors.append(submodel_descriptor) - aas_descriptor.commit() + self.object_store.commit(aas_descriptor) created_resource_url = map_adapter.build( self.get_submodel_descriptor_by_id_through_superpath, {"aas_id": aas_descriptor.id, "submodel_id": submodel_descriptor.id}, @@ -293,7 +293,7 @@ def delete_submodel_descriptor_by_id_through_superpath( if submodel_descriptor is None: raise NotFound(f"Submodel Descriptor with Identifier {submodel_id} not found in AssetAdministrationShell!") aas_descriptor.submodel_descriptors.remove(submodel_descriptor) - aas_descriptor.commit() + self.object_store.commit(aas_descriptor) return response_t() # ------ Submodel REGISTRY ROUTES ------- @@ -321,7 +321,7 @@ def post_submodel_descriptor( self.object_store.add(submodel_descriptor) except KeyError as e: raise Conflict(f"Submodel Descriptor with Identifier {submodel_descriptor.id} already exists!") from e - submodel_descriptor.commit() + self.object_store.commit(submodel_descriptor) created_resource_url = map_adapter.build( self.get_submodel_descriptor_by_id, {"submodel_id": submodel_descriptor.id}, force_external=True ) @@ -335,14 +335,14 @@ def put_submodel_descriptor_by_id( submodel_descriptor.update_from( HTTPApiDecoder.request_body(request, server_model.SubmodelDescriptor, is_stripped_request(request)) ) - submodel_descriptor.commit() + self.object_store.commit(submodel_descriptor) return response_t() except NotFound: submodel_descriptor = HTTPApiDecoder.request_body( request, server_model.SubmodelDescriptor, is_stripped_request(request) ) self.object_store.add(submodel_descriptor) - submodel_descriptor.commit() + self.object_store.commit(submodel_descriptor) created_resource_url = map_adapter.build( self.get_submodel_descriptor_by_id, {"submodel_id": submodel_descriptor.id}, force_external=True ) diff --git a/server/app/interfaces/repository.py b/server/app/interfaces/repository.py index 89ad0d64..c604408e 100644 --- a/server/app/interfaces/repository.py +++ b/server/app/interfaces/repository.py @@ -559,6 +559,7 @@ def put_aas(self, request: Request, url_args: Dict, response_t: Type[APIResponse aas.update_from( HTTPApiDecoder.request_body(request, model.AssetAdministrationShell, is_stripped_request(request)) ) + self.object_store.commit(aas) return response_t() def delete_aas(self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs) -> Response: @@ -577,6 +578,7 @@ def put_aas_asset_information( ) -> Response: aas = self._get_shell(url_args) aas.asset_information = HTTPApiDecoder.request_body(request, model.AssetInformation, False) + self.object_store.commit(aas) return response_t() def get_aas_submodel_refs( @@ -595,6 +597,7 @@ def post_aas_submodel_refs(self, request: Request, url_args: Dict, response_t: T if sm_ref in aas.submodel: raise Conflict(f"{sm_ref!r} already exists!") aas.submodel.add(sm_ref) + self.object_store.commit(aas) created_resource_url = map_adapter.build(self.delete_aas_submodel_refs_specific, { "aas_id": aas.id, "submodel_id": sm_ref.key[0].value @@ -606,6 +609,7 @@ def delete_aas_submodel_refs_specific( ) -> Response: aas = self._get_shell(url_args) aas.submodel.remove(self._get_submodel_reference(aas, url_args["submodel_id"])) + self.object_store.commit(aas) return response_t() def put_aas_submodel_refs_submodel( @@ -619,9 +623,11 @@ def put_aas_submodel_refs_submodel( id_changed: bool = submodel.id != new_submodel.id # TODO: https://github.com/eclipse-basyx/basyx-python-sdk/issues/216 submodel.update_from(new_submodel) + self.object_store.commit(submodel) if id_changed: aas.submodel.remove(sm_ref) aas.submodel.add(model.ModelReference.from_referable(submodel)) + self.object_store.commit(aas) return response_t() def delete_aas_submodel_refs_submodel( @@ -632,6 +638,7 @@ def delete_aas_submodel_refs_submodel( submodel = self._resolve_reference(sm_ref) self.object_store.remove(submodel) aas.submodel.remove(sm_ref) + self.object_store.commit(aas) return response_t() def aas_submodel_refs_redirect( @@ -708,6 +715,7 @@ def get_submodels_reference( def put_submodel(self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs) -> Response: submodel = self._get_submodel(url_args) submodel.update_from(HTTPApiDecoder.request_body(request, model.Submodel, is_stripped_request(request))) + self.object_store.commit(submodel) return response_t() def get_submodel_submodel_elements( @@ -775,6 +783,7 @@ def post_submodel_submodel_elements_id_short_path( raise Conflict( f"SubmodelElement with idShort {new_submodel_element.id_short} already exists " f"within {parent}!" ) + self.object_store.commit(self._get_submodel(url_args)) submodel = self._get_submodel(url_args) id_short_path = url_args.get("id_shorts", []) created_resource_url = map_adapter.build( @@ -794,6 +803,7 @@ def put_submodel_submodel_elements_id_short_path( request, model.SubmodelElement, is_stripped_request(request) # type: ignore[type-abstract] ) submodel_element.update_from(new_submodel_element) + self.object_store.commit(self._get_submodel(url_args)) return response_t() def delete_submodel_submodel_elements_id_short_path( @@ -802,6 +812,7 @@ def delete_submodel_submodel_elements_id_short_path( sm_or_se = self._get_submodel_or_nested_submodel_element(url_args) parent: model.UniqueIdShortNamespace = self._expect_namespace(sm_or_se.parent, sm_or_se.id_short) self._namespace_submodel_element_op(parent, parent.remove_referable, sm_or_se.id_short) + self.object_store.commit(self._get_submodel(url_args)) return response_t() def get_submodel_submodel_element_attachment(self, request: Request, url_args: Dict, **_kwargs) -> Response: @@ -854,6 +865,7 @@ def put_submodel_submodel_element_attachment( ) submodel_element.value = self.file_store.add_file(filename, file_storage.stream, submodel_element.content_type) + self.object_store.commit(self._get_submodel(url_args)) return response_t() def delete_submodel_submodel_element_attachment( @@ -876,6 +888,7 @@ def delete_submodel_submodel_element_attachment( pass submodel_element.value = None + self.object_store.commit(self._get_submodel(url_args)) return response_t() def get_submodel_submodel_element_qualifiers( @@ -895,6 +908,7 @@ def post_submodel_submodel_element_qualifiers( if sm_or_se.qualifier.contains_id("type", qualifier.type): raise Conflict(f"Qualifier with type {qualifier.type} already exists!") sm_or_se.qualifier.add(qualifier) + self.object_store.commit(self._get_submodel(url_args)) created_resource_url = map_adapter.build( self.get_submodel_submodel_element_qualifiers, { @@ -918,6 +932,7 @@ def put_submodel_submodel_element_qualifiers( raise Conflict(f"A qualifier of type {new_qualifier.type!r} already exists for {sm_or_se!r}") sm_or_se.remove_qualifier_by_type(qualifier.type) sm_or_se.qualifier.add(new_qualifier) + self.object_store.commit(self._get_submodel(url_args)) if qualifier_type_changed: created_resource_url = map_adapter.build( self.get_submodel_submodel_element_qualifiers, @@ -937,6 +952,7 @@ def delete_submodel_submodel_element_qualifiers( sm_or_se = self._get_submodel_or_nested_submodel_element(url_args) qualifier_type = url_args["qualifier_type"] self._qualifiable_qualifier_op(sm_or_se, sm_or_se.remove_qualifier_by_type, qualifier_type) + self.object_store.commit(self._get_submodel(url_args)) return response_t() # --------- CONCEPT DESCRIPTION ROUTES --------- @@ -976,6 +992,7 @@ def put_concept_description( concept_description.update_from( HTTPApiDecoder.request_body(request, model.ConceptDescription, is_stripped_request(request)) ) + self.object_store.commit(concept_description) return response_t() def delete_concept_description(