-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtest_pubsub.py
More file actions
216 lines (154 loc) · 7.04 KB
/
test_pubsub.py
File metadata and controls
216 lines (154 loc) · 7.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
import threading
import uuid
import pytest
from sqlitecloud.datatypes import SQLITECLOUD_ERRCODE, SQLITECLOUD_PUBSUB_SUBJECT
from sqlitecloud.exceptions import SQLiteCloudError
from sqlitecloud.pubsub import SQLiteCloudPubSub
from sqlitecloud.resultset import SQLITECLOUD_RESULT_TYPE, SQLiteCloudResultSet
class TestPubSub:
def test_listen_channel_and_notify(self, sqlitecloud_connection):
connection, _ = sqlitecloud_connection
callback_called = False
flag = threading.Event()
def assert_callback(conn, result, data):
nonlocal callback_called
nonlocal flag
if isinstance(result, SQLiteCloudResultSet):
assert result.tag == SQLITECLOUD_RESULT_TYPE.RESULT_JSON
assert isinstance(result.get_result(), dict)
content = result.get_result()
assert content["channel"] == channel
assert len(content["payload"]) > 0
assert content["payload"] == "somedata2"
assert data == ["somedata"]
callback_called = True
flag.set()
pubsub = SQLiteCloudPubSub()
subject_type = SQLITECLOUD_PUBSUB_SUBJECT.CHANNEL
channel = "channel" + str(uuid.uuid4())
pubsub.create_channel(connection, channel)
pubsub.listen(connection, subject_type, channel, assert_callback, ["somedata"])
pubsub.notify_channel(connection, channel, "somedata2")
# wait for callback to be called
flag.wait(30)
assert callback_called
def test_notify_multiple_messages(self, sqlitecloud_connection):
connection, _ = sqlitecloud_connection
called_times = 3
flag = threading.Event()
def assert_callback(conn, result, data):
nonlocal called_times
nonlocal flag
if isinstance(result, SQLiteCloudResultSet):
assert data == ["somedataX"]
called_times -= 1
if called_times == 0:
flag.set()
pubsub = SQLiteCloudPubSub()
subject_type = SQLITECLOUD_PUBSUB_SUBJECT.CHANNEL
channel = "channel" + str(uuid.uuid4())
pubsub.create_channel(connection, channel)
pubsub.listen(connection, subject_type, channel, assert_callback, ["somedataX"])
pubsub.notify_channel(connection, channel, "somedataX")
pubsub.notify_channel(connection, channel, "somedataX")
pubsub.notify_channel(connection, channel, "somedataX")
# wait for callback to be called
flag.wait(30)
assert called_times == 0
def test_unlisten_channel(self, sqlitecloud_connection):
connection, _ = sqlitecloud_connection
pubsub = SQLiteCloudPubSub()
subject_type = SQLITECLOUD_PUBSUB_SUBJECT.CHANNEL
channel_name = "channel" + str(uuid.uuid4())
pubsub.create_channel(connection, channel_name)
pubsub.listen(
connection, subject_type, channel_name, lambda conn, result, data: None
)
result = pubsub.list_connections(connection)
assert channel_name in result.data
pubsub.unlisten(connection, subject_type, channel_name)
result = pubsub.list_connections(connection)
assert channel_name not in result.data
assert connection.pubsub_callback is None
assert connection.pubsub_data is None
def test_create_channel_to_fail_if_exists(self, sqlitecloud_connection):
connection, _ = sqlitecloud_connection
pubsub = SQLiteCloudPubSub()
channel_name = "channel" + str(uuid.uuid4())
pubsub.create_channel(connection, channel_name, if_not_exists=True)
with pytest.raises(SQLiteCloudError) as e:
pubsub.create_channel(connection, channel_name, if_not_exists=False)
assert (
e.value.errmsg
== f"Cannot create channel {channel_name} because it already exists."
)
assert e.value.errcode == SQLITECLOUD_ERRCODE.GENERIC.value
def test_is_connected(self, sqlitecloud_connection):
connection, _ = sqlitecloud_connection
pubsub = SQLiteCloudPubSub()
channel_name = "channel" + str(uuid.uuid4())
assert not pubsub.is_connected(connection)
pubsub.create_channel(connection, channel_name, if_not_exists=True)
pubsub.listen(
connection,
SQLITECLOUD_PUBSUB_SUBJECT.CHANNEL,
channel_name,
lambda conn, result, data: None,
)
assert pubsub.is_connected(connection)
def test_set_pubsub_only(self, sqlitecloud_connection):
connection, client = sqlitecloud_connection
callback_called = False
flag = threading.Event()
def assert_callback(conn, result, data):
nonlocal callback_called
nonlocal flag
if isinstance(result, SQLiteCloudResultSet):
assert result.get_result() is not None
callback_called = True
flag.set()
pubsub = SQLiteCloudPubSub()
subject_type = SQLITECLOUD_PUBSUB_SUBJECT.CHANNEL
channel = "channel" + str(uuid.uuid4())
pubsub.create_channel(connection, channel, if_not_exists=True)
pubsub.listen(connection, subject_type, channel, assert_callback)
pubsub.set_pubsub_only(connection)
assert not client.is_connected(connection)
assert pubsub.is_connected(connection)
connection2 = client.open_connection()
try:
pubsub2 = SQLiteCloudPubSub()
pubsub2.notify_channel(connection2, channel, "message-in-a-bottle")
# wait for callback to be called
flag.wait(30)
assert callback_called
finally:
client.disconnect(connection2)
def test_listen_table_for_update(self, sqlitecloud_connection):
connection, client = sqlitecloud_connection
callback_called = False
flag = threading.Event()
def assert_callback(conn, result, data):
nonlocal callback_called
nonlocal flag
if isinstance(result, SQLiteCloudResultSet):
assert result.tag == SQLITECLOUD_RESULT_TYPE.RESULT_JSON
assert isinstance(result.get_result(), dict)
content = result.get_result()
assert content["channel"] == "genres"
assert len(content["payload"]) > 0
assert content["payload"][0]["Name"] == new_name
assert content["payload"][0]["type"] == "UPDATE"
assert data == ["somedata"]
callback_called = True
flag.set()
pubsub = SQLiteCloudPubSub()
subject_type = SQLITECLOUD_PUBSUB_SUBJECT.TABLE
new_name = "Rock" + str(uuid.uuid4())
pubsub.listen(connection, subject_type, "genres", assert_callback, ["somedata"])
client.exec_query(
f"UPDATE genres SET Name = '{new_name}' WHERE GenreId = 1;", connection
)
# wait for callback to be called
flag.wait(30)
assert callback_called