1919
2020package org .apache .iotdb .pipe .it .dual .tablemodel .manual .enhanced ;
2121
22+ import org .apache .iotdb .common .rpc .thrift .TConsensusGroupType ;
2223import org .apache .iotdb .common .rpc .thrift .TSStatus ;
2324import org .apache .iotdb .commons .client .exception .ClientManagerException ;
2425import org .apache .iotdb .commons .client .sync .SyncConfigNodeIServiceClient ;
2526import org .apache .iotdb .commons .cluster .RegionRoleType ;
2627import org .apache .iotdb .confignode .rpc .thrift .TCreatePipeReq ;
28+ import org .apache .iotdb .confignode .rpc .thrift .TRegionInfo ;
2729import org .apache .iotdb .confignode .rpc .thrift .TShowPipeInfo ;
2830import org .apache .iotdb .confignode .rpc .thrift .TShowPipeReq ;
2931import org .apache .iotdb .confignode .rpc .thrift .TShowRegionReq ;
5153import java .io .IOException ;
5254import java .util .ArrayList ;
5355import java .util .Arrays ;
56+ import java .util .Collections ;
57+ import java .util .Comparator ;
5458import java .util .HashMap ;
5559import java .util .HashSet ;
5660import java .util .List ;
5963import java .util .concurrent .atomic .AtomicInteger ;
6064import java .util .function .Consumer ;
6165
66+ import static org .awaitility .Awaitility .await ;
6267import static org .junit .Assert .fail ;
6368
6469@ RunWith (IoTDBTestRunner .class )
6570@ Category ({MultiClusterIT2DualTableManualEnhanced .class })
6671public class IoTDBPipeClusterIT extends AbstractPipeTableModelDualManualIT {
6772
73+ private static final double SYNC_LAG_DELTA = 0.001 ;
74+
6875 @ Override
6976 @ Before
7077 public void setUp () {
@@ -299,41 +306,7 @@ public void testPipeAfterDataRegionLeaderStop() {
299306
300307 TableModelUtils .insertData ("test1" , "test1" , 100 , 200 , senderEnv );
301308
302- final AtomicInteger leaderPort = new AtomicInteger (-1 );
303- final TShowRegionResp showRegionResp =
304- client .showRegion (new TShowRegionReq ().setIsTableModel (true ));
305- showRegionResp
306- .getRegionInfoList ()
307- .forEach (
308- regionInfo -> {
309- if (RegionRoleType .Leader .getRoleType ().equals (regionInfo .getRoleType ())) {
310- leaderPort .set (regionInfo .getClientRpcPort ());
311- }
312- });
313-
314- int leaderIndex = -1 ;
315- for (int i = 0 ; i < 3 ; ++i ) {
316- if (senderEnv .getDataNodeWrapper (i ).getPort () == leaderPort .get ()) {
317- leaderIndex = i ;
318- try {
319- senderEnv .shutdownDataNode (i );
320- } catch (final Throwable e ) {
321- e .printStackTrace ();
322- return ;
323- }
324- try {
325- TimeUnit .SECONDS .sleep (1 );
326- } catch (final InterruptedException ignored ) {
327- }
328- try {
329- senderEnv .startDataNode (i );
330- ((AbstractEnv ) senderEnv ).checkClusterStatusWithoutUnknown ();
331- } catch (final Throwable e ) {
332- e .printStackTrace ();
333- return ;
334- }
335- }
336- }
309+ final int leaderIndex = restartTableDataRegionLeader (client , "test1" );
337310 if (leaderIndex == -1 ) { // ensure the leader is stopped
338311 fail ();
339312 }
@@ -343,6 +316,7 @@ public void testPipeAfterDataRegionLeaderStop() {
343316 TableModelUtils .insertData ("test1" , "test1" , 200 , 300 , senderEnv );
344317
345318 TableModelUtils .assertData ("test" , "test" , 0 , 300 , receiverEnv , handleFailure );
319+ waitForTableDataRegionReplicationComplete (Arrays .asList ("test" , "test1" ));
346320 }
347321
348322 try {
@@ -398,6 +372,140 @@ public void testPipeAfterDataRegionLeaderStop() {
398372 }
399373 }
400374
375+ private int restartTableDataRegionLeader (
376+ final SyncConfigNodeIServiceClient client , final String database ) throws TException {
377+ final List <TRegionInfo > leaderRegionInfoList =
378+ showTableDataRegionLeaders (Collections .singletonList (database ), client );
379+ if (leaderRegionInfoList .isEmpty ()) {
380+ return -1 ;
381+ }
382+
383+ final TRegionInfo targetRegionInfo =
384+ leaderRegionInfoList .stream ()
385+ .min (Comparator .comparingInt (regionInfo -> regionInfo .getConsensusGroupId ().getId ()))
386+ .orElse (null );
387+ if (targetRegionInfo == null ) {
388+ return -1 ;
389+ }
390+
391+ final int leaderPort = targetRegionInfo .getClientRpcPort ();
392+ for (int i = 0 ; i < senderEnv .getDataNodeWrapperList ().size (); ++i ) {
393+ if (senderEnv .getDataNodeWrapper (i ).getPort () != leaderPort ) {
394+ continue ;
395+ }
396+
397+ try {
398+ senderEnv .shutdownDataNode (i );
399+ } catch (final Throwable e ) {
400+ e .printStackTrace ();
401+ return -1 ;
402+ }
403+
404+ try {
405+ TimeUnit .SECONDS .sleep (1 );
406+ } catch (final InterruptedException ignored ) {
407+ Thread .currentThread ().interrupt ();
408+ return -1 ;
409+ }
410+
411+ try {
412+ senderEnv .startDataNode (i );
413+ ((AbstractEnv ) senderEnv ).checkClusterStatusWithoutUnknown ();
414+ } catch (final Throwable e ) {
415+ e .printStackTrace ();
416+ return -1 ;
417+ }
418+ return i ;
419+ }
420+ return -1 ;
421+ }
422+
423+ private void waitForTableDataRegionReplicationComplete (final List <String > databases ) {
424+ await ()
425+ .pollInterval (500 , TimeUnit .MILLISECONDS )
426+ .atMost (2 , TimeUnit .MINUTES )
427+ .untilAsserted (
428+ () -> {
429+ try (final SyncConfigNodeIServiceClient client =
430+ (SyncConfigNodeIServiceClient ) senderEnv .getLeaderConfigNodeConnection ()) {
431+ final List <TRegionInfo > leaderRegionInfoList =
432+ showTableDataRegionLeaders (databases , client );
433+ Assert .assertFalse (
434+ "No table DataRegion leader found for databases " + databases ,
435+ leaderRegionInfoList .isEmpty ());
436+
437+ for (final TRegionInfo regionInfo : leaderRegionInfoList ) {
438+ final DataNodeWrapper leaderNode =
439+ findDataNodeWrapperByPort (regionInfo .getClientRpcPort ());
440+ final String metricsUrl =
441+ "http://"
442+ + leaderNode .getIp ()
443+ + ":"
444+ + leaderNode .getMetricPort ()
445+ + "/metrics" ;
446+ final String metricsContent = senderEnv .getUrlContent (metricsUrl , null );
447+ Assert .assertNotNull (
448+ "Failed to fetch metrics from leader DataNode at " + metricsUrl ,
449+ metricsContent );
450+ assertSyncLagIsZero (metricsContent , buildDataRegionTag (regionInfo ), metricsUrl );
451+ }
452+ }
453+ });
454+ }
455+
456+ private List <TRegionInfo > showTableDataRegionLeaders (
457+ final List <String > databases , final SyncConfigNodeIServiceClient client ) throws TException {
458+ final TShowRegionResp showRegionResp =
459+ client .showRegion (
460+ new TShowRegionReq ()
461+ .setConsensusGroupType (TConsensusGroupType .DataRegion )
462+ .setDatabases (databases )
463+ .setIsTableModel (true ));
464+ Assert .assertEquals (
465+ TSStatusCode .SUCCESS_STATUS .getStatusCode (), showRegionResp .getStatus ().getCode ());
466+ final List <TRegionInfo > result = new ArrayList <>();
467+ for (final TRegionInfo regionInfo : showRegionResp .getRegionInfoList ()) {
468+ if (RegionRoleType .Leader .getRoleType ().equals (regionInfo .getRoleType ())) {
469+ result .add (regionInfo );
470+ }
471+ }
472+ return result ;
473+ }
474+
475+ private DataNodeWrapper findDataNodeWrapperByPort (final int port ) {
476+ for (final DataNodeWrapper dataNodeWrapper : senderEnv .getDataNodeWrapperList ()) {
477+ if (dataNodeWrapper .getPort () == port ) {
478+ return dataNodeWrapper ;
479+ }
480+ }
481+ fail ("Failed to find DataNodeWrapper for client rpc port " + port );
482+ return null ;
483+ }
484+
485+ private String buildDataRegionTag (final TRegionInfo regionInfo ) {
486+ return "DataRegion[" + regionInfo .getConsensusGroupId ().getId () + "]" ;
487+ }
488+
489+ private void assertSyncLagIsZero (
490+ final String metricsContent , final String dataRegionTag , final String metricsUrl ) {
491+ for (final String line : metricsContent .split ("\\ R" )) {
492+ if (!line .startsWith ("iot_consensus{" )
493+ || !line .contains ("type=\" syncLag\" " )
494+ || !line .contains ("region=\" " + dataRegionTag + "\" " )) {
495+ continue ;
496+ }
497+ final int lastSpaceIndex = line .lastIndexOf (' ' );
498+ Assert .assertTrue ("Malformed syncLag metric line: " + line , lastSpaceIndex > 0 );
499+ Assert .assertEquals (
500+ "Expected syncLag of " + dataRegionTag + " to be 0 at " + metricsUrl + " but got " + line ,
501+ 0.0 ,
502+ Double .parseDouble (line .substring (lastSpaceIndex + 1 )),
503+ SYNC_LAG_DELTA );
504+ return ;
505+ }
506+ fail ("No syncLag metric found for " + dataRegionTag + " at " + metricsUrl );
507+ }
508+
401509 @ Test
402510 public void testPipeAfterRegisterNewDataNode () throws Exception {
403511 final DataNodeWrapper receiverDataNode = receiverEnv .getDataNodeWrapper (0 );
0 commit comments