Skip to content

Commit 5e1262c

Browse files
Mohammad LinjawiMohammad Linjawi
authored andcommitted
[GLUTEN-10215][VL] Delta write: Fix native partitioned layout accounting
1 parent 6298aeb commit 5e1262c

3 files changed

Lines changed: 239 additions & 62 deletions

File tree

backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala

Lines changed: 75 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.files
1818

1919
import org.apache.gluten.backendsapi.BackendsApiManager
2020
import org.apache.gluten.backendsapi.velox.VeloxBatchType
21+
import org.apache.gluten.columnarbatch.VeloxColumnarBatches
2122
import org.apache.gluten.config.GlutenConfig
2223
import org.apache.gluten.execution._
2324
import org.apache.gluten.execution.datasource.GlutenFormatFactory
@@ -45,6 +46,7 @@ import org.apache.spark.sql.execution.datasources.FileFormatWriter._
4546
import org.apache.spark.sql.execution.metric.SQLMetric
4647
import org.apache.spark.sql.internal.SQLConf
4748
import org.apache.spark.sql.types.DataType
49+
import org.apache.spark.sql.vectorized.ColumnarBatch
4850
import org.apache.spark.util.{SerializableConfiguration, Utils}
4951

5052
import org.apache.hadoop.conf.Configuration
@@ -590,43 +592,86 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
590592
record match {
591593
case carrierRow: BatchCarrierRow =>
592594
carrierRow match {
593-
case placeholderRow: PlaceholderRow =>
595+
case _: PlaceholderRow =>
594596
// Do nothing.
595597
case terminalRow: TerminalRow =>
596-
val numRows = terminalRow.batch().numRows()
597-
if (numRows > 0) {
598-
val blockStripes = GlutenFormatFactory.rowSplitter
599-
.splitBlockByPartitionAndBucket(
600-
terminalRow.batch(),
601-
partitionColIndice,
602-
isBucketed)
603-
val iter = blockStripes.iterator()
604-
while (iter.hasNext) {
605-
val blockStripe = iter.next()
606-
val headingRow = blockStripe.getHeadingRow
607-
beforeWrite(headingRow)
608-
val currentColumnBatch = blockStripe.getColumnarBatch
609-
val numRowsOfCurrentColumnarBatch = currentColumnBatch.numRows()
610-
assert(numRowsOfCurrentColumnarBatch > 0)
611-
val currentTerminalRow = terminalRow.withNewBatch(currentColumnBatch)
612-
currentWriter.write(currentTerminalRow)
613-
statsTrackers.foreach {
614-
tracker =>
615-
tracker.newRow(currentWriter.path, currentTerminalRow)
616-
for (_ <- 0 until numRowsOfCurrentColumnarBatch - 1) {
617-
tracker.newRow(currentWriter.path, new PlaceholderRow())
618-
}
619-
}
620-
currentColumnBatch.close()
621-
}
622-
blockStripes.release()
623-
recordsInFile += numRows
624-
}
598+
writePartitionedBatch(terminalRow)
625599
}
626600
case _ =>
627601
beforeWrite(record)
628602
writeRecord(record)
629603
}
630604
}
605+
606+
private def writeCurrentBatch(terminalRow: TerminalRow, rowCount: Int): Unit = {
607+
assert(rowCount > 0)
608+
currentWriter.write(terminalRow)
609+
statsTrackers.foreach(_.newRow(currentWriter.path, terminalRow))
610+
recordsInFile += rowCount
611+
}
612+
613+
private def writeCurrentBatchWithMaxRecords(
614+
terminalRow: TerminalRow,
615+
columnBatch: ColumnarBatch): Unit = {
616+
val numRows = columnBatch.numRows()
617+
var offset = 0
618+
while (offset < numRows) {
619+
val rowsRemaining = numRows - offset
620+
val rowsToWrite = if (description.maxRecordsPerFile > 0) {
621+
if (recordsInFile >= description.maxRecordsPerFile) {
622+
renewCurrentWriterIfTooManyRecords(currentPartitionValues, currentBucketId)
623+
}
624+
math.min(rowsRemaining.toLong, description.maxRecordsPerFile - recordsInFile).toInt
625+
} else {
626+
rowsRemaining
627+
}
628+
629+
assert(rowsToWrite > 0)
630+
val batchToWrite =
631+
if (offset == 0 && rowsToWrite == numRows) {
632+
columnBatch
633+
} else {
634+
VeloxColumnarBatches.slice(columnBatch, offset, rowsToWrite)
635+
}
636+
try {
637+
writeCurrentBatch(terminalRow.withNewBatch(batchToWrite), rowsToWrite)
638+
} finally {
639+
if (batchToWrite ne columnBatch) {
640+
batchToWrite.close()
641+
}
642+
}
643+
offset += rowsToWrite
644+
}
645+
}
646+
647+
private def writePartitionStripe(terminalRow: TerminalRow, blockStripe: BlockStripe): Unit = {
648+
beforeWrite(blockStripe.getHeadingRow)
649+
val currentColumnBatch = blockStripe.getColumnarBatch
650+
try {
651+
assert(currentColumnBatch.numRows() > 0)
652+
writeCurrentBatchWithMaxRecords(terminalRow, currentColumnBatch)
653+
} finally {
654+
currentColumnBatch.close()
655+
}
656+
}
657+
658+
private def writePartitionedBatch(terminalRow: TerminalRow): Unit = {
659+
val numRows = terminalRow.batch().numRows()
660+
if (numRows > 0) {
661+
val blockStripes = GlutenFormatFactory.rowSplitter
662+
.splitBlockByPartitionAndBucket(
663+
terminalRow.batch(),
664+
partitionColIndice,
665+
isBucketed)
666+
try {
667+
val iter = blockStripes.iterator()
668+
while (iter.hasNext) {
669+
writePartitionStripe(terminalRow, iter.next())
670+
}
671+
} finally {
672+
blockStripes.release()
673+
}
674+
}
675+
}
631676
}
632677
}

backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala

Lines changed: 72 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.files
1818

1919
import org.apache.gluten.backendsapi.BackendsApiManager
2020
import org.apache.gluten.backendsapi.velox.VeloxBatchType
21+
import org.apache.gluten.columnarbatch.VeloxColumnarBatches
2122
import org.apache.gluten.config.GlutenConfig
2223
import org.apache.gluten.execution._
2324
import org.apache.gluten.execution.datasource.GlutenFormatFactory
@@ -46,6 +47,7 @@ import org.apache.spark.sql.execution.datasources.FileFormatWriter._
4647
import org.apache.spark.sql.execution.metric.SQLMetric
4748
import org.apache.spark.sql.internal.SQLConf
4849
import org.apache.spark.sql.types.DataType
50+
import org.apache.spark.sql.vectorized.ColumnarBatch
4951
import org.apache.spark.util.{SerializableConfiguration, Utils}
5052

5153
import org.apache.hadoop.conf.Configuration
@@ -583,42 +585,84 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
583585
record match {
584586
case carrierRow: BatchCarrierRow =>
585587
carrierRow match {
586-
case placeholderRow: PlaceholderRow =>
588+
case _: PlaceholderRow =>
587589
// Do nothing.
588590
case terminalRow: TerminalRow =>
589-
val numRows = terminalRow.batch().numRows()
590-
if (numRows > 0) {
591-
val blockStripes = GlutenFormatFactory.rowSplitter
592-
.splitBlockByPartitionAndBucket(terminalRow.batch(), partitionColIndice,
593-
isBucketed)
594-
val iter = blockStripes.iterator()
595-
while (iter.hasNext) {
596-
val blockStripe = iter.next()
597-
val headingRow = blockStripe.getHeadingRow
598-
beforeWrite(headingRow)
599-
val currentColumnBatch = blockStripe.getColumnarBatch
600-
val numRowsOfCurrentColumnarBatch = currentColumnBatch.numRows()
601-
assert(numRowsOfCurrentColumnarBatch > 0)
602-
val currentTerminalRow = terminalRow.withNewBatch(currentColumnBatch)
603-
currentWriter.write(currentTerminalRow)
604-
statsTrackers.foreach {
605-
tracker =>
606-
tracker.newRow(currentWriter.path, currentTerminalRow)
607-
for (_ <- 0 until numRowsOfCurrentColumnarBatch - 1) {
608-
tracker.newRow(currentWriter.path, new PlaceholderRow())
609-
}
610-
}
611-
currentColumnBatch.close()
612-
}
613-
blockStripes.release()
614-
recordsInFile += numRows
615-
}
591+
writePartitionedBatch(terminalRow)
616592
}
617593
case _ =>
618594
beforeWrite(record)
619595
writeRecord(record)
620596
}
621597
}
598+
599+
private def writeCurrentBatch(terminalRow: TerminalRow, rowCount: Int): Unit = {
600+
assert(rowCount > 0)
601+
currentWriter.write(terminalRow)
602+
statsTrackers.foreach(_.newRow(currentWriter.path, terminalRow))
603+
recordsInFile += rowCount
604+
}
605+
606+
private def writeCurrentBatchWithMaxRecords(
607+
terminalRow: TerminalRow,
608+
columnBatch: ColumnarBatch): Unit = {
609+
val numRows = columnBatch.numRows()
610+
var offset = 0
611+
while (offset < numRows) {
612+
val rowsRemaining = numRows - offset
613+
val rowsToWrite = if (description.maxRecordsPerFile > 0) {
614+
if (recordsInFile >= description.maxRecordsPerFile) {
615+
renewCurrentWriterIfTooManyRecords(currentPartitionValues, currentBucketId)
616+
}
617+
math.min(rowsRemaining.toLong, description.maxRecordsPerFile - recordsInFile).toInt
618+
} else {
619+
rowsRemaining
620+
}
621+
622+
assert(rowsToWrite > 0)
623+
val batchToWrite =
624+
if (offset == 0 && rowsToWrite == numRows) {
625+
columnBatch
626+
} else {
627+
VeloxColumnarBatches.slice(columnBatch, offset, rowsToWrite)
628+
}
629+
try {
630+
writeCurrentBatch(terminalRow.withNewBatch(batchToWrite), rowsToWrite)
631+
} finally {
632+
if (batchToWrite ne columnBatch) {
633+
batchToWrite.close()
634+
}
635+
}
636+
offset += rowsToWrite
637+
}
638+
}
639+
640+
private def writePartitionStripe(terminalRow: TerminalRow, blockStripe: BlockStripe): Unit = {
641+
beforeWrite(blockStripe.getHeadingRow)
642+
val currentColumnBatch = blockStripe.getColumnarBatch
643+
try {
644+
assert(currentColumnBatch.numRows() > 0)
645+
writeCurrentBatchWithMaxRecords(terminalRow, currentColumnBatch)
646+
} finally {
647+
currentColumnBatch.close()
648+
}
649+
}
650+
651+
private def writePartitionedBatch(terminalRow: TerminalRow): Unit = {
652+
val numRows = terminalRow.batch().numRows()
653+
if (numRows > 0) {
654+
val blockStripes = GlutenFormatFactory.rowSplitter
655+
.splitBlockByPartitionAndBucket(terminalRow.batch(), partitionColIndice, isBucketed)
656+
try {
657+
val iter = blockStripes.iterator()
658+
while (iter.hasNext) {
659+
writePartitionStripe(terminalRow, iter.next())
660+
}
661+
} finally {
662+
blockStripes.release()
663+
}
664+
}
665+
}
622666
}
623667
}
624668
// spotless:on

0 commit comments

Comments
 (0)