Skip to content

Commit cea1f1a

Browse files
author
Kazantsev Maksim
committed
Add more edge cases
1 parent 53a1418 commit cea1f1a

2 files changed

Lines changed: 146 additions & 31 deletions

File tree

native/spark-expr/src/csv_funcs/to_csv.rs

Lines changed: 45 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ pub fn to_csv_inner(
150150
let mut builder = StringBuilder::with_capacity(array.len(), array.len() * 16);
151151
let mut csv_string = String::with_capacity(array.len() * 16);
152152

153-
let quote = write_options.quote.as_ref();
153+
let quote_char = write_options.quote.chars().next().unwrap_or('"');
154+
let escape_char = write_options.escape.chars().next().unwrap_or('\\');
154155
for row_idx in 0..array.len() {
155156
if array.is_null(row_idx) {
156157
builder.append_null();
@@ -160,34 +161,49 @@ pub fn to_csv_inner(
160161
if col_idx > 0 {
161162
csv_string.push_str(&write_options.delimiter);
162163
}
163-
let mut value = column.value(row_idx);
164-
let is_string_field = is_string[col_idx];
165-
if is_string_field {
166-
if write_options.ignore_leading_white_space {
167-
value = value.trim_start();
164+
if column.is_null(row_idx) {
165+
if write_options.quote_all {
166+
csv_string.push(quote_char);
168167
}
169-
if write_options.ignore_trailing_white_space {
170-
value = value.trim_end();
168+
csv_string.push_str(&write_options.null_value);
169+
if write_options.quote_all {
170+
csv_string.push(quote_char);
171171
}
172-
}
173-
let needs_quoting = write_options.quote_all
174-
|| (is_string_field
175-
&& !string_arrays[col_idx].is_null(row_idx)
176-
&& (value.contains(&write_options.delimiter)
177-
|| value.contains(quote)
178-
|| value.is_empty()));
179-
180-
let needs_escaping = is_string_field && needs_quoting;
181-
if needs_quoting {
182-
csv_string.push_str(quote);
183-
}
184-
if needs_escaping {
185-
escape_value(value, quote, &write_options.escape, &mut csv_string);
186172
} else {
187-
csv_string.push_str(value);
188-
}
189-
if needs_quoting {
190-
csv_string.push_str(quote);
173+
let mut value = column.value(row_idx);
174+
let is_string_field = is_string[col_idx];
175+
176+
if is_string_field {
177+
if write_options.ignore_leading_white_space {
178+
value = value.trim_start();
179+
}
180+
if write_options.ignore_trailing_white_space {
181+
value = value.trim_end();
182+
}
183+
}
184+
185+
let needs_quoting = write_options.quote_all
186+
|| (is_string_field
187+
&& (value.contains(&write_options.delimiter)
188+
|| value.contains(quote_char)
189+
|| value.contains('\n')
190+
|| value.contains('\r'))
191+
|| value.is_empty());
192+
193+
let needs_escaping = needs_quoting
194+
&& (value.contains(quote_char) || value.contains(escape_char));
195+
196+
if needs_quoting {
197+
csv_string.push(quote_char);
198+
}
199+
if needs_escaping {
200+
escape_value(value, quote_char, escape_char, &mut csv_string);
201+
} else {
202+
csv_string.push_str(value);
203+
}
204+
if needs_quoting {
205+
csv_string.push(quote_char);
206+
}
191207
}
192208
}
193209
builder.append_value(&csv_string);
@@ -197,11 +213,10 @@ pub fn to_csv_inner(
197213
}
198214

199215
#[inline]
200-
fn escape_value(value: &str, quote: &str, escape: &str, output: &mut String) {
216+
fn escape_value(value: &str, quote_char: char, escape_char: char, output: &mut String) {
201217
for ch in value.chars() {
202-
let ch_str = ch.to_string();
203-
if ch_str == quote || ch_str == escape {
204-
output.push_str(escape);
218+
if ch == quote_char || ch == escape_char {
219+
output.push(escape_char);
205220
}
206221
output.push(ch);
207222
}

spark/src/test/scala/org/apache/comet/CometCsvExpressionSuite.scala

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import org.apache.spark.sql.CometTestBase
2727
import org.apache.spark.sql.catalyst.expressions.StructsToCsv
2828
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
2929
import org.apache.spark.sql.functions._
30+
import org.apache.spark.sql.types.StringType
3031

3132
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
3233

@@ -71,15 +72,44 @@ class CometCsvExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper
7172
val table = "t1"
7273
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT) {
7374
withTable(table) {
75+
val newLinesStr =
76+
""" abc
77+
| bcde""".stripMargin
7478
sql(s"create table $table(col string) using parquet")
7579
sql(s"insert into $table values('')")
7680
sql(s"insert into $table values(cast(null as string))")
7781
sql(s"insert into $table values(' abc')")
7882
sql(s"insert into $table values('abc ')")
7983
sql(s"insert into $table values(' abc ')")
8084
sql(s"""insert into $table values('abc \"abc\"')""")
81-
val df = sql(s"select * from $table")
85+
sql(s"""insert into $table values('$newLinesStr')""")
86+
sql(s"""insert into $table values('abc,def')""")
87+
sql(s"""insert into $table values('abc;def;ghi')""")
88+
sql(s"""insert into $table values('abc\tdef')""")
89+
sql(s"""insert into $table values('a"b"c')""")
90+
sql(s"""insert into $table values('"quoted"')""")
91+
sql(s"""insert into $table values('line1\nline2')""")
92+
sql(s"""insert into $table values('line1\rline2')""")
93+
sql(s"""insert into $table values('line1\r\nline2')""")
94+
sql(s"""insert into $table values('a''b')""")
95+
sql(s"""insert into $table values('a\\\\b')""")
96+
97+
val df = sql(s"select * from $table order by col")
98+
99+
// Default options
82100
checkSparkAnswerAndOperator(df.select(to_csv(struct(col("col"), lit(1)))))
101+
102+
// Custom delimiter
103+
checkSparkAnswerAndOperator(
104+
df.select(to_csv(struct(col("col"), lit(1)), Map("delimiter" -> ";").asJava)))
105+
106+
checkSparkAnswerAndOperator(
107+
df.select(to_csv(struct(col("col"), lit(1)), Map("delimiter" -> "|").asJava)))
108+
109+
checkSparkAnswerAndOperator(
110+
df.select(to_csv(struct(col("col"), lit(1)), Map("delimiter" -> "\t").asJava)))
111+
112+
// Whitespace handling
83113
checkSparkAnswerAndOperator(
84114
df.select(
85115
to_csv(
@@ -88,8 +118,78 @@ class CometCsvExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper
88118
"delimiter" -> ";",
89119
"ignoreLeadingWhiteSpace" -> "false",
90120
"ignoreTrailingWhiteSpace" -> "false").asJava)))
121+
122+
checkSparkAnswerAndOperator(
123+
df.select(
124+
to_csv(
125+
struct(col("col"), lit(1)),
126+
Map(
127+
"ignoreLeadingWhiteSpace" -> "true",
128+
"ignoreTrailingWhiteSpace" -> "false").asJava)))
129+
130+
checkSparkAnswerAndOperator(
131+
df.select(
132+
to_csv(
133+
struct(col("col"), lit(1)),
134+
Map(
135+
"ignoreLeadingWhiteSpace" -> "false",
136+
"ignoreTrailingWhiteSpace" -> "true").asJava)))
137+
138+
checkSparkAnswerAndOperator(df.select(to_csv(
139+
struct(col("col"), lit(1)),
140+
Map("ignoreLeadingWhiteSpace" -> "true", "ignoreTrailingWhiteSpace" -> "true").asJava)))
141+
142+
// Escape character
143+
checkSparkAnswerAndOperator(
144+
df.select(to_csv(struct(col("col"), lit(1)), Map("escape" -> "\\").asJava)))
145+
146+
checkSparkAnswerAndOperator(
147+
df.select(to_csv(struct(col("col"), lit(1)), Map("escape" -> "/").asJava)))
148+
149+
// Quote options
91150
checkSparkAnswerAndOperator(
92151
df.select(to_csv(struct(col("col"), lit(1)), Map("quoteAll" -> "true").asJava)))
152+
153+
checkSparkAnswerAndOperator(
154+
df.select(to_csv(struct(col("col"), lit(1)), Map("quoteAll" -> "false").asJava)))
155+
156+
// Null value representation
157+
checkSparkAnswerAndOperator(
158+
df.select(to_csv(struct(col("col"), lit(1)), Map("nullValue" -> "NULL").asJava)))
159+
160+
checkSparkAnswerAndOperator(
161+
df.select(to_csv(struct(col("col"), lit(1)), Map("nullValue" -> "N/A").asJava)))
162+
163+
checkSparkAnswerAndOperator(
164+
df.select(to_csv(struct(col("col"), lit(1)), Map("nullValue" -> "").asJava)))
165+
166+
// Combined options
167+
checkSparkAnswerAndOperator(
168+
df.select(
169+
to_csv(
170+
struct(col("col"), lit(1)),
171+
Map(
172+
"delimiter" -> "|",
173+
"quoteAll" -> "false",
174+
"escape" -> "\\",
175+
"nullValue" -> "NULL").asJava)))
176+
177+
checkSparkAnswerAndOperator(
178+
df.select(to_csv(
179+
struct(col("col"), lit(1)),
180+
Map(
181+
"delimiter" -> ";",
182+
"quoteAll" -> "false",
183+
"ignoreLeadingWhiteSpace" -> "true",
184+
"ignoreTrailingWhiteSpace" -> "true",
185+
"nullValue" -> "N/A").asJava)))
186+
187+
// Edge cases with multiple columns
188+
checkSparkAnswerAndOperator(
189+
df.select(
190+
to_csv(
191+
struct(col("col"), lit(1), lit("test"), lit(null).cast(StringType)),
192+
Map("delimiter" -> ",", "quoteAll" -> "true").asJava)))
93193
}
94194
}
95195
}

0 commit comments

Comments
 (0)