Skip to content

Commit f244ff9

Browse files
chore: don't reimplement Channels.newOutputStream (#4363)
Change-Id: I9eafa3fa692db20f77b8a526761a8f76f597df50 Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable-hbase/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes #<issue_number_goes_here> ☕️ If you write sample code, please follow the [samples format]( https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
1 parent 95628dc commit f244ff9

File tree

1 file changed

+2
-54
lines changed
  • bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles

1 file changed

+2
-54
lines changed

bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/SequenceFileSink.java

Lines changed: 2 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,13 @@
1616
package com.google.cloud.bigtable.beam.sequencefiles;
1717

1818
import com.google.common.collect.Sets;
19-
import java.io.IOException;
20-
import java.io.OutputStream;
21-
import java.nio.Buffer;
22-
import java.nio.ByteBuffer;
19+
import java.nio.channels.Channels;
2320
import java.nio.channels.WritableByteChannel;
2421
import java.util.Set;
2522
import java.util.concurrent.atomic.AtomicLong;
2623
import org.apache.beam.sdk.io.Compression;
2724
import org.apache.beam.sdk.io.DynamicFileDestinations;
2825
import org.apache.beam.sdk.io.FileBasedSink;
29-
import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
30-
import org.apache.beam.sdk.io.FileBasedSink.Writer;
3126
import org.apache.beam.sdk.io.fs.ResourceId;
3227
import org.apache.beam.sdk.options.ValueProvider;
3328
import org.apache.beam.sdk.util.MimeTypes;
@@ -155,7 +150,7 @@ protected void prepareWrite(WritableByteChannel channel) throws Exception {
155150
configuration.setStrings("io.serializations", writeOperation.serializationNames);
156151

157152
FSDataOutputStream outputStream =
158-
new FSDataOutputStream(new OutputStreamWrapper(channel), new Statistics("dataflow"));
153+
new FSDataOutputStream(Channels.newOutputStream(channel), new Statistics("dataflow"));
159154
sequenceFile =
160155
SequenceFile.createWriter(
161156
configuration,
@@ -181,51 +176,4 @@ public void write(KV<K, V> value) throws Exception {
181176
sequenceFile.append(value.getKey(), value.getValue());
182177
}
183178
}
184-
185-
/**
186-
* Adapter to allow Hadoop's {@link SequenceFile} to write to Beam's {@link WritableByteChannel}.
187-
*/
188-
static class OutputStreamWrapper extends OutputStream {
189-
private final WritableByteChannel inner;
190-
private final ByteBuffer singleByteBuffer = ByteBuffer.allocate(1);
191-
192-
/**
193-
* Constructs a new {@link OutputStreamWrapper}.
194-
*
195-
* @param inner An instance of Beam's {@link WritableByteChannel}.
196-
*/
197-
OutputStreamWrapper(WritableByteChannel inner) {
198-
this.inner = inner;
199-
}
200-
201-
/** {@inheritDoc} */
202-
@Override
203-
public void write(byte[] b, int off, int len) throws IOException {
204-
int written = 0;
205-
206-
ByteBuffer byteBuffer = ByteBuffer.wrap(b, off, len);
207-
208-
while (written < len) {
209-
// Workaround Java 9 overridden methods with covariant return types
210-
((Buffer) byteBuffer).position(written + off);
211-
written += this.inner.write(byteBuffer);
212-
}
213-
}
214-
215-
/** {@inheritDoc} */
216-
@Override
217-
public void write(int b) throws IOException {
218-
// Workaround Java 9 overridden methods with covariant return types
219-
((Buffer) singleByteBuffer).clear();
220-
singleByteBuffer.put((byte) b);
221-
222-
int written = 0;
223-
224-
while (written == 0) {
225-
// Workaround Java 9 overridden methods with covariant return types
226-
((Buffer) singleByteBuffer).position(0);
227-
written = this.inner.write(singleByteBuffer);
228-
}
229-
}
230-
}
231179
}

0 commit comments

Comments
 (0)