Skip to content

Commit f538424

Browse files
Experimental: Native CSV files read (apache#3044)
1 parent c6c3002 commit f538424

20 files changed

Lines changed: 539 additions & 61 deletions

File tree

.github/workflows/pr_build_linux.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,9 @@ jobs:
188188
org.apache.spark.sql.comet.ParquetEncryptionITCase
189189
org.apache.comet.exec.CometNativeReaderSuite
190190
org.apache.comet.CometIcebergNativeSuite
191+
- name: "csv"
192+
value: |
193+
org.apache.comet.csv.CometCsvNativeReadSuite
191194
- name: "exec"
192195
value: |
193196
org.apache.comet.exec.CometAggregateSuite

.github/workflows/pr_build_macos.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,9 @@ jobs:
138138
org.apache.spark.sql.comet.ParquetEncryptionITCase
139139
org.apache.comet.exec.CometNativeReaderSuite
140140
org.apache.comet.CometIcebergNativeSuite
141+
- name: "csv"
142+
value: |
143+
org.apache.comet.csv.CometCsvNativeReadSuite
141144
- name: "exec"
142145
value: |
143146
org.apache.comet.exec.CometAggregateSuite

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,16 @@ object CometConf extends ShimCometConf {
150150
.booleanConf
151151
.createWithDefault(false)
152152

153+
val COMET_CSV_V2_NATIVE_ENABLED: ConfigEntry[Boolean] =
154+
conf("spark.comet.scan.csv.v2.enabled")
155+
.category(CATEGORY_TESTING)
156+
.doc(
157+
"Whether to use the native Comet V2 CSV reader for improved performance. " +
158+
"Default: false (uses standard Spark CSV reader) " +
159+
"Experimental: Performance benefits are workload-dependent.")
160+
.booleanConf
161+
.createWithDefault(false)
162+
153163
val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN: ConfigEntry[Boolean] =
154164
conf("spark.comet.parquet.respectFilterPushdown")
155165
.category(CATEGORY_PARQUET)

dev/benchmarks/comet-tpch.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,5 @@ $SPARK_HOME/bin/spark-submit \
5050
--data $TPCH_DATA \
5151
--queries $TPCH_QUERIES \
5252
--output . \
53-
--iterations 1
53+
--iterations 1 \
54+
--format parquet

dev/benchmarks/spark-tpch.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,5 @@ $SPARK_HOME/bin/spark-submit \
4242
--data $TPCH_DATA \
4343
--queries $TPCH_QUERIES \
4444
--output . \
45-
--iterations 1
45+
--iterations 1 \
46+
--format parquet

dev/benchmarks/tpcbench.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import json
2121
from pyspark.sql import SparkSession
2222
import time
23+
from typing import Dict
2324

2425
# rename same columns aliases
2526
# a, a, b, b -> a, a_1, b, b_1
@@ -37,7 +38,7 @@ def dedup_columns(df):
3738
new_cols.append(f"{c}_{counts[c]}")
3839
return df.toDF(*new_cols)
3940

40-
def main(benchmark: str, data_path: str, query_path: str, iterations: int, output: str, name: str, query_num: int = None, write_path: str = None):
41+
def main(benchmark: str, data_path: str, query_path: str, iterations: int, output: str, name: str, format: str, query_num: int = None, write_path: str = None, options: Dict[str, str] = None):
4142

4243
# Initialize a SparkSession
4344
spark = SparkSession.builder \
@@ -58,9 +59,9 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
5859
raise "invalid benchmark"
5960

6061
for table in table_names:
61-
path = f"{data_path}/{table}.parquet"
62+
path = f"{data_path}/{table}.{format}"
6263
print(f"Registering table {table} using path {path}")
63-
df = spark.read.parquet(path)
64+
df = spark.read.format(format).options(**options).load(path)
6465
df.createOrReplaceTempView(table)
6566

6667
conf_dict = {k: v for k, v in spark.sparkContext.getConf().getAll()}
@@ -146,15 +147,17 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
146147

147148
if __name__ == "__main__":
148149
parser = argparse.ArgumentParser(description="DataFusion benchmark derived from TPC-H / TPC-DS")
149-
parser.add_argument("--benchmark", required=True, help="Benchmark to run (tpch or tpcds)")
150+
parser.add_argument("--benchmark", required=True, default="tpch", help="Benchmark to run (tpch or tpcds)")
150151
parser.add_argument("--data", required=True, help="Path to data files")
151152
parser.add_argument("--queries", required=True, help="Path to query files")
152153
parser.add_argument("--iterations", required=False, default="1", help="How many iterations to run")
153154
parser.add_argument("--output", required=True, help="Path to write output")
154155
parser.add_argument("--name", required=True, help="Prefix for result file e.g. spark/comet/gluten")
155156
parser.add_argument("--query", required=False, type=int, help="Specific query number to run (1-based). If not specified, all queries will be run.")
156157
parser.add_argument("--write", required=False, help="Path to save query results to, in Parquet format.")
158+
parser.add_argument("--format", required=True, default="parquet", help="Input file format (parquet, csv, json)")
159+
parser.add_argument("--options", type=json.loads, required=False, default={}, help='Spark options as JSON string, e.g., \'{"header": "true", "delimiter": ","}\'')
157160
args = parser.parse_args()
158161

159-
main(args.benchmark, args.data, args.queries, int(args.iterations), args.output, args.name, args.query, args.write)
162+
main(args.benchmark, args.data, args.queries, int(args.iterations), args.output, args.name, args.format, args.query, args.write, args.options)
160163

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ These settings can be used to determine which parts of the plan are accelerated
144144
| `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used for Comet native execution when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, `greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and `unbounded`. | greedy_task_shared |
145145
| `spark.comet.memoryOverhead` | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. | 1024 MiB |
146146
| `spark.comet.parquet.write.enabled` | Whether to enable native Parquet write through Comet. When enabled, Comet will intercept Parquet write operations and execute them natively. This feature is highly experimental and only partially implemented. It should not be used in production. | false |
147+
| `spark.comet.scan.csv.v2.enabled` | Whether to use the native Comet V2 CSV reader for improved performance. Default: false (uses the standard Spark CSV reader). Experimental: performance benefits are workload-dependent. | false |
147148
| `spark.comet.sparkToColumnar.enabled` | Whether to enable Spark to Arrow columnar conversion. When this is turned on, Comet will convert operators in `spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format before processing. This is an experimental feature and has known issues with non-UTC timezones. | false |
148149
| `spark.comet.sparkToColumnar.supportedOperatorList` | A comma-separated list of operators that will be converted to Arrow columnar format when `spark.comet.sparkToColumnar.enabled` is true. | Range,InMemoryTableScan,RDDScan |
149150
| `spark.comet.testing.strict` | Experimental option to enable strict testing, which will fail tests that could be more comprehensive, such as checking for a specific fallback reason. It can be overridden by the environment variable `ENABLE_COMET_STRICT_TESTING`. | false |
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::execution::operators::ExecutionError;
19+
use arrow::datatypes::{Field, SchemaRef};
20+
use datafusion::common::DataFusionError;
21+
use datafusion::common::Result;
22+
use datafusion::datasource::object_store::ObjectStoreUrl;
23+
use datafusion::datasource::physical_plan::CsvSource;
24+
use datafusion_comet_proto::spark_operator::CsvOptions;
25+
use datafusion_datasource::file_groups::FileGroup;
26+
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
27+
use datafusion_datasource::source::DataSourceExec;
28+
use datafusion_datasource::PartitionedFile;
29+
use itertools::Itertools;
30+
use std::sync::Arc;
31+
32+
pub fn init_csv_datasource_exec(
33+
object_store_url: ObjectStoreUrl,
34+
file_groups: Vec<Vec<PartitionedFile>>,
35+
data_schema: SchemaRef,
36+
partition_schema: SchemaRef,
37+
projection_vector: Vec<usize>,
38+
csv_options: &CsvOptions,
39+
) -> Result<Arc<DataSourceExec>, ExecutionError> {
40+
let csv_source = build_csv_source(csv_options.clone());
41+
42+
let file_groups = file_groups
43+
.iter()
44+
.map(|files| FileGroup::new(files.clone()))
45+
.collect();
46+
47+
let partition_fields = partition_schema
48+
.fields()
49+
.iter()
50+
.map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
51+
.collect_vec();
52+
53+
let file_scan_config: FileScanConfig =
54+
FileScanConfigBuilder::new(object_store_url, data_schema, csv_source)
55+
.with_file_groups(file_groups)
56+
.with_table_partition_cols(partition_fields)
57+
.with_projection_indices(Some(projection_vector))
58+
.build();
59+
60+
Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
61+
}
62+
63+
fn build_csv_source(options: CsvOptions) -> Arc<CsvSource> {
64+
let delimiter = string_to_u8(&options.delimiter, "delimiter").unwrap();
65+
let quote = string_to_u8(&options.quote, "quote").unwrap();
66+
let escape = string_to_u8(&options.escape, "escape").unwrap();
67+
let terminator = string_to_u8(&options.terminator, "terminator").unwrap();
68+
let comment = options
69+
.comment
70+
.map(|c| string_to_u8(&c, "comment").unwrap());
71+
let csv_source = CsvSource::new(options.has_header, delimiter, quote)
72+
.with_escape(Some(escape))
73+
.with_comment(comment)
74+
.with_terminator(Some(terminator))
75+
.with_truncate_rows(options.truncated_rows);
76+
Arc::new(csv_source)
77+
}
78+
79+
fn string_to_u8(option: &str, option_name: &str) -> Result<u8> {
80+
match option.as_bytes().first() {
81+
Some(&ch) if ch.is_ascii() => Ok(ch),
82+
_ => Err(DataFusionError::Configuration(format!(
83+
"invalid {option_name} character '{option}': must be an ASCII character"
84+
))),
85+
}
86+
}

native/core/src/execution/operators/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@ pub use expand::ExpandExec;
3131
mod iceberg_scan;
3232
mod parquet_writer;
3333
pub use parquet_writer::ParquetWriterExec;
34+
mod csv_scan;
3435
pub mod projection;
3536
mod scan;
37+
pub use csv_scan::init_csv_datasource_exec;
3638

3739
/// Error returned during executing operators.
3840
#[derive(thiserror::Error, Debug)]

native/core/src/execution/planner.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub mod expression_registry;
2121
pub mod macros;
2222
pub mod operator_registry;
2323

24+
use crate::execution::operators::init_csv_datasource_exec;
2425
use crate::execution::operators::IcebergScanExec;
2526
use crate::{
2627
errors::ExpressionError,
@@ -94,6 +95,7 @@ use datafusion::physical_expr::window::WindowExpr;
9495
use datafusion::physical_expr::LexOrdering;
9596

9697
use crate::parquet::parquet_exec::init_datasource_exec;
98+
9799
use arrow::array::{
98100
new_empty_array, Array, ArrayRef, BinaryBuilder, BooleanArray, Date32Array, Decimal128Array,
99101
Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray,
@@ -1059,6 +1061,44 @@ impl PhysicalPlanner {
10591061
Arc::new(SparkPlan::new(spark_plan.plan_id, scan, vec![])),
10601062
))
10611063
}
1064+
OpStruct::CsvScan(scan) => {
1065+
let data_schema = convert_spark_types_to_arrow_schema(scan.data_schema.as_slice());
1066+
let partition_schema =
1067+
convert_spark_types_to_arrow_schema(scan.partition_schema.as_slice());
1068+
let projection_vector: Vec<usize> =
1069+
scan.projection_vector.iter().map(|i| *i as usize).collect();
1070+
let object_store_options: HashMap<String, String> = scan
1071+
.object_store_options
1072+
.iter()
1073+
.map(|(k, v)| (k.clone(), v.clone()))
1074+
.collect();
1075+
let one_file = scan
1076+
.file_partitions
1077+
.first()
1078+
.and_then(|f| f.partitioned_file.first())
1079+
.map(|f| f.file_path.clone())
1080+
.ok_or(GeneralError("Failed to locate file".to_string()))?;
1081+
let (object_store_url, _) = prepare_object_store_with_configs(
1082+
self.session_ctx.runtime_env(),
1083+
one_file,
1084+
&object_store_options,
1085+
)?;
1086+
let files =
1087+
self.get_partitioned_files(&scan.file_partitions[self.partition as usize])?;
1088+
let file_groups: Vec<Vec<PartitionedFile>> = vec![files];
1089+
let scan = init_csv_datasource_exec(
1090+
object_store_url,
1091+
file_groups,
1092+
data_schema,
1093+
partition_schema,
1094+
projection_vector,
1095+
&scan.csv_options.clone().unwrap(),
1096+
)?;
1097+
Ok((
1098+
vec![],
1099+
Arc::new(SparkPlan::new(spark_plan.plan_id, scan, vec![])),
1100+
))
1101+
}
10621102
OpStruct::Scan(scan) => {
10631103
let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();
10641104

0 commit comments

Comments
 (0)