Skip to content

Commit ee7102e

Browse files
committed
Merge remote-tracking branch 'apache/main' into plan-stability-regression
2 parents d2580d0 + 07393c1 commit ee7102e

11 files changed

Lines changed: 142 additions & 30 deletions

File tree

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -457,8 +457,10 @@ object CometConf extends ShimCometConf {
457457
val COMET_EXPLAIN_VERBOSE_ENABLED: ConfigEntry[Boolean] =
458458
conf("spark.comet.explain.verbose.enabled")
459459
.doc(
460-
"When this setting is enabled, Comet will provide a verbose tree representation of " +
461-
"the extended information.")
460+
"When this setting is enabled, Comet's extended explain output will provide the full " +
461+
"query plan annotated with fallback reasons as well as a summary of how much of " +
462+
"the plan was accelerated by Comet. When this setting is disabled, a list of fallback " +
463+
"reasons will be provided instead.")
462464
.booleanConf
463465
.createWithDefault(false)
464466

dev/benchmarks/tpcbench.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
5959

6060
for iteration in range(0, iterations):
6161
print(f"Starting iteration {iteration} of {iterations}")
62+
iter_start_time = time.time()
6263

6364
# Determine which queries to run
6465
if query_num is not None:

dev/diffs/iceberg/1.8.1.diff

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ index 7327b38905d..7967109f039 100644
1717
exclude group: 'org.apache.avro', module: 'avro'
1818
// already shaded by Parquet
1919
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
20-
index 04ffa8f4edc..cc0099ccc93 100644
20+
index 04ffa8f4edc..a909cd552c1 100644
2121
--- a/gradle/libs.versions.toml
2222
+++ b/gradle/libs.versions.toml
2323
@@ -34,6 +34,7 @@ azuresdk-bom = "1.2.31"
@@ -300,10 +300,10 @@ index 00000000000..ddf6c7de5ae
300300
+}
301301
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
302302
new file mode 100644
303-
index 00000000000..88b195b76a2
303+
index 00000000000..a3cba401827
304304
--- /dev/null
305305
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
306-
@@ -0,0 +1,255 @@
306+
@@ -0,0 +1,260 @@
307307
+/*
308308
+ * Licensed to the Apache Software Foundation (ASF) under one
309309
+ * or more contributor license agreements. See the NOTICE file
@@ -446,6 +446,7 @@ index 00000000000..88b195b76a2
446446
+ private long valuesRead = 0;
447447
+ private T last = null;
448448
+ private final FileReader cometReader;
449+
+ private ReadConf conf;
449450
+
450451
+ FileIterator(
451452
+ ReadConf conf,
@@ -470,6 +471,7 @@ index 00000000000..88b195b76a2
470471
+ length,
471472
+ fileEncryptionKey,
472473
+ fileAADPrefix);
474+
+ this.conf = conf;
473475
+ }
474476
+
475477
+ private FileReader newCometReader(
@@ -556,6 +558,9 @@ index 00000000000..88b195b76a2
556558
+ public void close() throws IOException {
557559
+ model.close();
558560
+ cometReader.close();
561+
+ if (conf != null && conf.reader() != null) {
562+
+ conf.reader().close();
563+
+ }
559564
+ }
560565
+ }
561566
+}

dev/diffs/iceberg/1.9.1.diff

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,10 +291,10 @@ index 00000000000..ddf6c7de5ae
291291
+}
292292
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
293293
new file mode 100644
294-
index 00000000000..88b195b76a2
294+
index 00000000000..a3cba401827
295295
--- /dev/null
296296
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
297-
@@ -0,0 +1,255 @@
297+
@@ -0,0 +1,260 @@
298298
+/*
299299
+ * Licensed to the Apache Software Foundation (ASF) under one
300300
+ * or more contributor license agreements. See the NOTICE file
@@ -437,6 +437,7 @@ index 00000000000..88b195b76a2
437437
+ private long valuesRead = 0;
438438
+ private T last = null;
439439
+ private final FileReader cometReader;
440+
+ private ReadConf conf;
440441
+
441442
+ FileIterator(
442443
+ ReadConf conf,
@@ -461,6 +462,7 @@ index 00000000000..88b195b76a2
461462
+ length,
462463
+ fileEncryptionKey,
463464
+ fileAADPrefix);
465+
+ this.conf = conf;
464466
+ }
465467
+
466468
+ private FileReader newCometReader(
@@ -547,6 +549,9 @@ index 00000000000..88b195b76a2
547549
+ public void close() throws IOException {
548550
+ model.close();
549551
+ cometReader.close();
552+
+ if (conf != null && conf.reader() != null) {
553+
+ conf.reader().close();
554+
+ }
550555
+ }
551556
+ }
552557
+}

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ Comet provides the following configuration settings.
6363
| spark.comet.exec.union.enabled | Whether to enable union by default. | true |
6464
| spark.comet.exec.window.enabled | Whether to enable window by default. | true |
6565
| spark.comet.explain.native.enabled | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false |
66-
| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false |
66+
| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet's extended explain output will provide the full query plan annotated with fallback reasons as well as a summary of how much of the plan was accelerated by Comet. When this setting is disabled, a list of fallback reasons will be provided instead. | false |
6767
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
6868
| spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
6969
| spark.comet.logFallbackReasons.enabled | When this setting is enabled, Comet will log warnings for all fallback reasons. | false |

native/core/src/execution/memory_pools/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ pub(crate) fn create_memory_pool(
4242
match memory_pool_config.pool_type {
4343
MemoryPoolType::Unified => {
4444
// Use Comet's unified memory pool for native execution
45-
let memory_pool = CometUnifiedMemoryPool::new(comet_task_memory_manager);
45+
let memory_pool =
46+
CometUnifiedMemoryPool::new(comet_task_memory_manager, task_attempt_id);
4647
Arc::new(TrackConsumersPool::new(
4748
memory_pool,
4849
NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),

native/core/src/execution/memory_pools/unified_pool.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use log::warn;
4040
pub struct CometUnifiedMemoryPool {
4141
task_memory_manager_handle: Arc<GlobalRef>,
4242
used: AtomicUsize,
43+
task_attempt_id: i64,
4344
}
4445

4546
impl Debug for CometUnifiedMemoryPool {
@@ -51,9 +52,13 @@ impl Debug for CometUnifiedMemoryPool {
5152
}
5253

5354
impl CometUnifiedMemoryPool {
54-
pub fn new(task_memory_manager_handle: Arc<GlobalRef>) -> CometUnifiedMemoryPool {
55+
pub fn new(
56+
task_memory_manager_handle: Arc<GlobalRef>,
57+
task_attempt_id: i64,
58+
) -> CometUnifiedMemoryPool {
5559
Self {
5660
task_memory_manager_handle,
61+
task_attempt_id,
5762
used: AtomicUsize::new(0),
5863
}
5964
}
@@ -82,7 +87,10 @@ impl Drop for CometUnifiedMemoryPool {
8287
fn drop(&mut self) {
8388
let used = self.used.load(Relaxed);
8489
if used != 0 {
85-
warn!("CometUnifiedMemoryPool dropped with {used} bytes still reserved");
90+
warn!(
91+
"Task {} dropped CometUnifiedMemoryPool with {used} bytes still reserved",
92+
self.task_attempt_id
93+
);
8694
}
8795
}
8896
}
@@ -96,13 +104,20 @@ impl MemoryPool for CometUnifiedMemoryPool {
96104
}
97105

98106
fn shrink(&self, _: &MemoryReservation, size: usize) {
99-
self.release_to_spark(size)
100-
.unwrap_or_else(|_| panic!("Failed to release {size} bytes"));
107+
if let Err(e) = self.release_to_spark(size) {
108+
panic!(
109+
"Task {} failed to return {size} bytes to Spark: {e:?}",
110+
self.task_attempt_id
111+
);
112+
}
101113
if let Err(prev) = self
102114
.used
103115
.fetch_update(Relaxed, Relaxed, |old| old.checked_sub(size))
104116
{
105-
panic!("overflow when releasing {size} of {prev} bytes");
117+
panic!(
118+
"Task {} overflow when releasing {size} of {prev} bytes",
119+
self.task_attempt_id
120+
);
106121
}
107122
}
108123

@@ -116,7 +131,8 @@ impl MemoryPool for CometUnifiedMemoryPool {
116131
self.release_to_spark(acquired as usize)?;
117132

118133
return Err(resources_datafusion_err!(
119-
"Failed to acquire {} bytes, only got {}. Reserved: {}",
134+
"Task {} failed to acquire {} bytes, only got {}. Reserved: {}",
135+
self.task_attempt_id,
120136
additional,
121137
acquired,
122138
self.reserved()
@@ -127,7 +143,8 @@ impl MemoryPool for CometUnifiedMemoryPool {
127143
.fetch_update(Relaxed, Relaxed, |old| old.checked_add(acquired as usize))
128144
{
129145
return Err(resources_datafusion_err!(
130-
"Failed to acquire {} bytes due to overflow. Reserved: {}",
146+
"Task {} failed to acquire {} bytes due to overflow. Reserved: {}",
147+
self.task_attempt_id,
131148
additional,
132149
prev
133150
));

spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,36 @@ public class CometTaskMemoryManager {
4040
/** The id that uniquely identifies the native plan this memory manager is associated with */
4141
private final long id;
4242

43+
private final long taskAttemptId;
44+
4345
public final TaskMemoryManager internal;
4446
private final NativeMemoryConsumer nativeMemoryConsumer;
4547
private final AtomicLong used = new AtomicLong();
4648

47-
public CometTaskMemoryManager(long id) {
49+
public CometTaskMemoryManager(long id, long taskAttemptId) {
4850
this.id = id;
51+
this.taskAttemptId = taskAttemptId;
4952
this.internal = TaskContext$.MODULE$.get().taskMemoryManager();
5053
this.nativeMemoryConsumer = new NativeMemoryConsumer();
5154
}
5255

5356
// Called by Comet native through JNI.
5457
// Returns the actual amount of memory (in bytes) granted.
5558
public long acquireMemory(long size) {
59+
if (logger.isTraceEnabled()) {
60+
logger.trace("Task {} requested {} bytes", taskAttemptId, size);
61+
}
5662
long acquired = internal.acquireExecutionMemory(size, nativeMemoryConsumer);
57-
used.addAndGet(acquired);
63+
long newUsed = used.addAndGet(acquired);
5864
if (acquired < size) {
65+
logger.warn(
66+
"Task {} requested {} bytes but only received {} bytes. Current allocation is {} and "
67+
+ "the total memory consumption is {} bytes.",
68+
taskAttemptId,
69+
size,
70+
acquired,
71+
newUsed,
72+
internal.getMemoryConsumptionForThisTask());
5973
// If memory manager is not able to acquire the requested size, log memory usage
6074
internal.showMemoryUsage();
6175
}
@@ -64,10 +78,16 @@ public long acquireMemory(long size) {
6478

6579
// Called by Comet native through JNI
6680
public void releaseMemory(long size) {
81+
if (logger.isTraceEnabled()) {
82+
logger.trace("Task {} released {} bytes", taskAttemptId, size);
83+
}
6784
long newUsed = used.addAndGet(-size);
6885
if (newUsed < 0) {
6986
logger.error(
70-
"Used memory is negative: " + newUsed + " after releasing memory chunk of: " + size);
87+
"Task {} used memory is negative ({}) after releasing {} bytes",
88+
taskAttemptId,
89+
newUsed,
90+
size);
7191
}
7292
internal.releaseExecutionMemory(size, nativeMemoryConsumer);
7393
}

spark/src/main/scala/org/apache/comet/CometExecIterator.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ class CometExecIterator(
6868
private val memoryMXBean = ManagementFactory.getMemoryMXBean
6969
private val nativeLib = new Native()
7070
private val nativeUtil = new NativeUtil()
71-
private val cometTaskMemoryManager = new CometTaskMemoryManager(id)
71+
private val taskAttemptId = TaskContext.get().taskAttemptId
72+
private val cometTaskMemoryManager = new CometTaskMemoryManager(id, taskAttemptId)
7273
private val cometBatchIterators = inputs.map { iterator =>
7374
new CometBatchIterator(iterator, nativeUtil)
7475
}.toArray
@@ -116,7 +117,7 @@ class CometExecIterator(
116117
memoryPoolType = COMET_EXEC_MEMORY_POOL_TYPE.get(),
117118
memoryLimit,
118119
memoryLimitPerTask,
119-
taskAttemptId = TaskContext.get().taskAttemptId,
120+
taskAttemptId,
120121
debug = COMET_DEBUG_ENABLED.get(),
121122
explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
122123
tracingEnabled)
@@ -175,6 +176,11 @@ class CometExecIterator(
175176
})
176177
} catch {
177178
case e: CometNativeException =>
179+
// it is generally considered bad practice to log and then rethrow an
180+
// exception, but it really helps debugging to be able to see which task
181+
// threw the exception, so we log the exception with taskAttemptId here
182+
logError(s"Native execution for task $taskAttemptId failed", e)
183+
178184
val fileNotFoundPattern: Regex =
179185
("""^External: Object at location (.+?) not found: No such file or directory """ +
180186
"""\(os error \d+\)$""").r

0 commit comments

Comments
 (0)