Skip to content

Commit bce50bc

Browse files
authored
Fix mutation persistence in persistent backends (#553)
PR #370 removed `Referable.commit()` and all call sites in the `server` handlers without replacing the write-back mechanism. Since then, any mutation to an object retrieved from `LocalFileIdentifiableStore`, `LocalFileDescriptorStore`, or `CouchDBIdentifiableStore` was visible only through the same in-process `WeakValueDictionary` cache entry and was silently lost once that entry was evicted. A different uWSGI worker, or any request after the cache entry expired, would re-read the stale on-disk or in-database state.

There was also a compounding bug: `get_item()` / `get_identifiable_by_hash()` always re-read from storage even on a cache hit, then called `update_from()` on the cached object, discarding any in-memory mutations even within the same request.

This change fixes both issues across all three backends:

- `get_identifiable_by_hash()` / `get_identifiable_by_couchdb_id()`: return the cached instance on a hit instead of overwriting it with a freshly-deserialized copy.
- `get_item()`: check the cache first and return immediately on a hit.
- Add `commit()` to `LocalFileIdentifiableStore` (re-serializes to .json), `LocalFileDescriptorStore` (same), and `CouchDBIdentifiableStore` (PUT with stored `_rev`, updates revision on success).
- `AbstractObjectStore.commit()` is added as a hook that raises `NotImplementedError` by default; persistent backends override it to write the object back, and in-memory stores (e.g. `DictIdentifiableStore`) override it with an explicit no-op. All mutating handlers in `server/app/interfaces/repository.py` and `registry.py` now call `self.object_store.commit(...)` with the mutated object after each mutation.
- A regression test `test_mutation_persistence` is added to `sdk/test/backend/test_local_file.py`.

Fixes #552
1 parent efea046 commit bce50bc

7 files changed

Lines changed: 173 additions & 35 deletions

File tree

sdk/basyx/aas/backend/couchdb.py

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -160,22 +160,17 @@ def get_identifiable_by_couchdb_id(self, couchdb_id: str) -> model.Identifiable:
160160
raise KeyError("No Identifiable with couchdb-id {} found in CouchDB database".format(couchdb_id)) from e
161161
raise
162162

163-
# Add CouchDB metadata (for later commits) to object
164163
obj = data['data']
165164
if not isinstance(obj, model.Identifiable):
166165
raise CouchDBResponseError("The CouchDB document with id {} does not contain an identifiable AAS object."
167166
.format(couchdb_id))
168167
set_couchdb_revision("{}/{}/{}".format(self.url, self.database_name, urllib.parse.quote(couchdb_id, safe='')),
169168
data["_rev"])
170169

171-
# If we still have a local replication of that object (since it is referenced from anywhere else), update that
172-
# replication and return it.
173170
with self._object_cache_lock:
174171
if obj.id in self._object_cache:
175-
old_obj = self._object_cache[obj.id]
176-
old_obj.update_from(obj)
177-
return old_obj
178-
self._object_cache[obj.id] = obj
172+
return self._object_cache[obj.id]
173+
self._object_cache[obj.id] = obj
179174
return obj
180175

181176
def get_item(self, identifier: model.Identifier) -> model.Identifiable:
@@ -186,6 +181,9 @@ def get_item(self, identifier: model.Identifier) -> model.Identifiable:
186181
:raises CouchDBError: If an error occurs during the request to the CouchDB server
187182
(see ``_do_request()`` for details)
188183
"""
184+
with self._object_cache_lock:
185+
if identifier in self._object_cache:
186+
return self._object_cache[identifier]
189187
try:
190188
return self.get_identifiable_by_couchdb_id(self._transform_id(identifier, False))
191189
except KeyError as e:
@@ -220,6 +218,37 @@ def add(self, x: model.Identifiable) -> None:
220218
with self._object_cache_lock:
221219
self._object_cache[x.id] = x
222220

221+
def commit(self, x: model.Identifiable) -> None:
222+
"""
223+
Write the current in-memory state of a stored object back to the CouchDB.
224+
225+
:param x: The object to persist
226+
:raises KeyError: If the object is not present in the store or no revision is known
227+
:raises CouchDBConflictError: If the object was modified in the database since it was last fetched
228+
:raises CouchDBError: If an error occurs during the request to the CouchDB server
229+
(see ``_do_request()`` for details)
230+
"""
231+
doc_url = "{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.id))
232+
rev = get_couchdb_revision(doc_url)
233+
if rev is None:
234+
raise KeyError("No revision found for object with id {} — not fetched from this store".format(x.id))
235+
data = json.dumps({'data': x}, cls=json_serialization.AASToJsonEncoder)
236+
try:
237+
response = self._do_request(
238+
"{}?rev={}".format(doc_url, rev),
239+
'PUT',
240+
{'Content-type': 'application/json'},
241+
data.encode('utf-8'))
242+
set_couchdb_revision(doc_url, response["rev"])
243+
except CouchDBServerError as e:
244+
if e.code == 404:
245+
raise KeyError("No AAS object with id {} exists in CouchDB database".format(x.id)) from e
246+
elif e.code == 409:
247+
raise CouchDBConflictError(
248+
"Object with id {} has been modified in the database since it was last fetched."
249+
.format(x.id)) from e
250+
raise
251+
223252
def discard(self, x: model.Identifiable, safe_delete=False) -> None:
224253
"""
225254
Delete an :class:`~basyx.aas.model.base.Identifiable` AAS object from the CouchDB database

sdk/basyx/aas/backend/local_file.py

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import json
1717
import os
1818
import hashlib
19+
import tempfile
1920
import threading
2021
import warnings
2122
import weakref
@@ -31,6 +32,13 @@ class LocalFileIdentifiableStore(model.AbstractObjectStore[model.Identifier, mod
3132
"""
3233
An ObjectStore implementation for :class:`~basyx.aas.model.base.Identifiable` BaSyx Python SDK objects backed
3334
by a local file based local backend
35+
36+
.. warning::
37+
This backend is intended for development and testing only. It provides no
38+
concurrency control across processes: concurrent writes to the same object
39+
(e.g. under a multi-worker WSGI server) will silently overwrite each other,
40+
with the last writer winning and no error raised. Use a dedicated database
41+
backend for any production deployment.
3442
"""
3543
def __init__(self, directory_path: str):
3644
"""
@@ -68,21 +76,16 @@ def get_identifiable_by_hash(self, hash_: str) -> model.Identifiable:
6876
6977
:raises KeyError: If the respective file could not be found
7078
"""
71-
# Try to get the correct file
7279
try:
7380
with open("{}/{}.json".format(self.directory_path, hash_), "r") as file:
7481
data = json.load(file, cls=json_deserialization.AASFromJsonDecoder)
7582
obj = data["data"]
7683
except FileNotFoundError as e:
7784
raise KeyError("No Identifiable with hash {} found in local file database".format(hash_)) from e
78-
# If we still have a local replication of that object (since it is referenced from anywhere else), update that
79-
# replication and return it.
8085
with self._object_cache_lock:
8186
if obj.id in self._object_cache:
82-
old_obj = self._object_cache[obj.id]
83-
old_obj.update_from(obj)
84-
return old_obj
85-
self._object_cache[obj.id] = obj
87+
return self._object_cache[obj.id]
88+
self._object_cache[obj.id] = obj
8689
return obj
8790

8891
def get_item(self, identifier: model.Identifier) -> model.Identifiable:
@@ -91,11 +94,33 @@ def get_item(self, identifier: model.Identifier) -> model.Identifiable:
9194
9295
:raises KeyError: If the respective file could not be found
9396
"""
97+
with self._object_cache_lock:
98+
if identifier in self._object_cache:
99+
return self._object_cache[identifier]
94100
try:
95101
return self.get_identifiable_by_hash(self._transform_id(identifier))
96102
except KeyError as e:
97103
raise KeyError("No Identifiable with id {} found in local file database".format(identifier)) from e
98104

105+
def _write_atomic(self, x: model.Identifiable) -> None:
106+
"""
107+
Serialize x to a temp file in the store directory, then atomically replace the final file.
108+
109+
Using os.replace() (rename(2) on POSIX) ensures readers always see a complete file — never
110+
a partially-written one from a crash or concurrent access mid-write.
111+
"""
112+
final_path = "{}/{}.json".format(self.directory_path, self._transform_id(x.id))
113+
tmp_fd, tmp_path = tempfile.mkstemp(dir=self.directory_path, suffix=".tmp")
114+
try:
115+
with os.fdopen(tmp_fd, "w") as tmp_file:
116+
json.dump({"data": x}, tmp_file, cls=json_serialization.AASToJsonEncoder, indent=4)
117+
os.replace(tmp_path, final_path)
118+
# Catch all `Exception`s, as well as `KeyboardInterrupt` and `SystemExit` too, so the temp
119+
# file is never left behind even if the process is being torn down:
120+
except BaseException:
121+
os.unlink(tmp_path)
122+
raise
123+
99124
def add(self, x: model.Identifiable) -> None:
100125
"""
101126
Add an object to the store
@@ -105,10 +130,20 @@ def add(self, x: model.Identifiable) -> None:
105130
logger.debug("Adding object %s to Local File Store ...", repr(x))
106131
if os.path.exists("{}/{}.json".format(self.directory_path, self._transform_id(x.id))):
107132
raise KeyError("Identifiable with id {} already exists in local file database".format(x.id))
108-
with open("{}/{}.json".format(self.directory_path, self._transform_id(x.id)), "w") as file:
109-
json.dump({"data": x}, file, cls=json_serialization.AASToJsonEncoder, indent=4)
110-
with self._object_cache_lock:
111-
self._object_cache[x.id] = x
133+
self._write_atomic(x)
134+
with self._object_cache_lock:
135+
self._object_cache[x.id] = x
136+
137+
def commit(self, x: model.Identifiable) -> None:
138+
"""
139+
Write the current in-memory state of a stored object back to its file.
140+
141+
:param x: The object to persist
142+
:raises KeyError: If the object is not present in the store
143+
"""
144+
if not os.path.exists("{}/{}.json".format(self.directory_path, self._transform_id(x.id))):
145+
raise KeyError("No AAS object with id {} exists in local file database".format(x.id))
146+
self._write_atomic(x)
112147

113148
def discard(self, x: model.Identifiable) -> None:
114149
"""

sdk/basyx/aas/model/provider.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,18 @@ class AbstractObjectStore(AbstractObjectProvider[_KEY, _VALUE], MutableSet[_VALU
5959
def __init__(self):
6060
pass
6161

62+
def commit(self, x: _VALUE) -> None:
63+
"""
64+
Persist an in-memory mutation of a stored object back to the underlying storage.
65+
66+
Persistent backends (e.g. file-based or database-backed stores) must override this to
67+
write the updated object back to storage. In-memory stores should override this with an
68+
explicit no-op to make the intent clear.
69+
70+
:param x: The object whose current in-memory state should be persisted
71+
"""
72+
raise NotImplementedError()
73+
6274
def update(self, other: Iterable[_VALUE]) -> None:
6375
for x in other:
6476
self.add(x)
@@ -146,6 +158,9 @@ def add(self, x: _IDENTIFIABLE) -> None:
146158
.format(x.id))
147159
self._backend[x.id] = x
148160

161+
def commit(self, x: _IDENTIFIABLE) -> None:
162+
pass
163+
149164
def discard(self, x: _IDENTIFIABLE) -> None:
150165
if self._backend.get(x.id) is x:
151166
del self._backend[x.id]
@@ -223,6 +238,9 @@ def add(self, x: _IDENTIFIABLE) -> None:
223238
else:
224239
raise KeyError(f"Identifiable object with same id {x.id} is already stored in this store")
225240

241+
def commit(self, x: _IDENTIFIABLE) -> None:
242+
pass
243+
226244
def discard(self, x: _IDENTIFIABLE) -> None:
227245
self._backend.discard(x)
228246

sdk/test/backend/test_local_file.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# the LICENSE file of this project.
55
#
66
# SPDX-License-Identifier: MIT
7+
import gc
78
import os.path
89
import shutil
910

@@ -134,6 +135,32 @@ def test_iter_ignores_non_json_files(self) -> None:
134135
self.assertEqual(5, len(items))
135136
os.remove(stray)
136137

138+
def test_mutation_persistence(self) -> None:
139+
submodel = model.Submodel(
140+
id_='https://example.org/MutationTest',
141+
submodel_element={
142+
model.Property(id_short='Prop', value_type=model.datatypes.String, value='before')
143+
}
144+
)
145+
self.identifiable_store.add(submodel)
146+
147+
retrieved = self.identifiable_store.get_item('https://example.org/MutationTest')
148+
assert isinstance(retrieved, model.Submodel)
149+
prop = retrieved.get_referable(['Prop'])
150+
assert isinstance(prop, model.Property)
151+
prop.update_from(model.Property(id_short='Prop', value_type=model.datatypes.String, value='after'))
152+
self.identifiable_store.commit(retrieved)
153+
154+
# Drop all strong references to evict the WeakValueDictionary cache
155+
del submodel, retrieved, prop
156+
gc.collect()
157+
158+
fresh = self.identifiable_store.get_item('https://example.org/MutationTest')
159+
assert isinstance(fresh, model.Submodel)
160+
fresh_prop = fresh.get_referable(['Prop'])
161+
assert isinstance(fresh_prop, model.Property)
162+
self.assertEqual('after', fresh_prop.value)
163+
137164
def test_reload_discard(self) -> None:
138165
# Load example submodel
139166
example_submodel = create_example_submodel()

server/app/backend/local_file.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,20 +67,15 @@ def get_descriptor_by_hash(self, hash_: str) -> _DESCRIPTOR_TYPE:
6767
6868
:raises KeyError: If the respective file could not be found
6969
"""
70-
# Try to get the correct file
7170
try:
7271
with open("{}/{}.json".format(self.directory_path, hash_), "r") as file:
7372
obj = json.load(file, cls=jsonization.ServerAASFromJsonDecoder)
7473
except FileNotFoundError as e:
7574
raise KeyError("No Descriptor with hash {} found in local file database".format(hash_)) from e
76-
# If we still have a local replication of that object (since it is referenced from anywhere else), update that
77-
# replication and return it.
7875
with self._object_cache_lock:
7976
if obj.id in self._object_cache:
80-
old_obj = self._object_cache[obj.id]
81-
old_obj.update_from(obj)
82-
return old_obj
83-
self._object_cache[obj.id] = obj
77+
return self._object_cache[obj.id]
78+
self._object_cache[obj.id] = obj
8479
return obj
8580

8681
def get_item(self, identifier: model.Identifier) -> _DESCRIPTOR_TYPE:
@@ -89,6 +84,9 @@ def get_item(self, identifier: model.Identifier) -> _DESCRIPTOR_TYPE:
8984
9085
:raises KeyError: If the respective file could not be found
9186
"""
87+
with self._object_cache_lock:
88+
if identifier in self._object_cache:
89+
return self._object_cache[identifier]
9290
try:
9391
return self.get_descriptor_by_hash(self._transform_id(identifier))
9492
except KeyError as e:
@@ -113,6 +111,20 @@ def add(self, x: _DESCRIPTOR_TYPE) -> None:
113111
with self._object_cache_lock:
114112
self._object_cache[x.id] = x
115113

114+
def commit(self, x: _DESCRIPTOR_TYPE) -> None:
115+
"""
116+
Write the current in-memory state of a stored descriptor back to its file.
117+
118+
:param x: The descriptor to persist
119+
:raises KeyError: If the descriptor is not present in the store
120+
"""
121+
if not os.path.exists("{}/{}.json".format(self.directory_path, self._transform_id(x.id))):
122+
raise KeyError("No AAS Descriptor object with id {} exists in local file database".format(x.id))
123+
with open("{}/{}.json".format(self.directory_path, self._transform_id(x.id)), "w") as file:
124+
serialized = json.loads(json.dumps(x, cls=jsonization.ServerAASToJsonEncoder))
125+
serialized["modelType"] = DESCRIPTOR_TYPE_TO_STRING[type(x)]
126+
json.dump(serialized, file, indent=4)
127+
116128
def discard(self, x: _DESCRIPTOR_TYPE) -> None:
117129
"""
118130
Delete an :class:`~app.model.descriptor.Descriptor` AAS object from the local file store

server/app/interfaces/registry.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ def post_aas_descriptor(
180180
self.object_store.add(descriptor)
181181
except KeyError as e:
182182
raise Conflict(f"AssetAdministrationShellDescriptor with Identifier {descriptor.id} already exists!") from e
183-
descriptor.commit()
183+
self.object_store.commit(descriptor)
184184
created_resource_url = map_adapter.build(
185185
self.get_aas_descriptor_by_id, {"aas_id": descriptor.id}, force_external=True
186186
)
@@ -202,12 +202,12 @@ def put_aas_descriptor_by_id(
202202
request, server_model.AssetAdministrationShellDescriptor, is_stripped_request(request)
203203
)
204204
)
205-
descriptor.commit()
205+
self.object_store.commit(descriptor)
206206
return response_t()
207207
except NotFound:
208208
descriptor = HTTPApiDecoder.request_body(request, server_model.AssetAdministrationShellDescriptor, False)
209209
self.object_store.add(descriptor)
210-
descriptor.commit()
210+
self.object_store.commit(descriptor)
211211
created_resource_url = map_adapter.build(
212212
self.get_aas_descriptor_by_id, {"aas_id": descriptor.id}, force_external=True
213213
)
@@ -247,7 +247,7 @@ def post_submodel_descriptor_through_superpath(
247247
if any(sd.id == submodel_descriptor.id for sd in aas_descriptor.submodel_descriptors):
248248
raise Conflict(f"Submodel Descriptor with Identifier {submodel_descriptor.id} already exists!")
249249
aas_descriptor.submodel_descriptors.append(submodel_descriptor)
250-
aas_descriptor.commit()
250+
self.object_store.commit(aas_descriptor)
251251
created_resource_url = map_adapter.build(
252252
self.get_submodel_descriptor_by_id_through_superpath,
253253
{"aas_id": aas_descriptor.id, "submodel_id": submodel_descriptor.id},
@@ -269,14 +269,14 @@ def put_submodel_descriptor_by_id_through_superpath(
269269
submodel_descriptor.update_from(
270270
HTTPApiDecoder.request_body(request, server_model.SubmodelDescriptor, is_stripped_request(request))
271271
)
272-
aas_descriptor.commit()
272+
self.object_store.commit(aas_descriptor)
273273
return response_t()
274274
except NotFound:
275275
submodel_descriptor = HTTPApiDecoder.request_body(
276276
request, server_model.SubmodelDescriptor, is_stripped_request(request)
277277
)
278278
aas_descriptor.submodel_descriptors.append(submodel_descriptor)
279-
aas_descriptor.commit()
279+
self.object_store.commit(aas_descriptor)
280280
created_resource_url = map_adapter.build(
281281
self.get_submodel_descriptor_by_id_through_superpath,
282282
{"aas_id": aas_descriptor.id, "submodel_id": submodel_descriptor.id},
@@ -293,7 +293,7 @@ def delete_submodel_descriptor_by_id_through_superpath(
293293
if submodel_descriptor is None:
294294
raise NotFound(f"Submodel Descriptor with Identifier {submodel_id} not found in AssetAdministrationShell!")
295295
aas_descriptor.submodel_descriptors.remove(submodel_descriptor)
296-
aas_descriptor.commit()
296+
self.object_store.commit(aas_descriptor)
297297
return response_t()
298298

299299
# ------ Submodel REGISTRY ROUTES -------
@@ -321,7 +321,7 @@ def post_submodel_descriptor(
321321
self.object_store.add(submodel_descriptor)
322322
except KeyError as e:
323323
raise Conflict(f"Submodel Descriptor with Identifier {submodel_descriptor.id} already exists!") from e
324-
submodel_descriptor.commit()
324+
self.object_store.commit(submodel_descriptor)
325325
created_resource_url = map_adapter.build(
326326
self.get_submodel_descriptor_by_id, {"submodel_id": submodel_descriptor.id}, force_external=True
327327
)
@@ -335,14 +335,14 @@ def put_submodel_descriptor_by_id(
335335
submodel_descriptor.update_from(
336336
HTTPApiDecoder.request_body(request, server_model.SubmodelDescriptor, is_stripped_request(request))
337337
)
338-
submodel_descriptor.commit()
338+
self.object_store.commit(submodel_descriptor)
339339
return response_t()
340340
except NotFound:
341341
submodel_descriptor = HTTPApiDecoder.request_body(
342342
request, server_model.SubmodelDescriptor, is_stripped_request(request)
343343
)
344344
self.object_store.add(submodel_descriptor)
345-
submodel_descriptor.commit()
345+
self.object_store.commit(submodel_descriptor)
346346
created_resource_url = map_adapter.build(
347347
self.get_submodel_descriptor_by_id, {"submodel_id": submodel_descriptor.id}, force_external=True
348348
)

0 commit comments

Comments
 (0)