Skip to content

Commit 1d5c8e2

Browse files
authored
Pipe CI: always flush for IoTDBPipeSinkCompressionIT with batch mode (#16247)
* setup * more fix
1 parent eae753f commit 1d5c8e2

1 file changed

Lines changed: 19 additions & 3 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.HashMap;
4949
import java.util.List;
5050
import java.util.Map;
51+
import java.util.function.Consumer;
5152

5253
import static org.junit.Assert.fail;
5354

@@ -131,6 +132,12 @@ private void doTest(
131132
? receiverDataNode.getPipeAirGapReceiverPort()
132133
: receiverDataNode.getPort();
133134

135+
final Consumer<String> handleFailure =
136+
o -> {
137+
TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
138+
TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
139+
};
140+
134141
try (final SyncConfigNodeIServiceClient client =
135142
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
136143
if (!TestUtils.tryExecuteNonQueriesWithRetry(
@@ -173,7 +180,8 @@ private void doTest(
173180
receiverEnv,
174181
"select count(*) from root.db.**",
175182
"count(root.db.d1.s1),",
176-
Collections.singleton("2,"));
183+
Collections.singleton("2,"),
184+
handleFailure);
177185

178186
if (!TestUtils.tryExecuteNonQueriesWithRetry(
179187
senderEnv,
@@ -193,7 +201,8 @@ private void doTest(
193201
receiverEnv,
194202
"select count(*) from root.db.**",
195203
"count(root.db.d1.s1),",
196-
Collections.singleton("8,"));
204+
Collections.singleton("8,"),
205+
handleFailure);
197206
}
198207
}
199208

@@ -204,6 +213,12 @@ public void testZstdCompressorLevel() throws Exception {
204213
final String receiverIp = receiverDataNode.getIp();
205214
final int receiverPort = receiverDataNode.getPort();
206215

216+
final Consumer<String> handleFailure =
217+
o -> {
218+
TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
219+
TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
220+
};
221+
207222
try (final SyncConfigNodeIServiceClient client =
208223
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
209224
if (!TestUtils.tryExecuteNonQueriesWithRetry(
@@ -320,7 +335,8 @@ public void testZstdCompressorLevel() throws Exception {
320335
receiverEnv,
321336
"count timeseries root.db.**",
322337
"count(timeseries),",
323-
Collections.singleton("3,"));
338+
Collections.singleton("3,"),
339+
handleFailure);
324340
}
325341
}
326342
}

0 commit comments

Comments
 (0)