Skip to content

Commit b045476

Browse files
authored
feat: bulk write pipeline (#68)
* feat: remove common auth method * feat: put result * feat: refactor write api * feat: write pipeline * refactor: big refactor of bulk write impl * chore: avoid unnecessary busy-wait on stream ready * chore: by review
1 parent 9463e24 commit b045476

17 files changed

Lines changed: 799 additions & 277 deletions

File tree

ingester-bulk-protocol/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@
7777
<groupId>com.google.protobuf</groupId>
7878
<artifactId>protobuf-java</artifactId>
7979
</dependency>
80+
<dependency>
81+
<groupId>com.google.code.gson</groupId>
82+
<artifactId>gson</artifactId>
83+
</dependency>
8084

8185
<!-- test -->
8286
<dependency>

ingester-bulk-protocol/src/main/java/io/greptime/ArrowCompressionType.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ public enum ArrowCompressionType {
2424
* Zstd compression.
2525
*/
2626
Zstd,
27+
2728
/**
2829
* Lz4 compression.
2930
*/
3031
Lz4,
32+
3133
/**
3234
* No compression.
3335
*/

ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteManager.java

Lines changed: 7 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import io.greptime.common.util.MetricsUtil;
2424
import io.greptime.rpc.TlsOptions;
2525
import io.netty.util.internal.SystemPropertyUtil;
26-
import org.apache.arrow.flight.AsyncPutListener;
2726
import org.apache.arrow.flight.BulkFlightClient;
2827
import org.apache.arrow.flight.BulkFlightClient.ClientStreamListener;
2928
import org.apache.arrow.flight.BulkFlightClient.PutListener;
@@ -45,15 +44,9 @@
4544
* BulkWriteManager is a specialized manager for efficiently writing block data to the server.
4645
*
4746
* It encapsulates a Flight client and a buffer allocator to manage memory resources.
48-
*
4947
* The primary function of this manager is to establish bulk write streams,
5048
* which provide an optimized channel for transmitting block data to the server.
5149
* These streams handle the serialization and transfer of data in an efficient manner.
52-
*
53-
* <p>
54-
* `ZERO_COPY_WRITE` is disabled by default, if you want to enable it, you can set the system property
55-
* `arrow.flight.enable_zero_copy_write` to `true`.
56-
* </p>
5750
*/
5851
public class BulkWriteManager implements AutoCloseable {
5952

@@ -136,43 +129,26 @@ public static BulkWriteManager create(
136129
return client;
137130
}
138131

139-
/**
140-
* @see #intoBulkWriteStream(String, String, Schema, long, CallOption...)
141-
*/
142-
public BulkWriteService intoBulkWriteStream(
143-
String database, String table, Schema schema, long timeoutMs, CallOption... options) {
144-
return intoBulkWriteStream(database, table, schema, timeoutMs, new AsyncPutListener(), options);
145-
}
146-
147132
/**
148133
* Creates a bulk write stream for efficiently writing data to the server.
149134
*
150-
* @param database the name of the target database
151135
* @param table the name of the target table
152136
* @param schema the Arrow schema defining the structure of the data to be written
153137
* @param timeoutMs the timeout in milliseconds for the write operation
154-
* @param metadataListener listener for handling server metadata responses during the write operation
155138
* @param options optional RPC-layer hints to configure the underlying Flight client call
156139
* @return a BulkStreamWriter instance that manages the data transfer process
157140
*/
158-
public BulkWriteService intoBulkWriteStream(
159-
String database,
160-
String table,
161-
Schema schema,
162-
long timeoutMs,
163-
PutListener metadataListener,
164-
CallOption... options) {
165-
FlightDescriptor descriptor = FlightDescriptor.path(database, table);
166-
return new BulkWriteService(this, schema, descriptor, metadataListener, timeoutMs, options);
141+
public BulkWriteService intoBulkWriteStream(String table, Schema schema, long timeoutMs, CallOption... options) {
142+
FlightDescriptor descriptor = FlightDescriptor.path(table);
143+
return new BulkWriteService(this, this.allocator, schema, descriptor, timeoutMs, options);
167144
}
168145

169146
VectorSchemaRoot createSchemaRoot(Schema schema) {
170147
return VectorSchemaRoot.create(schema, this.allocator);
171148
}
172149

173-
ClientStreamListener startPut(
174-
FlightDescriptor descriptor, PutListener metadataListener, Runnable onReadyHandler, CallOption... options) {
175-
return this.flightClient.startPut(descriptor, metadataListener, onReadyHandler, options);
150+
ClientStreamListener startPut(FlightDescriptor descriptor, PutListener metadataListener, CallOption... options) {
151+
return this.flightClient.startPut(descriptor, metadataListener, options);
176152
}
177153

178154
DictionaryProvider newDefaultDictionaryProvider() {
@@ -196,13 +172,13 @@ static class FlightAllocationListener implements AllocationListener {
196172

197173
@Override
198174
public void onAllocation(long size) {
199-
LOG.debug("onAllocation: {}", size);
175+
LOG.trace("onAllocation: {}", size);
200176
ALLOCATION_BYTES.inc(size);
201177
}
202178

203179
@Override
204180
public void onRelease(long size) {
205-
LOG.debug("onRelease: {}", size);
181+
LOG.trace("onRelease: {}", size);
206182
ALLOCATION_BYTES.dec(size);
207183
}
208184

0 commit comments

Comments
 (0)