Skip to content

Commit 90ae954

Browse files
committed
schema-agreement: avoid fallback after mismatch error
1 parent dfaa31c commit 90ae954

2 files changed

Lines changed: 39 additions & 3 deletions

File tree

cassandra/cluster.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ def _connection_reduce_fn(val,import_fn):
194194
_GRAPH_PAGING_MIN_DSE_VERSION = Version('6.8.0')
195195

196196
_NOT_SET = object()
197+
_SCHEMA_AGREEMENT_MISMATCHES_ATTR = '_schema_agreement_mismatches'
197198

198199

199200
class NoHostAvailable(Exception):
@@ -4255,6 +4256,7 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
42554256
if schema_mismatches is not None:
42564257
log.debug("[control connection] Error during schema agreement check after mismatch: %s",
42574258
exc)
4259+
setattr(exc, _SCHEMA_AGREEMENT_MISMATCHES_ATTR, schema_mismatches)
42584260
raise
42594261

42604262
fallback_wait = total_timeout - elapsed
@@ -4519,10 +4521,17 @@ def refresh_schema_and_set_result(control_conn, response_future, connection, **k
45194521
use_session_fallback = False
45204522
try:
45214523
response_future.is_schema_agreed = control_conn._refresh_schema(connection, **kwargs)
4522-
except Exception:
4524+
except Exception as exc:
45234525
log.exception("Exception refreshing schema in response to schema change:")
45244526
response_future.is_schema_agreed = False
4525-
use_session_fallback = True
4527+
schema_mismatches = getattr(exc, _SCHEMA_AGREEMENT_MISMATCHES_ATTR, _NOT_SET)
4528+
if schema_mismatches is not _NOT_SET:
4529+
log.debug("Skipping session schema agreement fallback after control connection "
4530+
"reported a schema disagreement: %s",
4531+
schema_mismatches)
4532+
response_future.session.submit(control_conn.refresh_schema, **kwargs)
4533+
else:
4534+
use_session_fallback = True
45264535

45274536
if use_session_fallback:
45284537
log.debug("Falling back to session schema agreement check")

tests/unit/test_control_connection.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
from cassandra import OperationTimedOut, SchemaTargetType, SchemaChangeType
2121
from cassandra.protocol import ResultMessage, RESULT_KIND_ROWS
22-
from cassandra.cluster import ControlConnection, _Scheduler, ProfileManager, EXEC_PROFILE_DEFAULT, ExecutionProfile
22+
from cassandra.cluster import (ControlConnection, _Scheduler, ProfileManager,
23+
EXEC_PROFILE_DEFAULT, ExecutionProfile,
24+
refresh_schema_and_set_result)
2325
from cassandra.pool import Host
2426
from cassandra.connection import (EndPoint, DefaultEndPoint, DefaultEndPointFactory,
2527
ConnectionException, ConnectionShutdown, ConnectionBusy)
@@ -343,6 +345,31 @@ def test_wait_for_schema_agreement_raises_connection_error_after_mismatch(self):
343345
with self.assertRaises(ConnectionShutdown):
344346
self.control_connection.wait_for_schema_agreement()
345347

348+
def test_schema_change_refresh_does_not_session_fallback_after_mismatch_then_connection_error(self):
349+
session = Mock(is_shutdown=False)
350+
session.wait_for_schema_agreement.return_value = True
351+
self.cluster.sessions = [session]
352+
self.cluster.metadata.refresh = Mock()
353+
354+
peer_columns = self.connection.peer_results[0]
355+
mismatching_peer_rows = [list(row) for row in self.connection.peer_results[1]]
356+
mismatching_peer_rows[1][2] = 'b'
357+
self.connection.wait_for_responses.side_effect = [
358+
_node_meta_results(self.connection.local_results, (peer_columns, mismatching_peer_rows)),
359+
ConnectionShutdown("closed")]
360+
361+
response_future = Mock()
362+
response_future.session = session
363+
event = {'target_type': SchemaTargetType.TABLE, 'change_type': SchemaChangeType.CREATED,
364+
'keyspace': "keyspace1", "table": "table1"}
365+
366+
refresh_schema_and_set_result(self.control_connection, response_future, self.connection, **event)
367+
368+
session.wait_for_schema_agreement.assert_not_called()
369+
self.cluster.metadata.refresh.assert_not_called()
370+
assert not response_future.is_schema_agreed
371+
response_future._set_final_result.assert_called_once_with(None)
372+
346373
def test_wait_for_schema_agreement_does_not_exceed_configured_wait_with_session_fallback(self):
347374
session = Mock(is_shutdown=False)
348375

0 commit comments

Comments
 (0)