Skip to content

Commit f1a865b

Browse files
committed
Add LWTRetryPolicy: retry CAS timeouts on same host with backoff
LWT queries use Paxos consensus where the coordinator is the Paxos leader. Retrying on a different host causes Paxos contention — the new coordinator must compete with the original one, potentially causing cascading timeouts. LWTRetryPolicy (extends ExponentialBackoffRetryPolicy) handles this by: - CAS write timeouts: retry on SAME host with exponential backoff - Serial consistency read timeouts: retry on SAME host with backoff - Serial consistency unavailable: retry on NEXT host (paxos quorum lost) - Non-CAS operations: delegate to base ExponentialBackoffRetryPolicy Modeled after gocql's LWTRetryPolicy interface.
1 parent e2a9511 commit f1a865b

2 files changed

Lines changed: 369 additions & 1 deletion

File tree

cassandra/policies.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1194,6 +1194,120 @@ def on_request_error(self, query, consistency, error, retry_num):
11941194
return self.RETHROW, None, None
11951195

11961196

1197+
class LWTRetryPolicy(ExponentialBackoffRetryPolicy):
1198+
"""
1199+
A retry policy tailored for Lightweight Transaction (LWT) queries.
1200+
1201+
LWT queries use Paxos consensus, where the first replica in the token ring
1202+
acts as the Paxos coordinator (leader). Retrying LWT queries on a *different*
1203+
host causes Paxos contention — the new coordinator must compete with the
1204+
original one, potentially causing cascading timeouts.
1205+
1206+
This policy addresses that by:
1207+
1208+
- **CAS write timeouts**: Retrying on the **same host** (the Paxos coordinator)
1209+
with exponential backoff, giving the coordinator time to complete the Paxos round.
1210+
- **CAS read timeouts** (serial consistency): Retrying on the same host.
1211+
- **Unavailable at serial consistency**: Retrying on the **next host**, since the
1212+
Paxos phase failed on this node (not enough replicas alive to form quorum).
1213+
- **Non-CAS operations**: Delegating to the standard :class:`ExponentialBackoffRetryPolicy`
1214+
behavior.
1215+
1216+
This is modeled after gocql's ``LWTRetryPolicy`` interface, which retries LWT
1217+
queries on the same host to avoid Paxos contention.
1218+
1219+
Example usage::
1220+
1221+
from cassandra.cluster import Cluster
1222+
from cassandra.policies import LWTRetryPolicy
1223+
1224+
# Use as the default retry policy for the cluster
1225+
cluster = Cluster(
1226+
default_retry_policy=LWTRetryPolicy(max_num_retries=3)
1227+
)
1228+
1229+
# Or assign to a specific statement
1230+
statement.retry_policy = LWTRetryPolicy(max_num_retries=5)
1231+
1232+
:param max_num_retries: Maximum number of retry attempts (default: 3).
1233+
:param min_interval: Initial backoff delay in seconds (default: 0.1).
1234+
:param max_interval: Maximum backoff delay in seconds (default: 10.0).
1235+
"""
1236+
1237+
def __init__(self, max_num_retries=3, min_interval=0.1, max_interval=10.0,
1238+
*args, **kwargs):
1239+
super(LWTRetryPolicy, self).__init__(
1240+
max_num_retries=max_num_retries,
1241+
min_interval=min_interval,
1242+
max_interval=max_interval,
1243+
*args, **kwargs)
1244+
1245+
def on_write_timeout(self, query, consistency, write_type,
1246+
required_responses, received_responses, retry_num):
1247+
"""
1248+
For CAS (LWT) write timeouts, retry on the **same host** with exponential
1249+
backoff. Retrying on a different host would cause Paxos contention.
1250+
1251+
For non-CAS writes, delegates to the base ExponentialBackoffRetryPolicy
1252+
behavior (retry BATCH_LOG only, RETHROW otherwise).
1253+
"""
1254+
if retry_num >= self.max_num_retries:
1255+
return self.RETHROW, None, None
1256+
1257+
if write_type == WriteType.CAS:
1258+
# Retry on the SAME host — this is the Paxos coordinator.
1259+
# Moving to another host causes contention in the Paxos protocol.
1260+
return self.RETRY, consistency, self._calculate_backoff(retry_num)
1261+
1262+
# Non-CAS: delegate to parent (retries BATCH_LOG, rethrows others)
1263+
return super(LWTRetryPolicy, self).on_write_timeout(
1264+
query, consistency, write_type,
1265+
required_responses, received_responses, retry_num)
1266+
1267+
def on_read_timeout(self, query, consistency, required_responses,
1268+
received_responses, data_retrieved, retry_num):
1269+
"""
1270+
For reads at serial consistency (CAS reads), retry on the **same host**
1271+
with backoff.
1272+
1273+
For non-serial reads, delegates to the base ExponentialBackoffRetryPolicy
1274+
behavior.
1275+
"""
1276+
if retry_num >= self.max_num_retries:
1277+
return self.RETHROW, None, None
1278+
1279+
if ConsistencyLevel.is_serial(consistency):
1280+
# Serial read = CAS/Paxos read. Retry on same host.
1281+
return self.RETRY, consistency, self._calculate_backoff(retry_num)
1282+
1283+
# Non-serial: delegate to parent
1284+
return super(LWTRetryPolicy, self).on_read_timeout(
1285+
query, consistency, required_responses,
1286+
received_responses, data_retrieved, retry_num)
1287+
1288+
def on_unavailable(self, query, consistency, required_replicas,
1289+
alive_replicas, retry_num):
1290+
"""
1291+
For serial consistency (CAS/Paxos phase), retry on the **next host** —
1292+
this node couldn't form a Paxos quorum, so a different coordinator
1293+
might see a different set of available replicas.
1294+
1295+
For non-serial consistency, delegates to the base ExponentialBackoffRetryPolicy
1296+
behavior.
1297+
"""
1298+
if retry_num >= self.max_num_retries:
1299+
return self.RETHROW, None, None
1300+
1301+
if ConsistencyLevel.is_serial(consistency):
1302+
# Paxos phase failed — not enough replicas for serial quorum.
1303+
# Try a different coordinator; it might have better connectivity.
1304+
return self.RETRY_NEXT_HOST, consistency, self._calculate_backoff(retry_num)
1305+
1306+
# Non-serial: delegate to parent
1307+
return super(LWTRetryPolicy, self).on_unavailable(
1308+
query, consistency, required_replicas, alive_replicas, retry_num)
1309+
1310+
11971311
class AddressTranslator(object):
11981312
"""
11991313
Interface for translating cluster-defined endpoints.

tests/unit/test_policies.py

Lines changed: 255 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
RetryPolicy, WriteType,
3434
DowngradingConsistencyRetryPolicy, ConstantReconnectionPolicy,
3535
LoadBalancingPolicy, ConvictionPolicy, ReconnectionPolicy, FallthroughRetryPolicy,
36-
IdentityTranslator, EC2MultiRegionTranslator, HostFilterPolicy, ExponentialBackoffRetryPolicy)
36+
IdentityTranslator, EC2MultiRegionTranslator, HostFilterPolicy, ExponentialBackoffRetryPolicy,
37+
LWTRetryPolicy)
3738
from cassandra.connection import DefaultEndPoint, UnixSocketEndPoint
3839
from cassandra.pool import Host
3940
from cassandra.query import Statement
@@ -1408,6 +1409,259 @@ def test_calculate_backoff(self):
14081409
assert d < delay + (0.1 / 2), f"d={d} attempts={attempts}, delay={delay}"
14091410

14101411

1412+
class LWTRetryPolicyTest(unittest.TestCase):
1413+
"""Tests for LWTRetryPolicy — LWT-aware retry with same-host preference."""
1414+
1415+
def _make_policy(self, max_retries=3):
1416+
return LWTRetryPolicy(max_num_retries=max_retries)
1417+
1418+
# --- CAS write timeout: retry on SAME host ---
1419+
1420+
def test_cas_write_timeout_retries_same_host(self):
1421+
"""CAS write timeout on first attempt should retry on SAME host."""
1422+
policy = self._make_policy()
1423+
retry, consistency, delay = policy.on_write_timeout(
1424+
query=None, consistency=ConsistencyLevel.QUORUM,
1425+
write_type=WriteType.CAS,
1426+
required_responses=3, received_responses=1, retry_num=0)
1427+
assert retry == RetryPolicy.RETRY
1428+
assert consistency == ConsistencyLevel.QUORUM
1429+
assert delay is not None and delay > 0
1430+
1431+
def test_cas_write_timeout_retries_with_backoff(self):
1432+
"""CAS write timeout backoff delay should increase with retry_num."""
1433+
policy = self._make_policy(max_retries=5)
1434+
delays = []
1435+
for attempt in range(3):
1436+
_, _, delay = policy.on_write_timeout(
1437+
query=None, consistency=ConsistencyLevel.QUORUM,
1438+
write_type=WriteType.CAS,
1439+
required_responses=3, received_responses=1, retry_num=attempt)
1440+
delays.append(delay)
1441+
# Delays should generally increase (with some jitter tolerance)
1442+
# delay_0 ~ 0.1s, delay_1 ~ 0.2s, delay_2 ~ 0.4s
1443+
assert delays[0] < delays[2], (
1444+
f"Backoff should increase: delays={delays}")
1445+
1446+
def test_cas_write_timeout_max_retries_exceeded(self):
1447+
"""CAS write timeout should RETHROW when max retries exceeded."""
1448+
policy = self._make_policy(max_retries=2)
1449+
retry, consistency, delay = policy.on_write_timeout(
1450+
query=None, consistency=ConsistencyLevel.QUORUM,
1451+
write_type=WriteType.CAS,
1452+
required_responses=3, received_responses=1, retry_num=2)
1453+
assert retry == RetryPolicy.RETHROW
1454+
1455+
def test_cas_write_timeout_preserves_consistency(self):
1456+
"""CAS retry should preserve the original consistency level."""
1457+
policy = self._make_policy()
1458+
for cl in [ConsistencyLevel.QUORUM, ConsistencyLevel.LOCAL_QUORUM,
1459+
ConsistencyLevel.ONE, ConsistencyLevel.ALL]:
1460+
retry, consistency, _ = policy.on_write_timeout(
1461+
query=None, consistency=cl,
1462+
write_type=WriteType.CAS,
1463+
required_responses=3, received_responses=1, retry_num=0)
1464+
assert retry == RetryPolicy.RETRY
1465+
assert consistency == cl, f"Expected {cl}, got {consistency}"
1466+
1467+
# --- Non-CAS write timeout: delegate to parent ---
1468+
1469+
def test_simple_write_timeout_rethrows(self):
1470+
"""SIMPLE write timeout should RETHROW (same as base policy)."""
1471+
policy = self._make_policy()
1472+
retry, consistency, delay = policy.on_write_timeout(
1473+
query=None, consistency=ConsistencyLevel.QUORUM,
1474+
write_type=WriteType.SIMPLE,
1475+
required_responses=3, received_responses=1, retry_num=0)
1476+
assert retry == RetryPolicy.RETHROW
1477+
1478+
def test_batch_log_write_timeout_retries(self):
1479+
"""BATCH_LOG write timeout should retry (inherited from base)."""
1480+
policy = self._make_policy()
1481+
retry, consistency, delay = policy.on_write_timeout(
1482+
query=None, consistency=ConsistencyLevel.QUORUM,
1483+
write_type=WriteType.BATCH_LOG,
1484+
required_responses=3, received_responses=1, retry_num=0)
1485+
assert retry == RetryPolicy.RETRY
1486+
assert consistency == ConsistencyLevel.QUORUM
1487+
1488+
def test_counter_write_timeout_rethrows(self):
1489+
"""COUNTER write timeout should RETHROW (same as base policy)."""
1490+
policy = self._make_policy()
1491+
retry, consistency, delay = policy.on_write_timeout(
1492+
query=None, consistency=ConsistencyLevel.QUORUM,
1493+
write_type=WriteType.COUNTER,
1494+
required_responses=3, received_responses=1, retry_num=0)
1495+
assert retry == RetryPolicy.RETHROW
1496+
1497+
# --- Serial (CAS) read timeout: retry on SAME host ---
1498+
1499+
def test_serial_read_timeout_retries_same_host(self):
1500+
"""Read timeout at SERIAL consistency should retry on SAME host."""
1501+
policy = self._make_policy()
1502+
retry, consistency, delay = policy.on_read_timeout(
1503+
query=None, consistency=ConsistencyLevel.SERIAL,
1504+
required_responses=3, received_responses=1,
1505+
data_retrieved=False, retry_num=0)
1506+
assert retry == RetryPolicy.RETRY
1507+
assert consistency == ConsistencyLevel.SERIAL
1508+
assert delay is not None and delay > 0
1509+
1510+
def test_local_serial_read_timeout_retries_same_host(self):
1511+
"""Read timeout at LOCAL_SERIAL should retry on SAME host."""
1512+
policy = self._make_policy()
1513+
retry, consistency, delay = policy.on_read_timeout(
1514+
query=None, consistency=ConsistencyLevel.LOCAL_SERIAL,
1515+
required_responses=3, received_responses=1,
1516+
data_retrieved=False, retry_num=0)
1517+
assert retry == RetryPolicy.RETRY
1518+
assert consistency == ConsistencyLevel.LOCAL_SERIAL
1519+
assert delay is not None and delay > 0
1520+
1521+
def test_serial_read_timeout_max_retries_exceeded(self):
1522+
"""Serial read timeout should RETHROW when max retries exceeded."""
1523+
policy = self._make_policy(max_retries=1)
1524+
retry, consistency, delay = policy.on_read_timeout(
1525+
query=None, consistency=ConsistencyLevel.SERIAL,
1526+
required_responses=3, received_responses=1,
1527+
data_retrieved=False, retry_num=1)
1528+
assert retry == RetryPolicy.RETHROW
1529+
1530+
# --- Non-serial read timeout: delegate to parent ---
1531+
1532+
def test_non_serial_read_timeout_delegates_to_parent(self):
1533+
"""Non-serial read timeout should use base policy behavior."""
1534+
policy = self._make_policy()
1535+
# Base: retry if enough responses but no data
1536+
retry, consistency, delay = policy.on_read_timeout(
1537+
query=None, consistency=ConsistencyLevel.QUORUM,
1538+
required_responses=2, received_responses=2,
1539+
data_retrieved=False, retry_num=0)
1540+
assert retry == RetryPolicy.RETRY
1541+
assert consistency == ConsistencyLevel.QUORUM
1542+
1543+
# Base: rethrow if we got data
1544+
retry, consistency, delay = policy.on_read_timeout(
1545+
query=None, consistency=ConsistencyLevel.QUORUM,
1546+
required_responses=2, received_responses=2,
1547+
data_retrieved=True, retry_num=0)
1548+
assert retry == RetryPolicy.RETHROW
1549+
1550+
# --- Serial unavailable: retry on NEXT host ---
1551+
1552+
def test_serial_unavailable_retries_next_host(self):
1553+
"""Unavailable at SERIAL should retry on NEXT host."""
1554+
policy = self._make_policy()
1555+
retry, consistency, delay = policy.on_unavailable(
1556+
query=None, consistency=ConsistencyLevel.SERIAL,
1557+
required_replicas=3, alive_replicas=1, retry_num=0)
1558+
assert retry == RetryPolicy.RETRY_NEXT_HOST
1559+
assert consistency == ConsistencyLevel.SERIAL
1560+
assert delay is not None and delay > 0
1561+
1562+
def test_local_serial_unavailable_retries_next_host(self):
1563+
"""Unavailable at LOCAL_SERIAL should retry on NEXT host."""
1564+
policy = self._make_policy()
1565+
retry, consistency, delay = policy.on_unavailable(
1566+
query=None, consistency=ConsistencyLevel.LOCAL_SERIAL,
1567+
required_replicas=3, alive_replicas=1, retry_num=0)
1568+
assert retry == RetryPolicy.RETRY_NEXT_HOST
1569+
assert consistency == ConsistencyLevel.LOCAL_SERIAL
1570+
assert delay is not None and delay > 0
1571+
1572+
def test_serial_unavailable_max_retries_exceeded(self):
1573+
"""Serial unavailable should RETHROW when max retries exceeded."""
1574+
policy = self._make_policy(max_retries=1)
1575+
retry, consistency, delay = policy.on_unavailable(
1576+
query=None, consistency=ConsistencyLevel.SERIAL,
1577+
required_replicas=3, alive_replicas=1, retry_num=1)
1578+
assert retry == RetryPolicy.RETHROW
1579+
1580+
# --- Non-serial unavailable: delegate to parent ---
1581+
1582+
def test_non_serial_unavailable_delegates_to_parent(self):
1583+
"""Non-serial unavailable should use base policy behavior."""
1584+
policy = self._make_policy()
1585+
# Base: RETRY_NEXT_HOST on first attempt
1586+
retry, consistency, delay = policy.on_unavailable(
1587+
query=None, consistency=ConsistencyLevel.QUORUM,
1588+
required_replicas=3, alive_replicas=1, retry_num=0)
1589+
assert retry == RetryPolicy.RETRY_NEXT_HOST
1590+
1591+
# --- on_request_error: inherited from parent ---
1592+
1593+
def test_request_error_retries_next_host(self):
1594+
"""Request errors should retry on next host (inherited behavior)."""
1595+
policy = self._make_policy()
1596+
retry, consistency, delay = policy.on_request_error(
1597+
query=None, consistency=ConsistencyLevel.QUORUM,
1598+
error=Exception("overloaded"), retry_num=0)
1599+
assert retry == RetryPolicy.RETRY_NEXT_HOST
1600+
1601+
def test_request_error_max_retries_exceeded(self):
1602+
"""Request errors should RETHROW when max retries exceeded."""
1603+
policy = self._make_policy(max_retries=1)
1604+
retry, consistency, delay = policy.on_request_error(
1605+
query=None, consistency=ConsistencyLevel.QUORUM,
1606+
error=Exception("overloaded"), retry_num=1)
1607+
assert retry == RetryPolicy.RETHROW
1608+
1609+
# --- Constructor defaults ---
1610+
1611+
def test_default_constructor(self):
1612+
"""LWTRetryPolicy should have sensible defaults."""
1613+
policy = LWTRetryPolicy()
1614+
assert policy.max_num_retries == 3
1615+
assert policy.min_interval == 0.1
1616+
assert policy.max_interval == 10.0
1617+
1618+
def test_custom_constructor(self):
1619+
"""LWTRetryPolicy should accept custom parameters."""
1620+
policy = LWTRetryPolicy(max_num_retries=5, min_interval=0.5, max_interval=30.0)
1621+
assert policy.max_num_retries == 5
1622+
assert policy.min_interval == 0.5
1623+
assert policy.max_interval == 30.0
1624+
1625+
def test_inherits_exponential_backoff(self):
1626+
"""LWTRetryPolicy should inherit from ExponentialBackoffRetryPolicy."""
1627+
policy = LWTRetryPolicy()
1628+
assert isinstance(policy, ExponentialBackoffRetryPolicy)
1629+
assert isinstance(policy, RetryPolicy)
1630+
1631+
# --- Verify 3-tuple return format for all methods ---
1632+
1633+
def test_all_methods_return_3_tuples(self):
1634+
"""All retry decisions should return 3-tuples (decision, cl, delay)."""
1635+
policy = self._make_policy()
1636+
1637+
# CAS write timeout
1638+
result = policy.on_write_timeout(
1639+
query=None, consistency=ConsistencyLevel.QUORUM,
1640+
write_type=WriteType.CAS,
1641+
required_responses=3, received_responses=1, retry_num=0)
1642+
assert len(result) == 3, f"Expected 3-tuple, got {result}"
1643+
1644+
# Serial read timeout
1645+
result = policy.on_read_timeout(
1646+
query=None, consistency=ConsistencyLevel.SERIAL,
1647+
required_responses=3, received_responses=1,
1648+
data_retrieved=False, retry_num=0)
1649+
assert len(result) == 3, f"Expected 3-tuple, got {result}"
1650+
1651+
# Serial unavailable
1652+
result = policy.on_unavailable(
1653+
query=None, consistency=ConsistencyLevel.SERIAL,
1654+
required_replicas=3, alive_replicas=1, retry_num=0)
1655+
assert len(result) == 3, f"Expected 3-tuple, got {result}"
1656+
1657+
# RETHROW cases
1658+
result = policy.on_write_timeout(
1659+
query=None, consistency=ConsistencyLevel.QUORUM,
1660+
write_type=WriteType.SIMPLE,
1661+
required_responses=3, received_responses=1, retry_num=0)
1662+
assert len(result) == 3, f"Expected 3-tuple, got {result}"
1663+
1664+
14111665
class WhiteListRoundRobinPolicyTest(unittest.TestCase):
14121666

14131667
def test_hosts_with_hostname(self):

0 commit comments

Comments
 (0)