Skip to content

Commit 7f496c5

Browse files
committed
Add distributed lock to entry_property actions
Signed-off-by: Jacob Floyd <cognifloyd@gmail.com>
1 parent d4ff6f1 commit 7f496c5

File tree

3 files changed

+70
-64
lines changed

3 files changed

+70
-64
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,10 @@ You can also use dynamic values from the datastore. See the
4949
* ``kv.upsert_entry_property`` - Update or insert a property in the named entry
5050
of a JSON serialized object. Then, serialize and store the updated object.
5151
The property's value may be any json-serializable type.
52-
Fails if the datastore key does not exist.
52+
Fails if the datastore key does not exist. A coordination backend is recommended.
5353
* ``kv.delete_entry_property`` - Delete a property from a named entry of a JSON
5454
serialized object in the datastore. If the entry is empty, delete it as well.
55-
Fails if the datastore key does not exist.
55+
Fails if the datastore key does not exist. A coordination backend is recommended.
5656

5757
Note: ``kv.set`` and ``kv.get`` actions support compressing value before
5858
storing it in a datastore and decompressing it when retrieving it from
Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import json
22

3+
from st2common.services import coordination as coordination_service
4+
35
from lib.action import St2BaseAction
46

57
__all__ = [
@@ -10,38 +12,39 @@
1012
class St2KVPDeleteEntryPropertyAction(St2BaseAction):
1113
# noinspection PyShadowingBuiltins
1214
def run(self, key, entry, property):
13-
# get and deserialize object or fail.
14-
_key = self.client.keys.get_by_name(key, decrypt=False)
15-
16-
if not _key:
17-
raise Exception("Key does not exist in datastore")
15+
with coordination_service.get_coordinator().get_lock('st2.kv_entry.' + key):
16+
# get and deserialize object or fail.
17+
_key = self.client.keys.get_by_name(key, decrypt=False)
1818

19-
deserialized = json.loads(_key.value)
19+
if not _key:
20+
raise Exception("Key does not exist in datastore")
2021

21-
# delete object.entry.property
22-
_entry = deserialized.get(entry, {})
23-
try:
24-
del _entry[property]
25-
except KeyError:
26-
pass
22+
deserialized = json.loads(_key.value)
2723

28-
# delete object.entry if entry is empty
29-
if not _entry:
24+
# delete object.entry.property
25+
_entry = deserialized.get(entry, {})
3026
try:
31-
del deserialized[entry]
27+
del _entry[property]
3228
except KeyError:
3329
pass
3430

35-
# re-serialize and save
36-
serialized = json.dumps(deserialized)
37-
kvp = self._kvp(name=key, value=serialized)
38-
kvp.id = key
39-
40-
self.client.keys.update(kvp)
41-
response = {
42-
'key': key,
43-
'entry_name': entry,
44-
'entry': _entry,
45-
'entry_deleted': not _entry,
46-
}
47-
return response
31+
# delete object.entry if entry is empty
32+
if not _entry:
33+
try:
34+
del deserialized[entry]
35+
except KeyError:
36+
pass
37+
38+
# re-serialize and save
39+
serialized = json.dumps(deserialized)
40+
kvp = self._kvp(name=key, value=serialized)
41+
kvp.id = key
42+
43+
self.client.keys.update(kvp)
44+
response = {
45+
'key': key,
46+
'entry_name': entry,
47+
'entry': _entry,
48+
'entry_deleted': not _entry,
49+
}
50+
return response
Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import json
22

3+
from st2common.services import coordination as coordination_service
4+
35
from lib.action import St2BaseAction
46

57
__all__ = [
@@ -10,37 +12,38 @@
1012
class St2KVPUpsertEntryPropertyAction(St2BaseAction):
1113
# noinspection PyShadowingBuiltins
1214
def run(self, key, entry, property, value):
13-
# get and deserialize object or fail.
14-
_key = self.client.keys.get_by_name(key, decrypt=False)
15-
16-
if not _key:
17-
raise Exception("Key does not exist in datastore")
18-
19-
# optimistically try to decode a json value
20-
try:
21-
value = json.loads(value)
22-
except (TypeError, ValueError):
23-
# assume it is either already decoded (TypeError)
24-
# or it is a plain string (ValueError)
25-
# (malformed JSON objects/arrays will be strings)
26-
pass
27-
28-
deserialized = json.loads(_key.value)
29-
30-
# update or insert object.entry.property
31-
_entry = deserialized.get(entry, {})
32-
_entry[property] = value
33-
deserialized[entry] = _entry
34-
35-
# re-serialize and save
36-
serialized = json.dumps(deserialized)
37-
kvp = self._kvp(name=key, value=serialized)
38-
kvp.id = key
39-
40-
self.client.keys.update(kvp)
41-
response = {
42-
'key': key,
43-
'entry_name': entry,
44-
'entry': _entry,
45-
}
46-
return response
15+
with coordination_service.get_coordinator().get_lock('st2.kv_entry.' + key):
16+
# get and deserialize object or fail.
17+
_key = self.client.keys.get_by_name(key, decrypt=False)
18+
19+
if not _key:
20+
raise Exception("Key does not exist in datastore")
21+
22+
# optimistically try to decode a json value
23+
try:
24+
value = json.loads(value)
25+
except (TypeError, ValueError):
26+
# assume it is either already decoded (TypeError)
27+
# or it is a plain string (ValueError)
28+
# (malformed JSON objects/arrays will be strings)
29+
pass
30+
31+
deserialized = json.loads(_key.value)
32+
33+
# update or insert object.entry.property
34+
_entry = deserialized.get(entry, {})
35+
_entry[property] = value
36+
deserialized[entry] = _entry
37+
38+
# re-serialize and save
39+
serialized = json.dumps(deserialized)
40+
kvp = self._kvp(name=key, value=serialized)
41+
kvp.id = key
42+
43+
self.client.keys.update(kvp)
44+
response = {
45+
'key': key,
46+
'entry_name': entry,
47+
'entry': _entry,
48+
}
49+
return response

0 commit comments

Comments
 (0)