Skip to content

Commit 7d53503

Browse files
committed
Merge branch 'master' into vinooganesh/thrift0.22.0
2 parents 9baf228 + 46595e2 commit 7d53503

14 files changed

Lines changed: 204 additions & 23 deletions

File tree

parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -962,9 +962,9 @@ private ParquetWriter<GenericRecord> writer(String file, Schema schema) throws I
962962
writerBuilder.withCodecFactory(HadoopCodecs.newFactory(ParquetProperties.DEFAULT_PAGE_SIZE));
963963
}
964964
if (conf == ConfigurationType.PLAIN_PARQUET_INTERFACE) {
965-
return writerBuilder.withConf(hadoopConfWithInterface).build();
966-
} else if (conf == ConfigurationType.HADOOP_PARQUET_INTERFACE) {
967965
return writerBuilder.withConf(plainParquetConf).build();
966+
} else if (conf == ConfigurationType.HADOOP_PARQUET_INTERFACE) {
967+
return writerBuilder.withConf(hadoopConfWithInterface).build();
968968
} else {
969969
return writerBuilder.withConf(testConf).build();
970970
}
@@ -983,9 +983,9 @@ private ParquetReader<GenericRecord> reader(String file) throws IOException {
983983
readerBuilder.withCodecFactory(HadoopCodecs.newFactory(ParquetProperties.DEFAULT_PAGE_SIZE));
984984
}
985985
if (conf == ConfigurationType.PLAIN_PARQUET_INTERFACE) {
986-
return readerBuilder.withConf(hadoopConfWithInterface).build();
987-
} else if (conf == ConfigurationType.HADOOP_PARQUET_INTERFACE) {
988986
return readerBuilder.withConf(plainParquetConf).build();
987+
} else if (conf == ConfigurationType.HADOOP_PARQUET_INTERFACE) {
988+
return readerBuilder.withConf(hadoopConfWithInterface).build();
989989
} else {
990990
return readerBuilder.withConf(testConf).build();
991991
}

parquet-cli/README.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,13 @@ You can build this project using maven:
3131
The build produces a shaded Jar that can be run using the `hadoop` command:
3232

3333
```
34-
hadoop jar parquet-cli-1.12.3-runtime.jar org.apache.parquet.cli.Main
34+
hadoop jar parquet-cli-1.16.0-runtime.jar org.apache.parquet.cli.Main
3535
```
3636

3737
For a shorter command-line invocation, add an alias to your shell like this:
3838

3939
```
40-
alias parquet="hadoop jar /path/to/parquet-cli-1.12.3-runtime.jar org.apache.parquet.cli.Main --dollar-zero parquet"
40+
alias parquet="hadoop jar /path/to/parquet-cli-1.16.0-runtime.jar org.apache.parquet.cli.Main --dollar-zero parquet"
4141
```
4242

4343
### Running without Hadoop
@@ -51,7 +51,7 @@ To run from the target directory instead of using the `hadoop` command, first co
5151
Then, run the command-line and add `target/dependencies/*` to the classpath:
5252

5353
```
54-
java -cp 'target/parquet-cli-1.12.3.jar:target/dependency/*' org.apache.parquet.cli.Main
54+
java -cp 'target/parquet-cli-1.16.0.jar:target/dependency/*' org.apache.parquet.cli.Main
5555
```
5656

5757
Note that you shouldn't include the runtime jar used above into the classpath in this case.
@@ -79,6 +79,8 @@ Usage: parquet [options] [command] [command options]
7979
8080
help
8181
Retrieves details on the functions of other commands
82+
version
83+
Print version of the Parquet CLI tool
8284
meta
8385
Print a Parquet file's metadata
8486
pages
@@ -126,7 +128,7 @@ Usage: parquet [options] [command] [command options]
126128
127129
Examples:
128130
129-
# print information for create
131+
# print information for meta
130132
parquet help meta
131133
132134
See 'parquet help <command>' for more information on a specific command.

parquet-cli/src/main/java/org/apache/parquet/cli/Help.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import com.beust.jcommander.ParameterDescription;
2626
import com.beust.jcommander.Parameters;
2727
import com.google.common.collect.Lists;
28+
import java.util.Arrays;
29+
import java.util.Collections;
2830
import java.util.List;
2931
import org.slf4j.Logger;
3032

@@ -116,7 +118,7 @@ public void printGenericHelp() {
116118
}
117119

118120
jc.getCommands().keySet().stream()
119-
.filter(s -> !s.equals("help"))
121+
.filter(s -> !Arrays.asList("version", "help").contains(s))
120122
.findFirst()
121123
.ifPresent(command -> {
122124
console.info("\n Examples:");
@@ -148,6 +150,6 @@ private String formatDefault(ParameterDescription param) {
148150

149151
@Override
150152
public List<String> getExamples() {
151-
return null;
153+
return Collections.emptyList();
152154
}
153155
}

parquet-cli/src/main/java/org/apache/parquet/cli/Main.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.parquet.cli.commands.ShowGeospatialStatisticsCommand;
5454
import org.apache.parquet.cli.commands.ShowPagesCommand;
5555
import org.apache.parquet.cli.commands.ShowSizeStatisticsCommand;
56+
import org.apache.parquet.cli.commands.ShowVersionCommand;
5657
import org.apache.parquet.cli.commands.ToAvroCommand;
5758
import org.apache.parquet.cli.commands.TransCompressionCommand;
5859
import org.slf4j.Logger;
@@ -87,6 +88,7 @@ public class Main extends Configured implements Tool {
8788
this.help = new Help(jc, console);
8889
jc.setProgramName(DEFAULT_PROGRAM_NAME);
8990
jc.addCommand("help", help, "-h", "-help", "--help");
91+
jc.addCommand("version", new ShowVersionCommand(console), "-version", "--version");
9092
jc.addCommand("meta", new ParquetMetadataCommand(console));
9193
jc.addCommand("pages", new ShowPagesCommand(console));
9294
jc.addCommand("dictionary", new ShowDictionaryCommand(console));
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.cli.commands;
20+
21+
import com.beust.jcommander.Parameters;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import org.apache.parquet.Version;
25+
import org.apache.parquet.cli.BaseCommand;
26+
import org.slf4j.Logger;
27+
28+
@Parameters(commandDescription = "Print version of the Parquet CLI tool")
29+
public class ShowVersionCommand extends BaseCommand {
30+
31+
public ShowVersionCommand(Logger console) {
32+
super(console);
33+
}
34+
35+
@Override
36+
public int run() {
37+
console.info(Version.FULL_VERSION);
38+
return 0;
39+
}
40+
41+
@Override
42+
public List<String> getExamples() {
43+
return Collections.emptyList();
44+
}
45+
}

parquet-cli/src/test/java/org/apache/parquet/cli/commands/FileTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,17 @@
1919
package org.apache.parquet.cli.commands;
2020

2121
import java.io.File;
22+
import java.util.Queue;
23+
import java.util.concurrent.LinkedBlockingQueue;
2224
import org.apache.commons.logging.LogFactory;
2325
import org.apache.log4j.PropertyConfigurator;
2426
import org.junit.Rule;
2527
import org.junit.rules.TemporaryFolder;
2628
import org.slf4j.Logger;
2729
import org.slf4j.LoggerFactory;
30+
import org.slf4j.event.LoggingEvent;
31+
import org.slf4j.event.SubstituteLoggingEvent;
32+
import org.slf4j.helpers.SubstituteLoggerFactory;
2833

2934
public abstract class FileTest {
3035

@@ -52,4 +57,24 @@ protected static Logger createLogger() {
5257
.setAttribute("org.apache.commons.logging.Log", "org.apache.commons.logging.impl.Log4JLogger");
5358
return console;
5459
}
60+
61+
@FunctionalInterface
62+
public interface ThrowableBiConsumer<T, U> {
63+
void accept(T t, U u) throws Exception;
64+
}
65+
66+
protected static void withLogger(ThrowableBiConsumer<Logger, Queue<? extends LoggingEvent>> body) {
67+
SubstituteLoggerFactory loggerFactory = new SubstituteLoggerFactory();
68+
LinkedBlockingQueue<SubstituteLoggingEvent> loggingEvents = loggerFactory.getEventQueue();
69+
Logger console = loggerFactory.getLogger(ParquetFileTest.class.getName());
70+
try {
71+
body.accept(console, loggingEvents);
72+
} catch (RuntimeException rethrow) {
73+
throw rethrow;
74+
} catch (Exception checkedException) {
75+
throw new RuntimeException(checkedException);
76+
} finally {
77+
loggerFactory.clear();
78+
}
79+
}
5580
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.cli.commands;
20+
21+
import java.util.Queue;
22+
import org.apache.parquet.Version;
23+
import org.junit.Assert;
24+
import org.junit.Test;
25+
import org.slf4j.Logger;
26+
import org.slf4j.event.LoggingEvent;
27+
28+
public class ShowVersionCommandTest extends FileTest {
29+
30+
@Test
31+
public void testVersionCommand() {
32+
withLogger(this::testVersionCommand0);
33+
}
34+
35+
private void testVersionCommand0(Logger console, Queue<? extends LoggingEvent> loggingEvents) {
36+
ShowVersionCommand command = new ShowVersionCommand(console);
37+
Assert.assertEquals(0, command.run());
38+
Assert.assertEquals(1, loggingEvents.size());
39+
LoggingEvent loggingEvent = loggingEvents.remove();
40+
Assert.assertEquals(Version.FULL_VERSION, loggingEvent.getMessage());
41+
loggingEvents.clear();
42+
}
43+
}

parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public class ParquetProperties {
5757
public static final int DEFAULT_PAGE_VALUE_COUNT_THRESHOLD = Integer.MAX_VALUE / 2;
5858
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
5959
public static final int DEFAULT_STATISTICS_TRUNCATE_LENGTH = Integer.MAX_VALUE;
60+
public static final int DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT = Integer.MAX_VALUE;
6061
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
6162
public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024;
6263
public static final boolean DEFAULT_BLOOM_FILTER_ENABLED = false;
@@ -122,6 +123,7 @@ public static WriterVersion fromString(String name) {
122123
private final ColumnProperty<Boolean> bloomFilterEnabled;
123124
private final ColumnProperty<Boolean> adaptiveBloomFilterEnabled;
124125
private final ColumnProperty<Integer> numBloomFilterCandidates;
126+
private final int rowGroupRowCountLimit;
125127
private final int pageRowCountLimit;
126128
private final boolean pageWriteChecksumEnabled;
127129
private final ColumnProperty<ByteStreamSplitMode> byteStreamSplitEnabled;
@@ -153,6 +155,7 @@ private ParquetProperties(Builder builder) {
153155
this.maxBloomFilterBytes = builder.maxBloomFilterBytes;
154156
this.adaptiveBloomFilterEnabled = builder.adaptiveBloomFilterEnabled.build();
155157
this.numBloomFilterCandidates = builder.numBloomFilterCandidates.build();
158+
this.rowGroupRowCountLimit = builder.rowGroupRowCountLimit;
156159
this.pageRowCountLimit = builder.pageRowCountLimit;
157160
this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
158161
this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build();
@@ -302,6 +305,10 @@ public boolean estimateNextSizeCheck() {
302305
return estimateNextSizeCheck;
303306
}
304307

308+
public int getRowGroupRowCountLimit() {
309+
return rowGroupRowCountLimit;
310+
}
311+
305312
public int getPageRowCountLimit() {
306313
return pageRowCountLimit;
307314
}
@@ -400,6 +407,7 @@ public static class Builder {
400407
private final ColumnProperty.Builder<Boolean> adaptiveBloomFilterEnabled;
401408
private final ColumnProperty.Builder<Integer> numBloomFilterCandidates;
402409
private final ColumnProperty.Builder<Boolean> bloomFilterEnabled;
410+
private int rowGroupRowCountLimit = DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT;
403411
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
404412
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
405413
private final ColumnProperty.Builder<ByteStreamSplitMode> byteStreamSplitEnabled;
@@ -679,6 +687,12 @@ public Builder withBloomFilterEnabled(String columnPath, boolean enabled) {
679687
return this;
680688
}
681689

690+
public Builder withRowGroupRowCountLimit(int rowCount) {
691+
Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for row groups: %s", rowCount);
692+
rowGroupRowCountLimit = rowCount;
693+
return this;
694+
}
695+
682696
public Builder withPageRowCountLimit(int rowCount) {
683697
Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: %s", rowCount);
684698
pageRowCountLimit = rowCount;

parquet-hadoop/README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,12 +266,16 @@ conf.set("parquet.bloom.filter.fpp#column.path", 0.02)
266266

267267
---
268268

269-
270269
**Property:** `parquet.decrypt.off-heap.buffer.enabled`
271270
**Description:** Whether to use direct buffers to decrypt encrypted files. This should be set to
272271
true if the reader is using a `DirectByteBufferAllocator`
273272
**Default value:** `false`
274273

274+
---
275+
276+
**Property:** `parquet.block.row.count.limit`
277+
**Description:** The maximum number of rows per row group.
278+
**Default value:** `2147483647` (Integer.MAX_VALUE)
275279

276280
---
277281
**Property:** `parquet.page.row.count.limit`

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ class InternalParquetRecordWriter<T> {
4848
private final WriteSupport<T> writeSupport;
4949
private final MessageType schema;
5050
private final Map<String, String> extraMetaData;
51-
private final long rowGroupSize;
5251
private long rowGroupSizeThreshold;
52+
private final int rowGroupRecordCountThreshold;
5353
private long nextRowGroupSize;
5454
private final BytesInputCompressor compressor;
5555
private final boolean validating;
@@ -91,8 +91,8 @@ public InternalParquetRecordWriter(
9191
this.writeSupport = Objects.requireNonNull(writeSupport, "writeSupport cannot be null");
9292
this.schema = schema;
9393
this.extraMetaData = extraMetaData;
94-
this.rowGroupSize = rowGroupSize;
9594
this.rowGroupSizeThreshold = rowGroupSize;
95+
this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit();
9696
this.nextRowGroupSize = rowGroupSizeThreshold;
9797
this.compressor = compressor;
9898
this.validating = validating;
@@ -166,9 +166,16 @@ public long getDataSize() {
166166
}
167167

168168
private void checkBlockSizeReached() throws IOException {
169-
if (recordCount
170-
>= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it
171-
// for every record.
169+
if (recordCount >= rowGroupRecordCountThreshold) {
170+
LOG.debug("record count reaches threshold: flushing {} records to disk.", recordCount);
171+
flushRowGroupToStore();
172+
initStore();
173+
recordCountForNextMemCheck = min(
174+
max(props.getMinRowCountForPageSizeCheck(), recordCount / 2),
175+
props.getMaxRowCountForPageSizeCheck());
176+
this.lastRowGroupEndPos = parquetFileWriter.getPos();
177+
} else if (recordCount >= recordCountForNextMemCheck) {
178+
// checking the memory size is relatively expensive, so let's not do it for every record.
172179
long memSize = columnStore.getBufferedSize();
173180
long recordSize = memSize / recordCount;
174181
// flush the row group if it is within ~2 records of the limit

0 commit comments

Comments
 (0)