Skip to content

Commit 321e430

Browse files
authored
smoketests: Smoketest enable replication for existing database (#3138)
Adds a (basic) smoketest that shows that we can enable, disable, and then re-enable replication on an existing, non-replicated database.
1 parent aa1e732 commit 321e430

3 files changed

Lines changed: 104 additions & 6 deletions

File tree

crates/core/src/host/module_host.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
3232
use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
3333
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
3434
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
35+
use spacetimedb_durability::DurableOffset;
3536
use spacetimedb_execution::pipelined::PipelinedProject;
3637
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
3738
use spacetimedb_lib::identity::{AuthCtx, RequestId};
@@ -1233,6 +1234,10 @@ impl ModuleHost {
12331234
&self.replica_ctx().database
12341235
}
12351236

1237+
pub fn durable_tx_offset(&self) -> Option<DurableOffset> {
1238+
self.replica_ctx().relational_db.durable_tx_offset()
1239+
}
1240+
12361241
pub(crate) fn replica_ctx(&self) -> &ReplicaContext {
12371242
self.module.replica_ctx()
12381243
}

smoketests/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ def log_records(self, n):
216216
logs = self.spacetime("logs", "--format=json", "-n", str(n), "--", self.database_identity)
217217
return list(map(json.loads, logs.splitlines()))
218218

219-
def publish_module(self, domain=None, *, clear=True, capture_stderr=True):
219+
def publish_module(self, domain=None, *, clear=True, capture_stderr=True, num_replicas=None):
220220
print("publishing module", self.publish_module)
221221
publish_output = self.spacetime(
222222
"publish",
@@ -227,10 +227,11 @@ def publish_module(self, domain=None, *, clear=True, capture_stderr=True):
227227
# because the server address is `node` which doesn't look like `localhost` or `127.0.0.1`
228228
# and so the publish step prompts for confirmation.
229229
"--yes",
230+
*["--num-replicas", f"{num_replicas}"] if num_replicas is not None else [],
230231
capture_stderr=capture_stderr,
231232
)
232233
self.resolved_identity = re.search(r"identity: ([0-9a-fA-F]+)", publish_output)[1]
233-
self.database_identity = domain if domain is not None else self.resolved_identity
234+
self.database_identity = self.resolved_identity
234235

235236
@classmethod
236237
def reset_config(cls):

smoketests/tests/replication.py

Lines changed: 96 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
from .. import COMPOSE_FILE, Smoketest, requires_docker, spacetime, parse_sql_result
2-
from ..docker import DockerManager
3-
41
import time
5-
from typing import Callable
62
import unittest
3+
from typing import Callable
4+
import json
5+
6+
from .. import COMPOSE_FILE, Smoketest, random_string, requires_docker, spacetime, parse_sql_result
7+
from ..docker import DockerManager
78

89
def retry(func: Callable, max_retries: int = 3, retry_delay: int = 2):
910
"""Retry a function on failure with delay."""
@@ -113,6 +114,18 @@ def ensure_leader_health(self, id):
113114
# TODO: Replace with confirmed read.
114115
time.sleep(0.6)
115116

117+
def wait_counter_value(self, id, value, max_attempts=10, delay=1):
118+
"""Wait for the value for `id` in the counter table to reach `value`"""
119+
120+
for _ in range(max_attempts):
121+
rows = self.sql(f"select * from counter where id={id}")
122+
if len(rows) >= 1 and int(rows[0]['value']) >= value:
123+
return
124+
else:
125+
time.sleep(delay)
126+
127+
raise ValueError(f"Counter {id} below {value}")
128+
116129

117130
def fail_leader(self, action='kill'):
118131
"""Force leader failure through either killing or network disconnect."""
@@ -240,6 +253,9 @@ def start(self, id: int, count: int):
240253
def collect_counter_rows(self):
241254
return int_vals(self.cluster.sql("select * from counter"))
242255

256+
def call_control(self, reducer, *args):
257+
self.spacetime("call", "spacetime-control", reducer, *map(json.dumps, args))
258+
243259

244260
class LeaderElection(ReplicationTest):
245261
def test_leader_election_in_loop(self):
@@ -393,3 +409,79 @@ def test_quorum_loss(self):
393409
with self.assertRaises(Exception):
394410
for i in range(1001):
395411
self.call("send_message", "terminal")
412+
413+
414+
class EnableReplication(ReplicationTest):
415+
AUTOPUBLISH = False
416+
417+
def __init__(self, *args, **kwargs):
418+
super().__init__(*args, **kwargs)
419+
420+
self.expected_counter_rows = []
421+
422+
def run_counter(self, id, n = 100):
423+
self.start(id, n)
424+
self.cluster.wait_counter_value(id, n)
425+
self.expected_counter_rows.append({"id": id, "value": n})
426+
self.assertEqual(self.collect_counter_rows(), self.expected_counter_rows)
427+
428+
def test_enable_replication(self):
429+
"""Tests enabling and disabling replication"""
430+
431+
self.add_me_as_admin()
432+
name = random_string()
433+
n = 100
434+
435+
self.publish_module(name, num_replicas = 1)
436+
self.cluster.wait_for_leader_change(None)
437+
438+
# start un-replicated
439+
self.run_counter(1, n)
440+
# enable replication
441+
self.call_control("enable_replication", {"Name": name}, 3)
442+
self.run_counter(2, n)
443+
# disable replication
444+
self.call_control("disable_replication", {"Name": name })
445+
self.run_counter(3, n)
446+
# enable it one more time
447+
self.call_control("enable_replication", {"Name": name}, 3)
448+
self.run_counter(4, n)
449+
450+
451+
class EnableReplicationSuspended(ReplicationTest):
452+
AUTOPUBLISH = False
453+
454+
def test_enable_replication_on_suspended_database(self):
455+
"""Tests that we can enable replication on a suspended database"""
456+
457+
self.add_me_as_admin()
458+
name = random_string()
459+
460+
self.publish_module(name, num_replicas = 1)
461+
self.cluster.wait_for_leader_change(None)
462+
self.cluster.ensure_leader_health(1)
463+
464+
id = self.cluster.get_db_id()
465+
466+
self.call_control("suspend_database", {"Name": name})
467+
# Database is now unreachable.
468+
with self.assertRaises(Exception):
469+
self.call("send_message", "hi")
470+
471+
self.call_control("enable_replication", {"Name": name}, 3)
472+
# Still unreachable until we call unsuspend.
473+
with self.assertRaises(Exception):
474+
self.call("send_message", "hi")
475+
476+
self.call_control("unsuspend_database", {"Name": name})
477+
self.cluster.wait_for_leader_change(None)
478+
self.cluster.ensure_leader_health(2)
479+
480+
# We can't directly observe that there are indeed three replicas running,
481+
# so as a sanity check inspect the event log.
482+
rows = self.cluster.read_controldb(
483+
f"select message from staged_enable_replication_event where database_id={id}")
484+
self.assertEqual(rows, [
485+
{'message': '"bootstrap requested"'},
486+
{'message': '"bootstrap complete"'},
487+
])

0 commit comments

Comments
 (0)