|
23 | 23 | from cassandra.connection import Connection, ConnectionException |
24 | 24 | from cassandra.protocol import (ReadTimeoutErrorMessage, WriteTimeoutErrorMessage, |
25 | 25 | UnavailableErrorMessage, ResultMessage, QueryMessage, |
| 26 | + ExecuteMessage, |
26 | 27 | OverloadedErrorMessage, IsBootstrappingErrorMessage, |
27 | 28 | PreparedQueryNotFound, PrepareMessage, ServerError, |
28 | 29 | RESULT_KIND_ROWS, RESULT_KIND_SET_KEYSPACE, |
@@ -723,3 +724,253 @@ def test_single_host_query_plan_exhausted_after_one_retry(self): |
723 | 724 | # Instead, it should set a NoHostAvailable exception |
724 | 725 | assert rf._final_exception is not None |
725 | 726 | assert isinstance(rf._final_exception, NoHostAvailable) |
| 727 | + |
| 728 | + # ------------------------------------------------------------------------- |
| 729 | + # Helpers for SCYLLA_USE_METADATA_ID tests |
| 730 | + # ------------------------------------------------------------------------- |
| 731 | + |
| 732 | + def _make_rows_response(self, result_metadata_id=None, column_metadata=None): |
| 733 | + """ |
| 734 | + Return a real ResultMessage(kind=RESULT_KIND_ROWS) with all attributes |
| 735 | + that _set_result accesses pre-set, so it passes isinstance checks and |
| 736 | + doesn't trigger unexpected code paths. |
| 737 | + """ |
| 738 | + response = ResultMessage(kind=RESULT_KIND_ROWS) |
| 739 | + response.paging_state = None |
| 740 | + response.column_names = ['col'] |
| 741 | + response.parsed_rows = [] |
| 742 | + response.column_types = [] |
| 743 | + response.column_metadata = column_metadata |
| 744 | + response.result_metadata_id = result_metadata_id |
| 745 | + response.trace_id = None |
| 746 | + response.warnings = None |
| 747 | + response.custom_payload = None |
| 748 | + return response |
| 749 | + |
| 750 | + def _make_execute_response_future(self, session, connection, prepared_statement): |
| 751 | + """ |
| 752 | + Return a ResponseFuture whose message is an ExecuteMessage and which |
| 753 | + has a prepared_statement set so that _query()'s feature-gating logic |
| 754 | + is exercised. |
| 755 | + """ |
| 756 | + execute_msg = ExecuteMessage(b'qid', [], ConsistencyLevel.ONE) |
| 757 | + query = SimpleStatement("SELECT * FROM foo") |
| 758 | + rf = ResponseFuture( |
| 759 | + session, execute_msg, query, timeout=1, |
| 760 | + prepared_statement=prepared_statement, |
| 761 | + ) |
| 762 | + pool = session._pools.get.return_value |
| 763 | + pool.borrow_connection.return_value = (connection, 1) |
| 764 | + return rf |
| 765 | + |
| 766 | + # ------------------------------------------------------------------------- |
| 767 | + # _set_result: METADATA_CHANGED update path |
| 768 | + # ------------------------------------------------------------------------- |
| 769 | + |
| 770 | + def test_set_result_updates_metadata_when_metadata_changed(self): |
| 771 | + """ |
| 772 | + When the EXECUTE response carries a new result_metadata_id (server |
| 773 | + detected a schema change), _set_result must update both |
| 774 | + prepared_statement.result_metadata and prepared_statement.result_metadata_id. |
| 775 | + """ |
| 776 | + session = self.make_session() |
| 777 | + pool = session._pools.get.return_value |
| 778 | + connection = Mock(spec=Connection) |
| 779 | + connection.protocol_version = 4 |
| 780 | + connection.features = Mock() |
| 781 | + connection.features.use_metadata_id = False |
| 782 | + pool.borrow_connection.return_value = (connection, 1) |
| 783 | + |
| 784 | + old_meta = [('ks', 'tb', 'old_col', Mock())] |
| 785 | + new_meta = [('ks', 'tb', 'new_col', Mock())] |
| 786 | + ps = Mock() |
| 787 | + ps.result_metadata = old_meta |
| 788 | + ps.result_metadata_id = b'old_id' |
| 789 | + |
| 790 | + rf = self.make_response_future(session) |
| 791 | + rf.prepared_statement = ps |
| 792 | + rf.send_request() |
| 793 | + |
| 794 | + response = self._make_rows_response( |
| 795 | + result_metadata_id=b'new_id', |
| 796 | + column_metadata=new_meta, |
| 797 | + ) |
| 798 | + rf._set_result(None, None, None, response) |
| 799 | + |
| 800 | + assert ps.result_metadata is new_meta |
| 801 | + assert ps.result_metadata_id == b'new_id' |
| 802 | + |
| 803 | + def test_set_result_does_not_update_metadata_when_metadata_id_absent(self): |
| 804 | + """ |
| 805 | + When the EXECUTE response has no result_metadata_id (normal skip-meta |
| 806 | + path — server metadata unchanged), _set_result must leave the |
| 807 | + prepared_statement's cached metadata untouched. |
| 808 | + """ |
| 809 | + session = self.make_session() |
| 810 | + pool = session._pools.get.return_value |
| 811 | + connection = Mock(spec=Connection) |
| 812 | + connection.protocol_version = 4 |
| 813 | + connection.features = Mock() |
| 814 | + connection.features.use_metadata_id = False |
| 815 | + pool.borrow_connection.return_value = (connection, 1) |
| 816 | + |
| 817 | + old_meta = [('ks', 'tb', 'col', Mock())] |
| 818 | + ps = Mock() |
| 819 | + ps.result_metadata = old_meta |
| 820 | + ps.result_metadata_id = b'old_id' |
| 821 | + |
| 822 | + rf = self.make_response_future(session) |
| 823 | + rf.prepared_statement = ps |
| 824 | + rf.send_request() |
| 825 | + |
| 826 | + # result_metadata_id is None → server sent full metadata, no hash update |
| 827 | + response = self._make_rows_response( |
| 828 | + result_metadata_id=None, |
| 829 | + column_metadata=old_meta, |
| 830 | + ) |
| 831 | + rf._set_result(None, None, None, response) |
| 832 | + |
| 833 | + assert ps.result_metadata is old_meta |
| 834 | + assert ps.result_metadata_id == b'old_id' |
| 835 | + |
| 836 | + def test_set_result_warns_when_metadata_id_but_no_column_metadata(self): |
| 837 | + """ |
| 838 | + If the server sends a new result_metadata_id but no column metadata |
| 839 | + (protocol violation), _set_result must still update result_metadata_id |
| 840 | + and emit a WARNING log so the inconsistency is visible in logs. |
| 841 | + """ |
| 842 | + session = self.make_session() |
| 843 | + pool = session._pools.get.return_value |
| 844 | + connection = Mock(spec=Connection) |
| 845 | + connection.protocol_version = 4 |
| 846 | + connection.features = Mock() |
| 847 | + connection.features.use_metadata_id = False |
| 848 | + pool.borrow_connection.return_value = (connection, 1) |
| 849 | + |
| 850 | + ps = Mock() |
| 851 | + ps.result_metadata = [('ks', 'tb', 'col', Mock())] |
| 852 | + ps.result_metadata_id = b'old_id' |
| 853 | + |
| 854 | + rf = self.make_response_future(session) |
| 855 | + rf.prepared_statement = ps |
| 856 | + rf.send_request() |
| 857 | + |
| 858 | + # column_metadata is falsy (empty list) but result_metadata_id is set |
| 859 | + response = self._make_rows_response( |
| 860 | + result_metadata_id=b'new_id', |
| 861 | + column_metadata=[], |
| 862 | + ) |
| 863 | + |
| 864 | + with self.assertLogs('cassandra.cluster', level='WARNING') as log_ctx: |
| 865 | + rf._set_result(None, None, None, response) |
| 866 | + |
| 867 | + assert any('result_metadata_id' in msg for msg in log_ctx.output) |
| 868 | + # metadata_id is still updated even without column metadata |
| 869 | + assert ps.result_metadata_id == b'new_id' |
| 870 | + |
| 871 | + # ------------------------------------------------------------------------- |
| 872 | + # _query: per-connection feature gating for skip_meta / result_metadata_id |
| 873 | + # ------------------------------------------------------------------------- |
| 874 | + |
| 875 | + def test_query_sets_skip_meta_with_scylla_extension(self): |
| 876 | + """ |
| 877 | + When the borrowed connection has negotiated SCYLLA_USE_METADATA_ID and |
| 878 | + the prepared statement carries a result_metadata_id, _query() must set |
| 879 | + skip_meta=True and attach the metadata_id to the ExecuteMessage. |
| 880 | + """ |
| 881 | + session = self.make_basic_session() |
| 882 | + session.cluster._default_load_balancing_policy.make_query_plan.return_value = ['ip1'] |
| 883 | + session._pools.get.return_value = self.make_pool() |
| 884 | + |
| 885 | + connection = Mock(spec=Connection) |
| 886 | + connection.protocol_version = 4 |
| 887 | + connection.features = Mock() |
| 888 | + connection.features.use_metadata_id = True |
| 889 | + session._pools.get.return_value.borrow_connection.return_value = (connection, 1) |
| 890 | + |
| 891 | + ps = Mock() |
| 892 | + ps.result_metadata = [] |
| 893 | + ps.result_metadata_id = b'meta_hash' |
| 894 | + |
| 895 | + rf = self._make_execute_response_future(session, connection, ps) |
| 896 | + rf.send_request() |
| 897 | + |
| 898 | + assert rf.message.skip_meta is True |
| 899 | + assert rf.message.result_metadata_id == b'meta_hash' |
| 900 | + |
| 901 | + def test_query_no_skip_meta_without_extension(self): |
| 902 | + """ |
| 903 | + When the connection does NOT have SCYLLA_USE_METADATA_ID (and protocol |
| 904 | + is v4), _query() must leave skip_meta=False and result_metadata_id=None. |
| 905 | + """ |
| 906 | + session = self.make_basic_session() |
| 907 | + session.cluster._default_load_balancing_policy.make_query_plan.return_value = ['ip1'] |
| 908 | + session._pools.get.return_value = self.make_pool() |
| 909 | + |
| 910 | + connection = Mock(spec=Connection) |
| 911 | + connection.protocol_version = 4 |
| 912 | + connection.features = Mock() |
| 913 | + connection.features.use_metadata_id = False |
| 914 | + session._pools.get.return_value.borrow_connection.return_value = (connection, 1) |
| 915 | + |
| 916 | + ps = Mock() |
| 917 | + ps.result_metadata = [] |
| 918 | + ps.result_metadata_id = b'meta_hash' |
| 919 | + |
| 920 | + rf = self._make_execute_response_future(session, connection, ps) |
| 921 | + rf.send_request() |
| 922 | + |
| 923 | + assert rf.message.skip_meta is False |
| 924 | + assert rf.message.result_metadata_id is None |
| 925 | + |
| 926 | + def test_query_no_skip_meta_when_prepared_statement_has_no_metadata_id(self): |
| 927 | + """ |
| 928 | + Even if the connection supports SCYLLA_USE_METADATA_ID, if the prepared |
| 929 | + statement was created before the extension was active (result_metadata_id |
| 930 | + is None), _query() must NOT set skip_meta — the driver has no hash to send. |
| 931 | + """ |
| 932 | + session = self.make_basic_session() |
| 933 | + session.cluster._default_load_balancing_policy.make_query_plan.return_value = ['ip1'] |
| 934 | + session._pools.get.return_value = self.make_pool() |
| 935 | + |
| 936 | + connection = Mock(spec=Connection) |
| 937 | + connection.protocol_version = 4 |
| 938 | + connection.features = Mock() |
| 939 | + connection.features.use_metadata_id = True # extension active on this connection |
| 940 | + session._pools.get.return_value.borrow_connection.return_value = (connection, 1) |
| 941 | + |
| 942 | + ps = Mock() |
| 943 | + ps.result_metadata = [] |
| 944 | + ps.result_metadata_id = None # statement prepared without extension |
| 945 | + |
| 946 | + rf = self._make_execute_response_future(session, connection, ps) |
| 947 | + rf.send_request() |
| 948 | + |
| 949 | + assert rf.message.skip_meta is False |
| 950 | + assert rf.message.result_metadata_id is None |
| 951 | + |
| 952 | + def test_query_sets_skip_meta_for_protocol_v5(self): |
| 953 | + """ |
| 954 | + On a protocol-v5 connection (ProtocolVersion.uses_prepared_metadata), |
| 955 | + _query() must set skip_meta=True and send result_metadata_id even if |
| 956 | + the Scylla-specific use_metadata_id flag is False. |
| 957 | + """ |
| 958 | + session = self.make_basic_session() |
| 959 | + session.cluster._default_load_balancing_policy.make_query_plan.return_value = ['ip1'] |
| 960 | + session._pools.get.return_value = self.make_pool() |
| 961 | + |
| 962 | + connection = Mock(spec=Connection) |
| 963 | + connection.protocol_version = 5 # CQL v5 — uses_prepared_metadata() is True |
| 964 | + connection.features = Mock() |
| 965 | + connection.features.use_metadata_id = False # Scylla extension not needed |
| 966 | + session._pools.get.return_value.borrow_connection.return_value = (connection, 1) |
| 967 | + |
| 968 | + ps = Mock() |
| 969 | + ps.result_metadata = [] |
| 970 | + ps.result_metadata_id = b'v5_hash' |
| 971 | + |
| 972 | + rf = self._make_execute_response_future(session, connection, ps) |
| 973 | + rf.send_request() |
| 974 | + |
| 975 | + assert rf.message.skip_meta is True |
| 976 | + assert rf.message.result_metadata_id == b'v5_hash' |
0 commit comments