Skip to content

Commit 3242df4

Browse files
committed
schema-agreement: avoid fallback after mismatch error
1 parent 3731dbb commit 3242df4

2 files changed

Lines changed: 39 additions & 3 deletions

File tree

cassandra/cluster.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ def _connection_reduce_fn(val,import_fn):
194194
_GRAPH_PAGING_MIN_DSE_VERSION = Version('6.8.0')
195195

196196
_NOT_SET = object()
197+
_SCHEMA_AGREEMENT_MISMATCHES_ATTR = '_schema_agreement_mismatches'
197198

198199

199200
class NoHostAvailable(Exception):
@@ -4337,6 +4338,7 @@ def _wait_for_schema_agreement(self, connection=None, preloaded_results=None, wa
43374338
if schema_mismatches is not None:
43384339
log.debug("[control connection] Error during schema agreement check after mismatch: %s",
43394340
exc)
4341+
setattr(exc, _SCHEMA_AGREEMENT_MISMATCHES_ATTR, schema_mismatches)
43404342
raise
43414343

43424344
fallback_wait = total_timeout - elapsed
@@ -4601,10 +4603,17 @@ def refresh_schema_and_set_result(control_conn, response_future, connection, **k
46014603
use_session_fallback = False
46024604
try:
46034605
response_future.is_schema_agreed = control_conn._refresh_schema(connection, **kwargs)
4604-
except Exception:
4606+
except Exception as exc:
46054607
log.exception("Exception refreshing schema in response to schema change:")
46064608
response_future.is_schema_agreed = False
4607-
use_session_fallback = True
4609+
schema_mismatches = getattr(exc, _SCHEMA_AGREEMENT_MISMATCHES_ATTR, _NOT_SET)
4610+
if schema_mismatches is not _NOT_SET:
4611+
log.debug("Skipping session schema agreement fallback after control connection "
4612+
"reported a schema disagreement: %s",
4613+
schema_mismatches)
4614+
response_future.session.submit(control_conn.refresh_schema, **kwargs)
4615+
else:
4616+
use_session_fallback = True
46084617

46094618
if use_session_fallback:
46104619
log.debug("Falling back to session schema agreement check")

tests/unit/test_control_connection.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
from cassandra import OperationTimedOut, SchemaTargetType, SchemaChangeType
2121
from cassandra.protocol import ResultMessage, RESULT_KIND_ROWS
22-
from cassandra.cluster import ControlConnection, _Scheduler, ProfileManager, EXEC_PROFILE_DEFAULT, ExecutionProfile
22+
from cassandra.cluster import (ControlConnection, _Scheduler, ProfileManager,
23+
EXEC_PROFILE_DEFAULT, ExecutionProfile,
24+
refresh_schema_and_set_result)
2325
from cassandra.pool import Host
2426
from cassandra.connection import (EndPoint, DefaultEndPoint, DefaultEndPointFactory,
2527
ConnectionException, ConnectionShutdown, ConnectionBusy)
@@ -352,6 +354,31 @@ def test_wait_for_schema_agreement_raises_connection_error_after_mismatch(self):
352354
with self.assertRaises(ConnectionShutdown):
353355
self.control_connection.wait_for_schema_agreement()
354356

357+
def test_schema_change_refresh_does_not_session_fallback_after_mismatch_then_connection_error(self):
358+
session = Mock(is_shutdown=False)
359+
session.wait_for_schema_agreement.return_value = True
360+
self.cluster.sessions = [session]
361+
self.cluster.metadata.refresh = Mock()
362+
363+
peer_columns = self.connection.peer_results[0]
364+
mismatching_peer_rows = [list(row) for row in self.connection.peer_results[1]]
365+
mismatching_peer_rows[1][2] = 'b'
366+
self.connection.wait_for_responses.side_effect = [
367+
_node_meta_results(self.connection.local_results, (peer_columns, mismatching_peer_rows)),
368+
ConnectionShutdown("closed")]
369+
370+
response_future = Mock()
371+
response_future.session = session
372+
event = {'target_type': SchemaTargetType.TABLE, 'change_type': SchemaChangeType.CREATED,
373+
'keyspace': "keyspace1", "table": "table1"}
374+
375+
refresh_schema_and_set_result(self.control_connection, response_future, self.connection, **event)
376+
377+
session.wait_for_schema_agreement.assert_not_called()
378+
self.cluster.metadata.refresh.assert_not_called()
379+
assert not response_future.is_schema_agreed
380+
response_future._set_final_result.assert_called_once_with(None)
381+
355382
def test_wait_for_schema_agreement_does_not_exceed_configured_wait_with_session_fallback(self):
356383
session = Mock(is_shutdown=False)
357384

0 commit comments

Comments
 (0)