1919
2020package org .apache .iotdb .pipe .it .autocreate ;
2121
22+ import org .apache .iotdb .common .rpc .thrift .TConsensusGroupType ;
2223import org .apache .iotdb .common .rpc .thrift .TSStatus ;
2324import org .apache .iotdb .commons .client .exception .ClientManagerException ;
2425import org .apache .iotdb .commons .client .sync .SyncConfigNodeIServiceClient ;
2526import org .apache .iotdb .commons .cluster .RegionRoleType ;
2627import org .apache .iotdb .confignode .rpc .thrift .TCreatePipeReq ;
28+ import org .apache .iotdb .confignode .rpc .thrift .TRegionInfo ;
2729import org .apache .iotdb .confignode .rpc .thrift .TShowPipeInfo ;
2830import org .apache .iotdb .confignode .rpc .thrift .TShowPipeReq ;
2931import org .apache .iotdb .confignode .rpc .thrift .TShowRegionReq ;
4951import java .util .ArrayList ;
5052import java .util .Arrays ;
5153import java .util .Collections ;
54+ import java .util .Comparator ;
5255import java .util .HashMap ;
5356import java .util .List ;
5457import java .util .Map ;
5558import java .util .concurrent .TimeUnit ;
5659import java .util .concurrent .atomic .AtomicInteger ;
5760
61+ import static org .awaitility .Awaitility .await ;
5862import static org .junit .Assert .fail ;
5963
6064@ RunWith (IoTDBTestRunner .class )
6165@ Category ({MultiClusterIT2AutoCreateSchema .class })
6266public class IoTDBPipeClusterIT extends AbstractPipeDualAutoIT {
6367
68+ private static final double SYNC_LAG_DELTA = 0.001 ;
69+
6470 @ Override
6571 @ Before
6672 public void setUp () {
@@ -77,6 +83,10 @@ public void setUp() {
7783 .setDataRegionConsensusProtocolClass (ConsensusFactory .IOT_CONSENSUS )
7884 .setPipeMemoryManagementEnabled (false )
7985 .setIsPipeEnableMemoryCheck (false );
86+ senderEnv
87+ .getConfig ()
88+ .getDataNodeConfig ()
89+ .setMetricReporterType (Collections .singletonList ("PROMETHEUS" ));
8090
8191 receiverEnv
8292 .getConfig ()
@@ -89,6 +99,10 @@ public void setUp() {
8999 .setDataRegionConsensusProtocolClass (ConsensusFactory .IOT_CONSENSUS )
90100 .setPipeMemoryManagementEnabled (false )
91101 .setIsPipeEnableMemoryCheck (false );
102+ receiverEnv
103+ .getConfig ()
104+ .getDataNodeConfig ()
105+ .setMetricReporterType (Collections .singletonList ("PROMETHEUS" ));
92106
93107 // 10 min, assert that the operations will not time out
94108 senderEnv .getConfig ().getCommonConfig ().setDnConnectionTimeoutMs (600000 );
@@ -302,40 +316,7 @@ public void testPipeAfterDataRegionLeaderStop() throws Exception {
302316 TestUtils .executeNonQueries (
303317 senderEnv , Arrays .asList ("insert into root.db.d1(time,s1) values (1,1)" , "flush" ), null );
304318
305- final AtomicInteger leaderPort = new AtomicInteger (-1 );
306- final TShowRegionResp showRegionResp = client .showRegion (new TShowRegionReq ());
307- showRegionResp
308- .getRegionInfoList ()
309- .forEach (
310- regionInfo -> {
311- if (RegionRoleType .Leader .getRoleType ().equals (regionInfo .getRoleType ())) {
312- leaderPort .set (regionInfo .getClientRpcPort ());
313- }
314- });
315-
316- int leaderIndex = -1 ;
317- for (int i = 0 ; i < 3 ; ++i ) {
318- if (senderEnv .getDataNodeWrapper (i ).getPort () == leaderPort .get ()) {
319- leaderIndex = i ;
320- try {
321- senderEnv .shutdownDataNode (i );
322- } catch (final Throwable e ) {
323- e .printStackTrace ();
324- return ;
325- }
326- try {
327- TimeUnit .SECONDS .sleep (1 );
328- } catch (final InterruptedException ignored ) {
329- }
330- try {
331- senderEnv .startDataNode (i );
332- ((AbstractEnv ) senderEnv ).checkClusterStatusWithoutUnknown ();
333- } catch (final Throwable e ) {
334- e .printStackTrace ();
335- return ;
336- }
337- }
338- }
319+ final int leaderIndex = restartTreeDataRegionLeader (client , "root.db" );
339320 if (leaderIndex == -1 ) { // ensure the leader is stopped
340321 fail ();
341322 }
@@ -352,6 +333,7 @@ public void testPipeAfterDataRegionLeaderStop() throws Exception {
352333 "select count(*) from root.db.d1" ,
353334 "count(root.db.d1.s1)," ,
354335 Collections .singleton ("2," ));
336+ waitForTreeDataRegionReplicationComplete (Collections .singletonList ("root.db" ));
355337 }
356338
357339 try {
@@ -397,6 +379,139 @@ public void testPipeAfterDataRegionLeaderStop() throws Exception {
397379 }
398380 }
399381
382+ private int restartTreeDataRegionLeader (
383+ final SyncConfigNodeIServiceClient client , final String database ) throws TException {
384+ final List <TRegionInfo > leaderRegionInfoList =
385+ showTreeDataRegionLeaders (Collections .singletonList (database ), client );
386+ if (leaderRegionInfoList .isEmpty ()) {
387+ return -1 ;
388+ }
389+
390+ final TRegionInfo targetRegionInfo =
391+ leaderRegionInfoList .stream ()
392+ .min (Comparator .comparingInt (regionInfo -> regionInfo .getConsensusGroupId ().getId ()))
393+ .orElse (null );
394+ if (targetRegionInfo == null ) {
395+ return -1 ;
396+ }
397+
398+ final int leaderPort = targetRegionInfo .getClientRpcPort ();
399+ for (int i = 0 ; i < senderEnv .getDataNodeWrapperList ().size (); ++i ) {
400+ if (senderEnv .getDataNodeWrapper (i ).getPort () != leaderPort ) {
401+ continue ;
402+ }
403+
404+ try {
405+ senderEnv .shutdownDataNode (i );
406+ } catch (final Throwable e ) {
407+ e .printStackTrace ();
408+ return -1 ;
409+ }
410+
411+ try {
412+ TimeUnit .SECONDS .sleep (1 );
413+ } catch (final InterruptedException ignored ) {
414+ Thread .currentThread ().interrupt ();
415+ return -1 ;
416+ }
417+
418+ try {
419+ senderEnv .startDataNode (i );
420+ ((AbstractEnv ) senderEnv ).checkClusterStatusWithoutUnknown ();
421+ } catch (final Throwable e ) {
422+ e .printStackTrace ();
423+ return -1 ;
424+ }
425+ return i ;
426+ }
427+ return -1 ;
428+ }
429+
430+ private void waitForTreeDataRegionReplicationComplete (final List <String > databases ) {
431+ await ()
432+ .pollInterval (500 , TimeUnit .MILLISECONDS )
433+ .atMost (2 , TimeUnit .MINUTES )
434+ .untilAsserted (
435+ () -> {
436+ try (final SyncConfigNodeIServiceClient client =
437+ (SyncConfigNodeIServiceClient ) senderEnv .getLeaderConfigNodeConnection ()) {
438+ final List <TRegionInfo > leaderRegionInfoList =
439+ showTreeDataRegionLeaders (databases , client );
440+ Assert .assertFalse (
441+ "No tree DataRegion leader found for databases " + databases ,
442+ leaderRegionInfoList .isEmpty ());
443+
444+ for (final TRegionInfo regionInfo : leaderRegionInfoList ) {
445+ final DataNodeWrapper leaderNode =
446+ findDataNodeWrapperByPort (regionInfo .getClientRpcPort ());
447+ final String metricsUrl =
448+ "http://"
449+ + leaderNode .getIp ()
450+ + ":"
451+ + leaderNode .getMetricPort ()
452+ + "/metrics" ;
453+ final String metricsContent = senderEnv .getUrlContent (metricsUrl , null );
454+ Assert .assertNotNull (
455+ "Failed to fetch metrics from leader DataNode at " + metricsUrl ,
456+ metricsContent );
457+ assertSyncLagIsZero (metricsContent , buildDataRegionTag (regionInfo ), metricsUrl );
458+ }
459+ }
460+ });
461+ }
462+
463+ private List <TRegionInfo > showTreeDataRegionLeaders (
464+ final List <String > databases , final SyncConfigNodeIServiceClient client ) throws TException {
465+ final TShowRegionResp showRegionResp =
466+ client .showRegion (
467+ new TShowRegionReq ()
468+ .setConsensusGroupType (TConsensusGroupType .DataRegion )
469+ .setDatabases (databases ));
470+ Assert .assertEquals (
471+ TSStatusCode .SUCCESS_STATUS .getStatusCode (), showRegionResp .getStatus ().getCode ());
472+ final List <TRegionInfo > result = new ArrayList <>();
473+ for (final TRegionInfo regionInfo : showRegionResp .getRegionInfoList ()) {
474+ if (RegionRoleType .Leader .getRoleType ().equals (regionInfo .getRoleType ())) {
475+ result .add (regionInfo );
476+ }
477+ }
478+ return result ;
479+ }
480+
481+ private DataNodeWrapper findDataNodeWrapperByPort (final int port ) {
482+ for (final DataNodeWrapper dataNodeWrapper : senderEnv .getDataNodeWrapperList ()) {
483+ if (dataNodeWrapper .getPort () == port ) {
484+ return dataNodeWrapper ;
485+ }
486+ }
487+ fail ("Failed to find DataNodeWrapper for client rpc port " + port );
488+ return null ;
489+ }
490+
491+ private String buildDataRegionTag (final TRegionInfo regionInfo ) {
492+ return "DataRegion[" + regionInfo .getConsensusGroupId ().getId () + "]" ;
493+ }
494+
495+ private void assertSyncLagIsZero (
496+ final String metricsContent , final String dataRegionTag , final String metricsUrl ) {
497+ for (final String line : metricsContent .split ("\\ R" )) {
498+ if (!line .startsWith ("iot_consensus{" )
499+ || !line .contains ("type=\" syncLag\" " )
500+ || !line .contains ("region=\" " + dataRegionTag + "\" " )) {
501+ continue ;
502+ }
503+ final int lastSpaceIndex = line .lastIndexOf (' ' );
504+ Assert .assertTrue ("Malformed syncLag metric line: " + line , lastSpaceIndex > 0 );
505+ Assert .assertEquals (
506+ "Expected syncLag of " + dataRegionTag + " to be 0 at " + metricsUrl + " but got " + line ,
507+ 0.0 ,
508+ Double .parseDouble (line .substring (lastSpaceIndex + 1 )),
509+ SYNC_LAG_DELTA );
510+ return ;
511+ }
512+ fail ("No syncLag metric found for " + dataRegionTag + " at " + metricsUrl );
513+ }
514+
400515 @ Test
401516 public void testPipeAfterRegisterNewDataNode () throws Exception {
402517 final DataNodeWrapper receiverDataNode = receiverEnv .getDataNodeWrapper (0 );
0 commit comments