1+ # Licensed to the Apache Software Foundation (ASF) under one
2+ # or more contributor license agreements. See the NOTICE file
3+ # distributed with this work for additional information
4+ # regarding copyright ownership. The ASF licenses this file
5+ # to you under the Apache License, Version 2.0 (the
6+ # "License"); you may not use this file except in compliance
7+ # with the License. You may obtain a copy of the License at
8+ #
9+ # http://www.apache.org/licenses/LICENSE-2.0
10+ #
11+ # Unless required by applicable law or agreed to in writing, software
12+ # distributed under the License is distributed on an "AS IS" BASIS,
13+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+ # See the License for the specific language governing permissions and
15+ # limitations under the License.
16+
17+ import unittest
18+ from collections import OrderedDict
19+
20+ from cqlshlib .displaying import NO_COLOR_MAP
21+ from cqlshlib .formatting import (
22+ format_value_text ,
23+ format_value_list ,
24+ format_value_set ,
25+ format_value_tuple ,
26+ format_value_map ,
27+ format_value_utype ,
28+ CqlType
29+ )
30+
31+
32+ class _MockUDT :
33+ """ Mimics the driver's UDT shape (exposes _asdict()) without the
34+ identifier restrictions Python's namedtuple imposes on field names. """
35+ def __init__ (self , items ):
36+ self ._items = items
37+
38+ def _asdict (self ):
39+ return OrderedDict (self ._items )
40+
41+
42+ class TestFormatting (unittest .TestCase ):
43+
44+ def setUp (self ):
45+ self .fmt_kwargs = {
46+ 'encoding' : 'utf-8' ,
47+ 'colormap' : NO_COLOR_MAP ,
48+ 'date_time_format' : None ,
49+ 'float_precision' : 3 ,
50+ 'nullval' : 'null' ,
51+ 'decimal_sep' : '.' ,
52+ 'thousands_sep' : ',' ,
53+ 'boolean_styles' : None
54+ }
55+
56+ def test_format_value_text_control_chars (self ):
57+ """
58+ Test that control chars are escaped for terminal display (default),
59+ but preserved when escape_control_chars=False is passed (for CSV export).
60+ """
61+ self .assertEqual (
62+ format_value_text ("Hello World" , encoding = 'utf-8' , colormap = NO_COLOR_MAP ),
63+ "Hello World"
64+ )
65+
66+ test_string = "Hello\n World\x00 \t Test\r "
67+
68+ terminal_output = format_value_text (test_string , encoding = 'utf-8' , colormap = NO_COLOR_MAP )
69+ self .assertEqual (terminal_output , "Hello\\ nWorld\\ x00\\ tTest\\ r" )
70+
71+ csv_output = format_value_text (test_string , encoding = 'utf-8' , colormap = NO_COLOR_MAP , escape_control_chars = False )
72+ self .assertEqual (csv_output , test_string )
73+
74+ def test_format_value_list_control_chars (self ):
75+ """ Test control character propagation in lists """
76+ list_val = ["line1\n line2" , "null\x00 byte" ]
77+ cql_type = CqlType ('list<text>' )
78+
79+ terminal_output = format_value_list (list_val , cqltype = cql_type , ** self .fmt_kwargs )
80+ self .assertEqual (terminal_output , "['line1\\ nline2', 'null\\ x00byte']" )
81+
82+ csv_output = format_value_list (list_val , cqltype = cql_type , escape_control_chars = False , ** self .fmt_kwargs )
83+ self .assertEqual (csv_output , "['line1\n line2', 'null\x00 byte']" )
84+
85+ def test_format_value_map_control_chars (self ):
86+ """ Test control character propagation in map keys and values """
87+ map_val = {"key\n 1" : "val\x00 1" }
88+ cql_type = CqlType ('map<text, text>' )
89+
90+ terminal_output = format_value_map (map_val , cqltype = cql_type , ** self .fmt_kwargs )
91+ self .assertEqual (terminal_output , "{'key\\ n1': 'val\\ x001'}" )
92+
93+ csv_output = format_value_map (map_val , cqltype = cql_type , escape_control_chars = False , ** self .fmt_kwargs )
94+ self .assertEqual (csv_output , "{'key\n 1': 'val\x00 1'}" )
95+
96+ def test_udt_field_name_and_value_control_chars (self ):
97+ """ Test control character propagation in UDT field names and values """
98+ # The driver exposes UDT instances via an _asdict() shape; namedtuple
99+ # cannot be used here because UDT field names may contain characters
100+ # (e.g. '\n') that are not valid Python identifiers.
101+ udt_val = _MockUDT ([('field_a\n ' , 'val\n 1' ), ('field_b' , 'val\x00 2' )])
102+
103+ cql_type = CqlType ('text' )
104+ cql_type .sub_types = [CqlType ('text' ), CqlType ('text' )]
105+
106+ terminal_output = format_value_utype (udt_val , cqltype = cql_type , ** self .fmt_kwargs )
107+ self .assertEqual (terminal_output , "{field_a\\ n: 'val\\ n1', field_b: 'val\\ x002'}" )
108+
109+ csv_output = format_value_utype (udt_val , cqltype = cql_type , escape_control_chars = False , ** self .fmt_kwargs )
110+ self .assertEqual (csv_output , "{field_a\n : 'val\n 1', field_b: 'val\x00 2'}" )
111+
112+ def test_format_value_text_empty_string (self ):
113+ """ Empty strings pass through cleanly in both modes (no spurious
114+ characters introduced by the regex sub or the escape pipeline). """
115+ self .assertEqual (
116+ format_value_text ("" , encoding = 'utf-8' , colormap = NO_COLOR_MAP ),
117+ ""
118+ )
119+ self .assertEqual (
120+ format_value_text ("" , encoding = 'utf-8' , colormap = NO_COLOR_MAP , escape_control_chars = False ),
121+ ""
122+ )
123+
124+ def test_format_value_text_latin1_and_del_control_chars (self ):
125+ """ UNICODE_CONTROLCHARS_RE matches [\\ x00-\\ x1f\\ x7f-\\ xa0]: in addition
126+ to the common C0 controls, DEL (\\ x7f), C1 controls (e.g. \\ x80) and
127+ NBSP (\\ xa0) must also be escaped on terminals and preserved for CSV. """
128+ test_string = "del\x7f mid\x80 end\xa0 nbsp"
129+
130+ terminal_output = format_value_text (test_string , encoding = 'utf-8' , colormap = NO_COLOR_MAP )
131+ self .assertEqual (terminal_output , "del\\ x7fmid\\ x80end\\ xa0nbsp" )
132+
133+ csv_output = format_value_text (test_string , encoding = 'utf-8' , colormap = NO_COLOR_MAP ,
134+ escape_control_chars = False )
135+ self .assertEqual (csv_output , test_string )
136+
137+ def test_format_value_text_consecutive_control_chars (self ):
138+ """ A run of adjacent control chars must be escaped/preserved
139+ character-by-character, not collapsed. """
140+ test_string = "a\n \n \x00 \x00 b"
141+
142+ terminal_output = format_value_text (test_string , encoding = 'utf-8' , colormap = NO_COLOR_MAP )
143+ self .assertEqual (terminal_output , "a\\ n\\ n\\ x00\\ x00b" )
144+
145+ csv_output = format_value_text (test_string , encoding = 'utf-8' , colormap = NO_COLOR_MAP ,
146+ escape_control_chars = False )
147+ self .assertEqual (csv_output , test_string )
148+
149+ def test_format_value_tuple_control_chars (self ):
150+ """ format_value_tuple delegates to format_simple_collection; verify
151+ the flag propagates to its element formatters. """
152+ tuple_val = ("a\n " , "b\x00 " )
153+ cql_type = CqlType ('tuple<text, text>' )
154+
155+ terminal_output = format_value_tuple (tuple_val , cqltype = cql_type , ** self .fmt_kwargs )
156+ self .assertEqual (terminal_output , "('a\\ n', 'b\\ x00')" )
157+
158+ csv_output = format_value_tuple (tuple_val , cqltype = cql_type , escape_control_chars = False ,
159+ ** self .fmt_kwargs )
160+ self .assertEqual (csv_output , "('a\n ', 'b\x00 ')" )
161+
162+ def test_format_value_set_control_chars (self ):
163+ """ format_value_set delegates to format_simple_collection. A list is
164+ passed here because format_simple_collection just iterates val and
165+ CPython set iteration order depends on PYTHONHASHSEED. """
166+ set_val = ["a\n " , "b\x00 " ]
167+ cql_type = CqlType ('set<text>' )
168+
169+ terminal_output = format_value_set (set_val , cqltype = cql_type , ** self .fmt_kwargs )
170+ self .assertEqual (terminal_output , "{'a\\ n', 'b\\ x00'}" )
171+
172+ csv_output = format_value_set (set_val , cqltype = cql_type , escape_control_chars = False ,
173+ ** self .fmt_kwargs )
174+ self .assertEqual (csv_output , "{'a\n ', 'b\x00 '}" )
175+
176+ def test_nested_map_of_list_control_chars (self ):
177+ """ Two-level nesting (map<text, list<text>>): the flag must propagate
178+ through the outer map's subformat() into the inner list's element
179+ formatters as well. Guards against regressions where the flag is
180+ forwarded at one level but dropped at the next. """
181+ nested_val = {"key\n 1" : ["v\x00 1" , "v\n 2" ]}
182+ cql_type = CqlType ('map<text, list<text>>' )
183+
184+ terminal_output = format_value_map (nested_val , cqltype = cql_type , ** self .fmt_kwargs )
185+ self .assertEqual (terminal_output , "{'key\\ n1': ['v\\ x001', 'v\\ n2']}" )
186+
187+ csv_output = format_value_map (nested_val , cqltype = cql_type , escape_control_chars = False ,
188+ ** self .fmt_kwargs )
189+ self .assertEqual (csv_output , "{'key\n 1': ['v\x00 1', 'v\n 2']}" )
0 commit comments