Skip to content

Commit e8b3ee6

Browse files
committed
Fix SimpleStatement.is_lwt(): detect LWT from CQL query string
SimpleStatement.is_lwt() always returned False, which meant LWT-aware routing (TokenAwarePolicy Paxos leader routing) and retry policies never activated for: - Raw CQL queries via session.execute(SimpleStatement(...)) - All cqlengine queries (which wrap everything in SimpleStatement) This adds regex-based LWT detection to SimpleStatement that matches: - INSERT ... IF NOT EXISTS - UPDATE/DELETE ... IF EXISTS - UPDATE/DELETE ... IF <column> <op> <value> (conditional) - UPDATE/DELETE ... IF "column" <op> <value> (quoted identifiers) - BEGIN BATCH ... IF NOT EXISTS ... APPLY BATCH DDL statements (CREATE/ALTER/DROP ... IF [NOT] EXISTS) are correctly excluded by checking that the first word is a DML verb. The result is cached after the first call. This is a best-effort heuristic; PreparedStatement continues to get the authoritative is_lwt flag from the server during PREPARE. Also updates the existing BatchStatement LWT test to use the new SimpleStatement.is_lwt() directly instead of a workaround subclass.
1 parent e2a9511 commit e8b3ee6

2 files changed

Lines changed: 212 additions & 9 deletions

File tree

cassandra/query.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,24 @@
2323
import re
2424
import struct
2525
import time
26+
27+
# Regex to detect LWT (Lightweight Transaction) queries in CQL strings.
28+
# Matches: INSERT ... IF NOT EXISTS, UPDATE/DELETE ... IF EXISTS,
29+
# and conditional updates/deletes (e.g. UPDATE ... IF col = ...,
30+
# UPDATE ... IF "col" = ...).
31+
# Uses word boundaries and case-insensitive matching.
32+
# This is a best-effort heuristic for SimpleStatement; PreparedStatement
33+
# gets the authoritative is_lwt flag from the server.
34+
_LWT_PATTERN = re.compile(
35+
r'\bIF\s+(?:NOT\s+)?EXISTS\b' # IF [NOT] EXISTS
36+
r'|\bIF\s+[a-zA-Z_"]', # IF <column_name> or IF "<column_name>" (conditional)
37+
re.IGNORECASE
38+
)
39+
40+
# DML verbs that can be LWT queries. Only these statement types can contain
41+
# Paxos/LWT clauses. DDL statements (CREATE/ALTER/DROP) also use IF [NOT] EXISTS
42+
# but those are not LWT operations.
43+
_LWT_DML_VERBS = frozenset({'INSERT', 'UPDATE', 'DELETE', 'BEGIN'})
2644
import warnings
2745

2846
from cassandra import ConsistencyLevel, OperationTimedOut
@@ -416,6 +434,32 @@ def __str__(self):
416434
(self.query_string, consistency))
417435
__repr__ = __str__
418436

437+
def is_lwt(self):
438+
"""
439+
Detect whether this query is a Lightweight Transaction (LWT) by
440+
inspecting the query string for ``IF [NOT] EXISTS`` or ``IF <condition>``
441+
clauses in DML statements (INSERT, UPDATE, DELETE, BEGIN BATCH).
442+
443+
DDL statements like ``CREATE TABLE IF NOT EXISTS`` are excluded.
444+
445+
This is a best-effort heuristic. For authoritative LWT detection,
446+
use :class:`.PreparedStatement` which gets the ``is_lwt`` flag from
447+
the server during PREPARE.
448+
449+
The result is cached after the first call.
450+
"""
451+
try:
452+
return self._cached_is_lwt
453+
except AttributeError:
454+
# Quick check: only DML statements can be LWT
455+
query = self._query_string.lstrip()
456+
first_word = query.split(None, 1)[0].upper() if query else ''
457+
if first_word not in _LWT_DML_VERBS:
458+
self._cached_is_lwt = False
459+
else:
460+
self._cached_is_lwt = bool(_LWT_PATTERN.search(query))
461+
return self._cached_is_lwt
462+
419463

420464
class PreparedStatement(object):
421465
"""

tests/unit/test_query.py

Lines changed: 168 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -103,15 +103,174 @@ def test_is_lwt_propagates_from_statements(self):
103103
batch_with_bound.add(bound_lwt)
104104
assert batch_with_bound.is_lwt() is True
105105

106-
class LwtSimpleStatement(SimpleStatement):
107-
def __init__(self):
108-
super(LwtSimpleStatement, self).__init__(
109-
"INSERT INTO test.table (id) VALUES (2) IF NOT EXISTS"
110-
)
111-
112-
def is_lwt(self):
113-
return True
106+
# SimpleStatement now detects LWT from query string (no subclass needed)
107+
lwt_simple = SimpleStatement(
108+
"INSERT INTO test.table (id) VALUES (2) IF NOT EXISTS"
109+
)
110+
assert lwt_simple.is_lwt() is True
114111

115112
batch_with_simple = BatchStatement()
116-
batch_with_simple.add(LwtSimpleStatement())
113+
batch_with_simple.add(lwt_simple)
117114
assert batch_with_simple.is_lwt() is True
115+
116+
class SimpleStatementIsLwtTest(unittest.TestCase):
117+
"""Tests for SimpleStatement.is_lwt() CQL-based LWT detection."""
118+
119+
# --- INSERT IF NOT EXISTS ---
120+
121+
def test_insert_if_not_exists(self):
122+
s = SimpleStatement("INSERT INTO ks.t (a) VALUES (1) IF NOT EXISTS")
123+
assert s.is_lwt() is True
124+
125+
def test_insert_if_not_exists_lowercase(self):
126+
s = SimpleStatement("insert into ks.t (a) values (1) if not exists")
127+
assert s.is_lwt() is True
128+
129+
def test_insert_if_not_exists_mixed_case(self):
130+
s = SimpleStatement("INSERT INTO ks.t (a) VALUES (1) If Not Exists")
131+
assert s.is_lwt() is True
132+
133+
# --- UPDATE IF EXISTS ---
134+
135+
def test_update_if_exists(self):
136+
s = SimpleStatement("UPDATE ks.t SET a=1 WHERE k=1 IF EXISTS")
137+
assert s.is_lwt() is True
138+
139+
# --- DELETE IF EXISTS ---
140+
141+
def test_delete_if_exists(self):
142+
s = SimpleStatement("DELETE FROM ks.t WHERE k=1 IF EXISTS")
143+
assert s.is_lwt() is True
144+
145+
# --- Conditional UPDATE (IF <column> = <value>) ---
146+
147+
def test_conditional_update_equals(self):
148+
s = SimpleStatement("UPDATE ks.t SET a=1 WHERE k=1 IF a = 2")
149+
assert s.is_lwt() is True
150+
151+
def test_conditional_update_not_equals(self):
152+
s = SimpleStatement("UPDATE ks.t SET a=1 WHERE k=1 IF a != 2")
153+
assert s.is_lwt() is True
154+
155+
def test_conditional_update_greater_than(self):
156+
s = SimpleStatement("UPDATE ks.t SET a=1 WHERE k=1 IF a > 2")
157+
assert s.is_lwt() is True
158+
159+
def test_conditional_update_multiple_conditions(self):
160+
s = SimpleStatement(
161+
"UPDATE ks.t SET a=1 WHERE k=1 IF a = 2 AND b = 3")
162+
assert s.is_lwt() is True
163+
164+
# --- Conditional DELETE ---
165+
166+
def test_conditional_delete(self):
167+
s = SimpleStatement("DELETE FROM ks.t WHERE k=1 IF a = 2")
168+
assert s.is_lwt() is True
169+
170+
# --- Non-LWT queries (should return False) ---
171+
172+
def test_select_not_lwt(self):
173+
s = SimpleStatement("SELECT * FROM ks.t WHERE k=1")
174+
assert s.is_lwt() is False
175+
176+
def test_insert_without_if(self):
177+
s = SimpleStatement("INSERT INTO ks.t (a) VALUES (1)")
178+
assert s.is_lwt() is False
179+
180+
def test_update_without_if(self):
181+
s = SimpleStatement("UPDATE ks.t SET a=1 WHERE k=1")
182+
assert s.is_lwt() is False
183+
184+
def test_delete_without_if(self):
185+
s = SimpleStatement("DELETE FROM ks.t WHERE k=1")
186+
assert s.is_lwt() is False
187+
188+
def test_create_table_with_if_not_exists(self):
189+
"""DDL IF NOT EXISTS is correctly excluded — only DML can be LWT."""
190+
s = SimpleStatement("CREATE TABLE IF NOT EXISTS ks.t (a int PRIMARY KEY)")
191+
assert s.is_lwt() is False
192+
193+
def test_create_index_if_not_exists(self):
194+
s = SimpleStatement("CREATE INDEX IF NOT EXISTS idx ON ks.t (a)")
195+
assert s.is_lwt() is False
196+
197+
def test_create_keyspace_if_not_exists(self):
198+
s = SimpleStatement(
199+
"CREATE KEYSPACE IF NOT EXISTS ks WITH replication = "
200+
"{'class': 'SimpleStrategy', 'replication_factor': 1}")
201+
assert s.is_lwt() is False
202+
203+
def test_drop_table_if_exists(self):
204+
s = SimpleStatement("DROP TABLE IF EXISTS ks.t")
205+
assert s.is_lwt() is False
206+
207+
def test_alter_table_not_lwt(self):
208+
s = SimpleStatement("ALTER TABLE ks.t ADD col int")
209+
assert s.is_lwt() is False
210+
211+
# --- Caching ---
212+
213+
def test_result_is_cached(self):
214+
s = SimpleStatement("INSERT INTO ks.t (a) VALUES (1) IF NOT EXISTS")
215+
assert s.is_lwt() is True
216+
assert s.is_lwt() is True # should use cache
217+
assert s._cached_is_lwt is True
218+
219+
def test_non_lwt_result_is_cached(self):
220+
s = SimpleStatement("SELECT * FROM ks.t")
221+
assert s.is_lwt() is False
222+
assert s._cached_is_lwt is False
223+
224+
# --- Edge cases ---
225+
226+
def test_multiline_query(self):
227+
s = SimpleStatement("""
228+
INSERT INTO ks.t (a, b)
229+
VALUES (1, 2)
230+
IF NOT EXISTS
231+
""")
232+
assert s.is_lwt() is True
233+
234+
def test_extra_whitespace(self):
235+
s = SimpleStatement("UPDATE ks.t SET a=1 WHERE k=1 IF EXISTS")
236+
assert s.is_lwt() is True
237+
238+
def test_tab_separated(self):
239+
s = SimpleStatement("DELETE FROM ks.t WHERE k=1\tIF\tEXISTS")
240+
assert s.is_lwt() is True
241+
242+
# --- Quoted identifiers ---
243+
244+
def test_conditional_with_quoted_identifier(self):
245+
s = SimpleStatement('UPDATE ks.t SET a=1 WHERE k=1 IF "my_col" = 2')
246+
assert s.is_lwt() is True
247+
248+
def test_conditional_delete_quoted_identifier(self):
249+
s = SimpleStatement('DELETE FROM ks.t WHERE k=1 IF "Col" = 2')
250+
assert s.is_lwt() is True
251+
252+
# --- BEGIN BATCH ---
253+
254+
def test_begin_batch_with_lwt(self):
255+
s = SimpleStatement(
256+
"BEGIN BATCH "
257+
"INSERT INTO ks.t (a) VALUES (1) IF NOT EXISTS "
258+
"APPLY BATCH")
259+
assert s.is_lwt() is True
260+
261+
def test_begin_batch_without_lwt(self):
262+
s = SimpleStatement(
263+
"BEGIN BATCH "
264+
"INSERT INTO ks.t (a) VALUES (1) "
265+
"APPLY BATCH")
266+
assert s.is_lwt() is False
267+
268+
# --- Leading whitespace ---
269+
270+
def test_leading_whitespace(self):
271+
s = SimpleStatement(" \n INSERT INTO ks.t (a) VALUES (1) IF NOT EXISTS")
272+
assert s.is_lwt() is True
273+
274+
def test_leading_whitespace_ddl(self):
275+
s = SimpleStatement(" \n CREATE TABLE IF NOT EXISTS ks.t (a int PRIMARY KEY)")
276+
assert s.is_lwt() is False

0 commit comments

Comments
 (0)