Skip to content

Commit 97a405d

Browse files
committed
chew
1 parent 20b870d commit 97a405d

5 files changed

Lines changed: 199 additions & 48 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@
2121

2222
import org.apache.iotdb.commons.client.ThriftClient;
2323
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
24-
import org.apache.iotdb.commons.pipe.config.PipeConfig;
25-
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
26-
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
24+
import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder;
2725
import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
2826
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
2927
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -37,12 +35,10 @@
3735
import org.slf4j.LoggerFactory;
3836

3937
import java.util.Objects;
40-
import java.util.concurrent.atomic.AtomicInteger;
4138

4239
public abstract class PipeTransferTrackableHandler
4340
implements AsyncMethodCallback<TPipeTransferResp>, AutoCloseable {
4441
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTsFileHandler.class);
45-
private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new AtomicInteger(0);
4642

4743
protected final IoTDBDataRegionAsyncSink sink;
4844
protected volatile AsyncPipeDataTransferServiceClient client;
@@ -137,9 +133,8 @@ protected abstract void doTransfer(
137133
protected final void transferWithOptionalRequestSlicing(
138134
final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req)
139135
throws TException {
140-
final int bodySizeLimit = PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes();
141-
if (req.getVersion() != IoTDBSinkRequestVersion.VERSION_1.getVersion()
142-
|| req.body.limit() < bodySizeLimit) {
136+
final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
137+
if (!PipeTransferSliceReqBuilder.shouldSlice(req, bodySizeLimit)) {
143138
client.pipeTransfer(req, this);
144139
return;
145140
}
@@ -152,15 +147,14 @@ protected final void transferWithOptionalRequestSlicing(
152147
req.body.limit(),
153148
bodySizeLimit);
154149

155-
final int sliceCount =
156-
req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit == 0 ? 0 : 1);
150+
final int sliceCount = PipeTransferSliceReqBuilder.getSliceCount(req, bodySizeLimit);
157151
final boolean shouldReturnSelf = client.shouldReturnSelf();
158152
try {
159153
transferSlicedRequest(
160154
client,
161155
req,
162156
shouldReturnSelf,
163-
SLICE_ORDER_ID_GENERATOR.getAndIncrement(),
157+
PipeTransferSliceReqBuilder.nextSliceOrderId(),
164158
0,
165159
sliceCount,
166160
bodySizeLimit);
@@ -180,18 +174,10 @@ private void transferSlicedRequest(
180174
final int sliceCount,
181175
final int bodySizeLimit)
182176
throws Exception {
183-
final int startIndexInBody = sliceIndex * bodySizeLimit;
184-
final int endIndexInBody = Math.min((sliceIndex + 1) * bodySizeLimit, originalReq.body.limit());
185177
client.setShouldReturnSelf(shouldReturnSelf && sliceIndex == sliceCount - 1);
186178
client.pipeTransfer(
187-
PipeTransferSliceReq.toTPipeTransferReq(
188-
sliceOrderId,
189-
originalReq.getType(),
190-
sliceIndex,
191-
sliceCount,
192-
originalReq.body.duplicate(),
193-
startIndexInBody,
194-
endIndexInBody),
179+
PipeTransferSliceReqBuilder.buildSliceReq(
180+
originalReq, sliceOrderId, sliceIndex, sliceCount, bodySizeLimit),
195181
new AsyncMethodCallback<TPipeTransferResp>() {
196182
@Override
197183
public void onComplete(final TPipeTransferResp response) {

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ public void testLargeRequestWillBeSlicedForAsyncTransfer() throws Exception {
9797
final PipeTransferSliceReq thirdSlice =
9898
PipeTransferSliceReq.fromTPipeTransferReq(transferredRequests.get(2));
9999

100-
Assert.assertEquals(PipeRequestType.TRANSFER_SLICE.getType(), transferredRequests.get(0).getType());
100+
Assert.assertEquals(
101+
PipeRequestType.TRANSFER_SLICE.getType(), transferredRequests.get(0).getType());
101102
Assert.assertEquals(firstSlice.getOrderId(), secondSlice.getOrderId());
102103
Assert.assertEquals(firstSlice.getOrderId(), thirdSlice.getOrderId());
103104
Assert.assertEquals(originalReq.getType(), firstSlice.getOriginReqType());
@@ -144,7 +145,8 @@ public void testLargeRequestFallsBackToWholeRequestWhenSliceTransferFails() thro
144145
handler.transfer(client, originalReq);
145146

146147
Assert.assertEquals(2, transferredRequests.size());
147-
Assert.assertEquals(PipeRequestType.TRANSFER_SLICE.getType(), transferredRequests.get(0).getType());
148+
Assert.assertEquals(
149+
PipeRequestType.TRANSFER_SLICE.getType(), transferredRequests.get(0).getType());
148150
Assert.assertEquals(originalReq.getType(), transferredRequests.get(1).getType());
149151
Assert.assertEquals(originalReq.getVersion(), transferredRequests.get(1).getVersion());
150152
Assert.assertArrayEquals(originalReq.getBody(), transferredRequests.get(1).getBody());
@@ -187,7 +189,8 @@ private TestPipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink sink) {
187189
super(sink);
188190
}
189191

190-
private void transfer(final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req)
192+
private void transfer(
193+
final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req)
191194
throws TException {
192195
tryTransfer(client, req);
193196
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@
2222
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
2323
import org.apache.iotdb.commons.client.ThriftClient;
2424
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
25-
import org.apache.iotdb.commons.pipe.config.PipeConfig;
26-
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
27-
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
25+
import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder;
2826
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
2927
import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
3028
import org.apache.iotdb.rpc.TSStatusCode;
@@ -39,15 +37,11 @@
3937
import org.slf4j.Logger;
4038
import org.slf4j.LoggerFactory;
4139

42-
import java.util.concurrent.atomic.AtomicInteger;
43-
4440
public class IoTDBSyncClient extends IClientRPCService.Client
4541
implements ThriftClient, AutoCloseable {
4642

4743
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSyncClient.class);
4844

49-
private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new AtomicInteger(0);
50-
5145
private final String ipAddress;
5246
private final int port;
5347
private final TEndPoint endPoint;
@@ -100,9 +94,8 @@ public void setTimeout(int timeout) {
10094

10195
@Override
10296
public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws TException {
103-
final int bodySizeLimit = PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes();
104-
if (req.getVersion() != IoTDBSinkRequestVersion.VERSION_1.getVersion()
105-
|| req.body.limit() < bodySizeLimit) {
97+
final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
98+
if (!PipeTransferSliceReqBuilder.shouldSlice(req, bodySizeLimit)) {
10699
return super.pipeTransfer(req);
107100
}
108101

@@ -115,23 +108,13 @@ public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws TExcept
115108
bodySizeLimit);
116109

117110
try {
118-
final int sliceOrderId = SLICE_ORDER_ID_GENERATOR.getAndIncrement();
119-
// Slice the buffer to avoid the buffer being too large
120-
final int sliceCount =
121-
req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit == 0 ? 0 : 1);
111+
final int sliceOrderId = PipeTransferSliceReqBuilder.nextSliceOrderId();
112+
final int sliceCount = PipeTransferSliceReqBuilder.getSliceCount(req, bodySizeLimit);
122113
for (int i = 0; i < sliceCount; ++i) {
123-
final int startIndexInBody = i * bodySizeLimit;
124-
final int endIndexInBody = Math.min((i + 1) * bodySizeLimit, req.body.limit());
125114
final TPipeTransferResp sliceResp =
126115
super.pipeTransfer(
127-
PipeTransferSliceReq.toTPipeTransferReq(
128-
sliceOrderId,
129-
req.getType(),
130-
i,
131-
sliceCount,
132-
req.body.duplicate(),
133-
startIndexInBody,
134-
endIndexInBody));
116+
PipeTransferSliceReqBuilder.buildSliceReq(
117+
req, sliceOrderId, i, sliceCount, bodySizeLimit));
135118

136119
if (i == sliceCount - 1) {
137120
return sliceResp;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.commons.pipe.sink.payload.thrift.common;
21+
22+
import org.apache.iotdb.commons.pipe.config.PipeConfig;
23+
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
24+
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
25+
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
26+
27+
import java.io.IOException;
28+
import java.util.concurrent.atomic.AtomicInteger;
29+
30+
public final class PipeTransferSliceReqBuilder {
31+
32+
private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new AtomicInteger(0);
33+
34+
private PipeTransferSliceReqBuilder() {
35+
// Utility class
36+
}
37+
38+
public static int getBodySizeLimit() {
39+
return PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes();
40+
}
41+
42+
public static boolean shouldSlice(final TPipeTransferReq req, final int bodySizeLimit) {
43+
return req.getVersion() == IoTDBSinkRequestVersion.VERSION_1.getVersion()
44+
&& req.body.limit() >= bodySizeLimit;
45+
}
46+
47+
public static int nextSliceOrderId() {
48+
return SLICE_ORDER_ID_GENERATOR.getAndIncrement();
49+
}
50+
51+
public static int getSliceCount(final TPipeTransferReq req, final int bodySizeLimit) {
52+
return req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit == 0 ? 0 : 1);
53+
}
54+
55+
public static PipeTransferSliceReq buildSliceReq(
56+
final TPipeTransferReq originalReq,
57+
final int sliceOrderId,
58+
final int sliceIndex,
59+
final int sliceCount,
60+
final int bodySizeLimit)
61+
throws IOException {
62+
final int startIndexInBody = sliceIndex * bodySizeLimit;
63+
final int endIndexInBody = Math.min((sliceIndex + 1) * bodySizeLimit, originalReq.body.limit());
64+
return PipeTransferSliceReq.toTPipeTransferReq(
65+
sliceOrderId,
66+
originalReq.getType(),
67+
sliceIndex,
68+
sliceCount,
69+
originalReq.body.duplicate(),
70+
startIndexInBody,
71+
endIndexInBody);
72+
}
73+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.commons.pipe.sink.payload.thrift.common;
21+
22+
import org.apache.iotdb.commons.conf.CommonConfig;
23+
import org.apache.iotdb.commons.conf.CommonDescriptor;
24+
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
25+
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
26+
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
27+
28+
import org.junit.After;
29+
import org.junit.Assert;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
33+
import java.nio.ByteBuffer;
34+
35+
public class PipeTransferSliceReqBuilderTest {
36+
37+
private final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
38+
39+
private int originalRequestSliceThresholdBytes;
40+
41+
@Before
42+
public void setUp() {
43+
originalRequestSliceThresholdBytes = commonConfig.getPipeSinkRequestSliceThresholdBytes();
44+
commonConfig.setPipeSinkRequestSliceThresholdBytes(4);
45+
}
46+
47+
@After
48+
public void tearDown() {
49+
commonConfig.setPipeSinkRequestSliceThresholdBytes(originalRequestSliceThresholdBytes);
50+
}
51+
52+
@Test
53+
public void testBuildSliceReq() throws Exception {
54+
final TPipeTransferReq req = createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 10);
55+
final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
56+
57+
Assert.assertTrue(PipeTransferSliceReqBuilder.shouldSlice(req, bodySizeLimit));
58+
Assert.assertEquals(3, PipeTransferSliceReqBuilder.getSliceCount(req, bodySizeLimit));
59+
60+
final PipeTransferSliceReq firstSlice =
61+
PipeTransferSliceReqBuilder.buildSliceReq(req, 123, 0, 3, bodySizeLimit);
62+
final PipeTransferSliceReq secondSlice =
63+
PipeTransferSliceReqBuilder.buildSliceReq(req, 123, 1, 3, bodySizeLimit);
64+
final PipeTransferSliceReq thirdSlice =
65+
PipeTransferSliceReqBuilder.buildSliceReq(req, 123, 2, 3, bodySizeLimit);
66+
67+
Assert.assertArrayEquals(new byte[] {0, 1, 2, 3}, firstSlice.getSliceBody());
68+
Assert.assertArrayEquals(new byte[] {4, 5, 6, 7}, secondSlice.getSliceBody());
69+
Assert.assertArrayEquals(new byte[] {8, 9}, thirdSlice.getSliceBody());
70+
Assert.assertEquals(0, firstSlice.getSliceIndex());
71+
Assert.assertEquals(1, secondSlice.getSliceIndex());
72+
Assert.assertEquals(2, thirdSlice.getSliceIndex());
73+
Assert.assertEquals(3, firstSlice.getSliceCount());
74+
Assert.assertEquals(req.getType(), firstSlice.getOriginReqType());
75+
Assert.assertEquals(10, firstSlice.getOriginBodySize());
76+
}
77+
78+
@Test
79+
public void testShouldSliceOnlyForVersion1RequestsAboveThreshold() {
80+
final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
81+
82+
Assert.assertFalse(
83+
PipeTransferSliceReqBuilder.shouldSlice(
84+
createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 3), bodySizeLimit));
85+
Assert.assertFalse(
86+
PipeTransferSliceReqBuilder.shouldSlice(
87+
createReq((byte) (IoTDBSinkRequestVersion.VERSION_1.getVersion() + 1), 10),
88+
bodySizeLimit));
89+
Assert.assertTrue(
90+
PipeTransferSliceReqBuilder.shouldSlice(
91+
createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 4), bodySizeLimit));
92+
}
93+
94+
private static TPipeTransferReq createReq(final byte version, final int bodySize) {
95+
final byte[] body = new byte[bodySize];
96+
for (int i = 0; i < body.length; ++i) {
97+
body[i] = (byte) i;
98+
}
99+
100+
final TPipeTransferReq req = new TPipeTransferReq();
101+
req.version = version;
102+
req.type = (short) 123;
103+
req.body = ByteBuffer.wrap(body);
104+
return req;
105+
}
106+
}

0 commit comments

Comments
 (0)