-
Notifications
You must be signed in to change notification settings - Fork 146
Expand file tree
/
Copy pathstate_store.py
More file actions
129 lines (109 loc) · 4.49 KB
/
state_store.py
File metadata and controls
129 lines (109 loc) · 4.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
"""
dapr run python3 state_store.py
"""
import grpc
from dapr.clients import DaprClient
from dapr.clients.grpc._request import TransactionalStateOperation, TransactionOperationType
from dapr.clients.grpc._state import StateItem
# Demonstrates the DaprClient state APIs against the 'statestore' component:
# single and bulk saves, saves with a mismatching etag, single and bulk reads,
# transactional upsert/delete, the outbox metadata flag, and a plain delete.
with DaprClient() as d:
    storeName = 'statestore'

    key = 'key_1'
    value = 'value_1'
    updated_value = 'value_1_updated'

    another_key = 'key_2'
    another_value = 'value_2'

    yet_another_key = 'key_3'
    yet_another_value = 'value_3'

    # Save single state.
    d.save_state(store_name=storeName, key=key, value=value)
    print(f'State store has successfully saved {value} with {key} as key')

    # Save with an etag that is different from the one stored in the database.
    # Only the failing call lives in the try body so that no other error is
    # mistaken for the expected etag rejection.
    try:
        d.save_state(store_name=storeName, key=key, value=another_value, etag='9999')
    except grpc.RpcError as err:
        # StatusCode should be StatusCode.ABORTED.
        print(f'Cannot save due to bad etag. ErrorCode={err.code()}')

        # For detailed error messages from the dapr runtime:
        # print(f"Details={err.details()}")

    # Save multiple states.
    d.save_bulk_state(
        store_name=storeName,
        states=[
            StateItem(key=another_key, value=another_value),
            StateItem(key=yet_another_key, value=yet_another_value),
        ],
    )
    print(f'State store has successfully saved {another_value} with {another_key} as key')
    print(f'State store has successfully saved {yet_another_value} with {yet_another_key} as key')

    # Save bulk with etag that is different from the one stored in the database.
    try:
        d.save_bulk_state(
            store_name=storeName,
            states=[
                StateItem(key=another_key, value=another_value, etag='999'),
                StateItem(key=yet_another_key, value=yet_another_value, etag='999'),
            ],
        )
    except grpc.RpcError as err:
        # StatusCode should be StatusCode.ABORTED.
        print(f'Cannot save bulk due to bad etags. ErrorCode={err.code()}')

        # For detailed error messages from the dapr runtime:
        # print(f"Details={err.details()}")

    # Get one state by key.
    state = d.get_state(store_name=storeName, key=key, state_metadata={'metakey': 'metavalue'})
    print(f'Got value={state.data} eTag={state.etag}')

    # Transaction upsert.
    # The first operation passes the etag that get_state just returned; the
    # other two omit operation_type and etag, so the SDK defaults apply
    # (NOTE(review): presumably an upsert -- confirm against the SDK).
    d.execute_state_transaction(
        store_name=storeName,
        operations=[
            TransactionalStateOperation(
                operation_type=TransactionOperationType.upsert,
                key=key,
                data=updated_value,
                etag=state.etag,
            ),
            TransactionalStateOperation(key=another_key, data=another_value),
            TransactionalStateOperation(key=yet_another_key, data=yet_another_value),
        ],
    )

    # Batch get
    items = d.get_bulk_state(
        store_name=storeName, keys=[key, another_key], states_metadata={'metakey': 'metavalue'}
    ).items
    print(f'Got items with etags: {[(i.data, i.etag) for i in items]}')

    # Outbox pattern
    # pass in the ("outbox.projection", "true") metadata to the transaction to enable the outbox pattern.
    # Both operations target the same key ('key1') and differ only in data and
    # in the value of the 'outbox.projection' flag.
    d.execute_state_transaction(
        store_name=storeName,
        operations=[
            TransactionalStateOperation(
                key='key1', data='val1', metadata={'outbox.projection': 'false'}
            ),
            TransactionalStateOperation(
                key='key1', data='val2', metadata={'outbox.projection': 'true'}
            ),
        ],
    )
    print('Transaction with outbox pattern executed successfully!')

    val = d.get_state(store_name=storeName, key='key1').data
    print(f'Got value after outbox pattern: {val}')

    # Transaction delete
    d.execute_state_transaction(
        store_name=storeName,
        operations=[
            TransactionalStateOperation(operation_type=TransactionOperationType.delete, key=key),
            TransactionalStateOperation(
                operation_type=TransactionOperationType.delete, key=another_key
            ),
        ],
    )

    # Batch get of the two keys that the transaction above deleted.
    items = d.get_bulk_state(
        store_name=storeName, keys=[key, another_key], states_metadata={'metakey': 'metavalue'}
    ).items
    print(f'Got values after transaction delete: {[item.data for item in items]}')

    # Delete one state by key.
    d.delete_state(
        store_name=storeName, key=yet_another_key, state_metadata={'metakey': 'metavalue'}
    )
    data = d.get_state(store_name=storeName, key=yet_another_key).data
    print(f'Got value after delete: {data}')