Skip to content

Commit 97414b3

Browse files
committed
Added support to write to stdout
1 parent cdcea6d commit 97414b3

6 files changed

Lines changed: 241 additions & 27 deletions

File tree

parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCommand.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.io.Closeable;
3333
import java.io.IOException;
3434
import java.util.List;
35+
3536
import org.apache.avro.Schema;
3637
import org.apache.avro.generic.GenericData;
3738
import org.apache.hadoop.fs.FileSystem;
@@ -42,6 +43,7 @@
4243
import org.apache.parquet.cli.util.Schemas;
4344
import org.apache.parquet.hadoop.ParquetWriter;
4445
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
46+
import org.apache.parquet.io.StandardOutputFile;
4547
import org.slf4j.Logger;
4648

4749
@Parameters(commandDescription = "Create a Parquet file from a data file")
@@ -56,8 +58,8 @@ public ConvertCommand(Logger console) {
5658

5759
@Parameter(
5860
names = {"-o", "--output"},
59-
description = "Output file path",
60-
required = true)
61+
description = "Output file path, if not given writes to stdout",
62+
required = false)
6163
String outputPath = null;
6264

6365
@Parameter(
@@ -112,18 +114,11 @@ public int run() throws IOException {
112114
}
113115
Schema projection = filterSchema(schema, columns);
114116

115-
Path outPath = qualifiedPath(outputPath);
116-
FileSystem outFS = outPath.getFileSystem(getConf());
117-
if (overwrite && outFS.exists(outPath)) {
118-
console.debug("Deleting output file {} (already exists)", outPath);
119-
outFS.delete(outPath);
120-
}
121-
122117
Iterable<Record> reader = openDataFile(source, projection);
123118
boolean threw = true;
124119
long count = 0;
125120
try {
126-
try (ParquetWriter<Record> writer = AvroParquetWriter.<Record>builder(qualifiedPath(outputPath))
121+
try (ParquetWriter<Record> writer = createBuilder()
127122
.withWriterVersion(v2 ? PARQUET_2_0 : PARQUET_1_0)
128123
.withConf(getConf())
129124
.withCompressionCodec(codec)
@@ -151,6 +146,19 @@ public int run() throws IOException {
151146
return 0;
152147
}
153148

149+
private AvroParquetWriter.Builder<Record> createBuilder() throws IOException {
150+
if (outputPath != null) {
151+
Path outPath = qualifiedPath(outputPath);
152+
FileSystem outFS = outPath.getFileSystem(getConf());
153+
if (overwrite && outFS.exists(outPath)) {
154+
console.debug("Deleting output file {} (already exists)", outPath);
155+
outFS.delete(outPath);
156+
}
157+
return AvroParquetWriter.<Record>builder(outPath);
158+
}
159+
return AvroParquetWriter.<Record>builder(new StandardOutputFile());
160+
}
161+
154162
@Override
155163
public List<String> getExamples() {
156164
return Lists.newArrayList(

parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,15 @@
3131
import org.apache.hadoop.fs.Path;
3232
import org.apache.parquet.cli.BaseCommand;
3333
import org.apache.parquet.cli.util.Codecs;
34+
import org.apache.parquet.conf.HadoopParquetConfiguration;
3435
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
3536
import org.apache.parquet.hadoop.rewrite.MaskMode;
3637
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
3738
import org.apache.parquet.hadoop.rewrite.RewriteOptions;
39+
import org.apache.parquet.hadoop.util.HadoopInputFile;
40+
import org.apache.parquet.io.InputFile;
41+
import org.apache.parquet.io.OutputFile;
42+
import org.apache.parquet.io.StandardOutputFile;
3843
import org.slf4j.Logger;
3944

4045
@Parameters(commandDescription = "Rewrite one or more Parquet files to a new Parquet file")
@@ -48,8 +53,8 @@ public class RewriteCommand extends BaseCommand {
4853

4954
@Parameter(
5055
names = {"-o", "--output"},
51-
description = "<output parquet file path>",
52-
required = true)
56+
description = "<output parquet file path. If not given, writes to stdout>",
57+
required = false)
5358
String output;
5459

5560
@Parameter(
@@ -88,17 +93,11 @@ public RewriteCommand(Logger console) {
8893

8994
private RewriteOptions buildOptionsOrFail() throws IOException {
9095
Preconditions.checkArgument(
91-
inputs != null && !inputs.isEmpty() && output != null,
92-
"Both input and output parquet file paths are required.");
93-
94-
List<Path> inputPaths = new ArrayList<>();
95-
for (String input : inputs) {
96-
inputPaths.add(new Path(input));
97-
}
98-
Path outputPath = new Path(output);
96+
inputs != null && !inputs.isEmpty(),
97+
"Input parquet file paths are required.");
9998

10099
// The builder below takes the job to validate all input parameters.
101-
RewriteOptions.Builder builder = new RewriteOptions.Builder(getConf(), inputPaths, outputPath);
100+
RewriteOptions.Builder builder = createBuilder();
102101

103102
// Mask columns if specified.
104103
if (maskMode != null && maskMode.equals("nullify") && maskColumns != null && !maskColumns.isEmpty()) {
@@ -121,18 +120,38 @@ private RewriteOptions buildOptionsOrFail() throws IOException {
121120

122121
RewriteOptions options = builder.build();
123122

124-
// If RewriteOptions are successfully built and the overwrite option is specified, remove the output path
125-
FileSystem outFS = outputPath.getFileSystem(getConf());
126-
if (overwrite && outFS.exists(outputPath)) {
127-
console.debug("Deleting output file {} (already exists)", outputPath);
128-
outFS.delete(outputPath);
123+
if (output != null) {
124+
Path outputPath = new Path(output);
125+
// If RewriteOptions are successfully built and the overwrite option is specified, remove the output path
126+
FileSystem outFS = outputPath.getFileSystem(getConf());
127+
if (overwrite && outFS.exists(outputPath)) {
128+
console.debug("Deleting output file {} (already exists)", outputPath);
129+
outFS.delete(outputPath);
130+
}
129131
}
130132

131133
return options;
132134
}
133135

136+
private RewriteOptions.Builder createBuilder() {
137+
if (output != null) {
138+
List<Path> inputPaths = new ArrayList<>();
139+
for (String input : inputs) {
140+
inputPaths.add(new Path(input));
141+
}
142+
Path outputPath = new Path(output);
143+
return new RewriteOptions.Builder(getConf(), inputPaths, outputPath);
144+
}
145+
146+
List<InputFile> inputFiles = new ArrayList<>();
147+
for (String input : inputs) {
148+
inputFiles.add(HadoopInputFile.fromPathUnchecked(new Path(input), getConf()));
149+
}
150+
OutputFile outputFile = new StandardOutputFile();
151+
return new RewriteOptions.Builder(new HadoopParquetConfiguration(getConf()), inputFiles, outputFile);
152+
}
153+
134154
@Override
135-
@SuppressWarnings("unchecked")
136155
public int run() throws IOException {
137156
RewriteOptions options = buildOptionsOrFail();
138157
ParquetRewriter rewriter = new ParquetRewriter(options);

parquet-cli/src/test/java/org/apache/parquet/cli/commands/ConvertCommandTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
*/
1919
package org.apache.parquet.cli.commands;
2020

21+
import java.io.ByteArrayOutputStream;
2122
import java.io.File;
2223
import java.io.IOException;
24+
import java.io.PrintStream;
2325
import java.util.Arrays;
2426
import org.apache.hadoop.conf.Configuration;
2527
import org.junit.Assert;
@@ -37,4 +39,17 @@ public void testConvertCommand() throws IOException {
3739
Assert.assertEquals(0, command.run());
3840
Assert.assertTrue(output.exists());
3941
}
42+
43+
@Test
44+
public void testConvertCommandToStdOut() throws IOException {
45+
File file = toAvro(parquetFile());
46+
ConvertCommand command = new ConvertCommand(createLogger());
47+
command.targets = Arrays.asList(file.getAbsolutePath());
48+
command.setConf(new Configuration());
49+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
50+
System.setOut(new PrintStream(baos));
51+
Assert.assertEquals(0, command.run());
52+
Assert.assertTrue(baos.size() > 0);
53+
}
54+
4055
}

parquet-cli/src/test/java/org/apache/parquet/cli/commands/RewriteCommandTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
*/
1919
package org.apache.parquet.cli.commands;
2020

21+
import java.io.ByteArrayOutputStream;
2122
import java.io.File;
2223
import java.io.IOException;
24+
import java.io.PrintStream;
2325
import java.nio.file.Files;
2426
import java.util.Arrays;
2527
import org.apache.hadoop.conf.Configuration;
@@ -95,4 +97,19 @@ public void testRewriteCommandWithCompression_gzip() throws IOException {
9597
Assert.assertEquals(0, command.run());
9698
Assert.assertTrue(output.exists());
9799
}
100+
101+
@Test
102+
public void testRewriteCommandToStdOut() throws IOException {
103+
File file = parquetFile();
104+
RewriteCommand command = new RewriteCommand(createLogger());
105+
command.inputs = Arrays.asList(file.getAbsolutePath());
106+
command.codec = "gzip";
107+
command.setConf(new Configuration());
108+
109+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
110+
System.setOut(new PrintStream(baos));
111+
Assert.assertEquals(0, command.run());
112+
Assert.assertTrue(baos.size() > 0);
113+
}
114+
98115
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.parquet.io;
21+
22+
import java.io.IOException;
23+
24+
/**
25+
* An {@link OutputFile} implementation that allows Parquet to write to {@link System#out}
26+
*/
27+
public class StandardOutputFile implements OutputFile {
28+
29+
/**
30+
* @implNote we don't want to close the standard output (it's up to the process lifecycle to handle that)
31+
*/
32+
public class RawPositionOutputStream extends PositionOutputStream {
33+
34+
private long pos = 0;
35+
36+
@Override
37+
public long getPos() throws IOException {
38+
return this.pos;
39+
}
40+
41+
@Override
42+
public void write(int b) throws IOException {
43+
this.pos++;
44+
System.out.write(b);
45+
}
46+
47+
@Override
48+
public void write(byte[] b) throws IOException {
49+
this.pos += b.length;
50+
System.out.write(b);
51+
}
52+
53+
@Override
54+
public void write(byte[] b, int off, int len) throws IOException {
55+
this.pos += len;
56+
System.out.write(b, off, len);
57+
}
58+
59+
@Override
60+
public void flush() throws IOException {
61+
System.out.flush();
62+
}
63+
64+
}
65+
66+
@Override
67+
public PositionOutputStream create(long blockSizeHint) throws IOException {
68+
return new RawPositionOutputStream();
69+
}
70+
71+
@Override
72+
public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
73+
return new RawPositionOutputStream();
74+
}
75+
76+
@Override
77+
public boolean supportsBlockSize() {
78+
return false;
79+
}
80+
81+
@Override
82+
public long defaultBlockSize() {
83+
return -1;
84+
}
85+
86+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.io;
20+
21+
import static org.junit.Assert.assertEquals;
22+
23+
import org.junit.Test;
24+
25+
import java.io.ByteArrayOutputStream;
26+
import java.io.IOException;
27+
import java.io.PrintStream;
28+
29+
public class TestStandardOutputFile {
30+
31+
private static final String TEST = "this is a test";
32+
33+
@Test
34+
public void outputFileWriteByteToStdOut() throws IOException, InterruptedException {
35+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
36+
System.setOut(new PrintStream(baos));
37+
final PositionOutputStream stdOut = new StandardOutputFile().create(1);
38+
final byte test = 7;
39+
stdOut.write(test);
40+
assertEquals(1, stdOut.getPos());
41+
assertEquals(1, baos.toByteArray().length);
42+
assertEquals(7, baos.toByteArray()[0]);
43+
}
44+
45+
@Test
46+
public void outputFileWriteArrayToStdOut() throws IOException, InterruptedException {
47+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
48+
System.setOut(new PrintStream(baos));
49+
final PositionOutputStream stdOut = new StandardOutputFile().create(1);
50+
final byte[] test = TEST.getBytes();
51+
stdOut.write(test);
52+
assertEquals(test.length, stdOut.getPos());
53+
assertEquals(test.length, baos.toByteArray().length);
54+
assertEquals(TEST, new String(baos.toByteArray()));
55+
}
56+
57+
@Test
58+
public void outputFileWriteArrayOffsetToStdOut() throws IOException, InterruptedException {
59+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
60+
System.setOut(new PrintStream(baos));
61+
final PositionOutputStream stdOut = new StandardOutputFile().create(1);
62+
final byte[] test = TEST.getBytes();
63+
stdOut.write(test, 1, 4);
64+
assertEquals(4, stdOut.getPos());
65+
assertEquals(4, baos.toByteArray().length);
66+
assertEquals(TEST.substring(1, 5), new String(baos.toByteArray()));
67+
}
68+
69+
}

0 commit comments

Comments
 (0)