Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@

import org.apache.paimon.arrow.reader.ArrowBatchReader;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.io.ProjectableBundleRecords;
import org.apache.paimon.io.ReplayableBundleRecords;
import org.apache.paimon.types.RowType;

import org.apache.arrow.vector.VectorSchemaRoot;

import java.util.Iterator;

/** Batch records for vector schema root. */
public class ArrowBundleRecords implements BundleRecords {
public class ArrowBundleRecords implements ProjectableBundleRecords {

private final VectorSchemaRoot vectorSchemaRoot;
private final RowType rowType;
Expand All @@ -55,4 +56,9 @@ public Iterator<InternalRow> iterator() {
ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, caseSensitive);
return arrowBatchReader.readBatch(vectorSchemaRoot).iterator();
}

@Override
public ReplayableBundleRecords project(int[] projection) {
return new ArrowBundleRecords(vectorSchemaRoot, rowType.project(projection), caseSensitive);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.io;

import org.apache.paimon.annotation.Experimental;

/**
* Opt-in extension for replayable bundles that can preserve their bundle type under projection.
*
* <p>This allows projection-aware writers to keep format-specific bundle fast-paths without
* coupling to concrete bundle implementations from other modules.
*/
@Experimental
public interface ProjectableBundleRecords extends ReplayableBundleRecords {

/** Returns a replayable bundle that exposes the projected fields in projection order. */
ReplayableBundleRecords project(int[] projection);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.io;

import org.apache.paimon.annotation.Experimental;

/**
* Opt-in marker for {@link BundleRecords} implementations that support repeated iteration.
*
* <p>This keeps replayability as an explicit capability instead of strengthening the base {@link
* BundleRecords} contract for all callers.
*/
@Experimental
public interface ReplayableBundleRecords extends BundleRecords {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.append;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.io.FileWriterContext;
import org.apache.paimon.io.ReplayableBundleRecords;
import org.apache.paimon.io.RowDataFileWriter;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.LongCounter;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;

/**
* Dedicated-path {@link RowDataFileWriter} which preserves row-level side effects when writing
* replayable bundles.
*/
class BundleAwareRowDataFileWriter extends RowDataFileWriter implements BundlePassThroughWriter {

private final boolean supportsBundlePassThrough;

public BundleAwareRowDataFileWriter(
FileIO fileIO,
FileWriterContext context,
Path path,
RowType writeSchema,
long schemaId,
Supplier<LongCounter> seqNumCounterSupplier,
FileIndexOptions fileIndexOptions,
FileSource fileSource,
boolean asyncFileWrite,
boolean statsDenseStore,
boolean isExternalPath,
@Nullable List<String> writeCols) {
super(
fileIO,
context,
path,
writeSchema,
schemaId,
seqNumCounterSupplier,
fileIndexOptions,
fileSource,
asyncFileWrite,
statsDenseStore,
isExternalPath,
writeCols);
this.supportsBundlePassThrough = supportsBundleWrite();
}

@Override
public boolean supportsBundlePassThrough() {
return supportsBundlePassThrough;
}

@Override
public void writeBundle(BundleRecords bundle) throws IOException {
if (!(bundle instanceof ReplayableBundleRecords)) {
for (InternalRow row : bundle) {
write(row);
}
return;
}

writeReplayableBundle((ReplayableBundleRecords) bundle);
}

@Override
public void writeReplayableBundle(ReplayableBundleRecords bundle) throws IOException {
if (!supportsBundlePassThrough) {
for (InternalRow row : bundle) {
write(row);
}
return;
}

try {
super.writeBundle(bundle);
// Dedicated-format fan-out only forwards replayable bundles here, so row-level side
// effects can safely replay the same logical rows after the format writer consumes the
// bundle.
for (InternalRow row : bundle) {
recordRowWrite(row);
}
} catch (Throwable e) {
abort();
throw e;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.append;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.ReplayableBundleRecords;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.io.RollingFileWriterImpl;
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.RowType;

import java.util.function.Supplier;

/** Rolling writer used by dedicated-format bundle pass-through paths. */
class BundleAwareRowDataRollingFileWriter extends RollingFileWriterImpl<InternalRow, DataFileMeta>
implements BundlePassThroughWriter {

private final boolean supportsBundlePassThrough;

public BundleAwareRowDataRollingFileWriter(
Supplier<? extends BundleAwareRowDataFileWriter> writerFactory,
boolean supportsBundlePassThrough,
long targetFileSize) {
super(writerFactory, targetFileSize);
this.supportsBundlePassThrough = supportsBundlePassThrough;
}

@Override
public boolean supportsBundlePassThrough() {
return supportsBundlePassThrough;
}

@Override
public void writeReplayableBundle(ReplayableBundleRecords bundle) throws java.io.IOException {
writeBundle(bundle);
}

@VisibleForTesting
public static boolean supportsBundlePassThrough(
FileFormat fileFormat,
RowType rowType,
SimpleColStatsCollector.Factory[] statsCollectors) {
return !RollingFileWriter.createStatsProducer(fileFormat, rowType, statsCollectors)
.requirePerRecord();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.append;

import org.apache.paimon.io.ReplayableBundleRecords;

import java.io.IOException;

/**
* Internal contract for writers that can safely accept replayable bundle pass-through from
* projection wrappers.
*
* <p>The support is instance-specific because it may depend on runtime configuration such as
* statistics collection mode.
*/
interface BundlePassThroughWriter {

boolean supportsBundlePassThrough();

void writeReplayableBundle(ReplayableBundleRecords bundle) throws IOException;
}
Loading