|
54 | 54 | import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge; |
55 | 55 | import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; |
56 | 56 | import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; |
| 57 | +import org.apache.logging.log4j.Level; |
| 58 | +import org.apache.logging.log4j.LogManager; |
| 59 | +import org.apache.logging.log4j.core.LoggerContext; |
| 60 | +import org.apache.logging.log4j.core.config.LoggerConfig; |
| 61 | +import org.apache.logging.log4j.core.test.appender.ListAppender; |
57 | 62 | import org.apache.thrift.TException; |
58 | 63 | import org.junit.After; |
59 | 64 | import org.junit.Assert; |
@@ -473,7 +478,7 @@ public void testPartitionDiscoveryTablePattern() throws TException, IOException |
473 | 478 | } |
474 | 479 |
|
475 | 480 | @Test |
476 | | - public void testPartitionDiscoveryTransactionalTable() |
| 481 | + public void testPartitionDiscoveryTransactionalTableConcurrent() |
477 | 482 | throws TException, IOException, InterruptedException, ExecutionException { |
478 | 483 | String dbName = "db6"; |
479 | 484 | String tableName = "tbl6"; |
@@ -503,47 +508,72 @@ public void testPartitionDiscoveryTransactionalTable() |
503 | 508 | TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY); |
504 | 509 | client.alter_table(dbName, tableName, table); |
505 | 510 |
|
506 | | - runPartitionManagementTask(conf); |
507 | | - partitions = client.listPartitions(dbName, tableName, (short) -1); |
508 | | - assertEquals(5, partitions.size()); |
509 | | - |
510 | | - // only one partition discovery task is running, there will be no skipped attempts |
511 | | - assertEquals(0, PartitionManagementTask.getSkippedAttempts()); |
512 | | - |
513 | | - // delete a partition from fs, and submit 3 tasks at the same time each of them trying to acquire X lock on the |
514 | | - // same table, only one of them will run other attempts will be skipped |
515 | | - boolean deleted = fs.delete(newPart1.getParent(), true); |
516 | | - assertTrue(deleted); |
517 | | - assertEquals(4, fs.listStatus(tablePath).length); |
518 | | - |
519 | | - // 3 tasks are submitted at the same time, only one will eventually lock the table and only one get to run at a time |
520 | | - // This is to simulate, skipping partition discovery task attempt when previous attempt is still incomplete |
521 | | - PartitionManagementTask partitionDiscoveryTask1 = new PartitionManagementTask(); |
522 | | - partitionDiscoveryTask1.setConf(conf); |
523 | | - PartitionManagementTask partitionDiscoveryTask2 = new PartitionManagementTask(); |
524 | | - partitionDiscoveryTask2.setConf(conf); |
525 | | - PartitionManagementTask partitionDiscoveryTask3 = new PartitionManagementTask(); |
526 | | - partitionDiscoveryTask3.setConf(conf); |
527 | | - List<PartitionManagementTask> tasks = Lists |
528 | | - .newArrayList(partitionDiscoveryTask1, partitionDiscoveryTask2, partitionDiscoveryTask3); |
529 | | - ExecutorService executorService = Executors.newFixedThreadPool(3); |
530 | | - int successBefore = PartitionManagementTask.getCompletedAttempts(); |
531 | | - int skippedBefore = PartitionManagementTask.getSkippedAttempts(); |
532 | | - List<Future<?>> futures = new ArrayList<>(); |
533 | | - for (PartitionManagementTask task : tasks) { |
534 | | - futures.add(executorService.submit(task)); |
535 | | - } |
536 | | - for (Future<?> future : futures) { |
537 | | - future.get(); |
| 511 | + final String appenderName = "testPartitionDiscoveryTransactionalTableConcurrentAppender"; |
| 512 | + LoggerContext loggerContext = (LoggerContext) LogManager.getContext(false); |
| 513 | + LoggerConfig rootLoggerConfig = loggerContext.getConfiguration().getLoggerConfig(""); |
| 514 | + ListAppender skipAppender = new ListAppender(appenderName); |
| 515 | + skipAppender.start(); |
| 516 | + rootLoggerConfig.addAppender(skipAppender, Level.INFO, null); |
| 517 | + try { |
| 518 | + runPartitionManagementTask(conf); |
| 519 | + partitions = client.listPartitions(dbName, tableName, (short) -1); |
| 520 | + assertEquals(5, partitions.size()); |
| 521 | + |
| 522 | + // only one partition discovery task is running, there will be no skipped attempts |
| 523 | + assertEquals(0, countSkipMessages(skipAppender)); |
| 524 | + assertEquals(1, countDiscoveryEntries(skipAppender)); |
| 525 | + |
| 526 | + // delete a partition from fs, and submit 3 tasks at the same time each of them trying to acquire X lock on the |
| 527 | + // same table, only one of them will run other attempts will be skipped |
| 528 | + boolean deleted = fs.delete(newPart1.getParent(), true); |
| 529 | + assertTrue(deleted); |
| 530 | + assertEquals(4, fs.listStatus(tablePath).length); |
| 531 | + |
| 532 | + // 3 tasks are submitted at the same time, only one will eventually lock the table and only one |
| 533 | + // get to run at a time. This is to simulate, skipping partition discovery task attempt when |
| 534 | + // previous attempt is still incomplete |
| 535 | + PartitionManagementTask partitionDiscoveryTask1 = new PartitionManagementTask(); |
| 536 | + partitionDiscoveryTask1.setConf(conf); |
| 537 | + PartitionManagementTask partitionDiscoveryTask2 = new PartitionManagementTask(); |
| 538 | + partitionDiscoveryTask2.setConf(conf); |
| 539 | + PartitionManagementTask partitionDiscoveryTask3 = new PartitionManagementTask(); |
| 540 | + partitionDiscoveryTask3.setConf(conf); |
| 541 | + List<PartitionManagementTask> tasks = Lists |
| 542 | + .newArrayList(partitionDiscoveryTask1, partitionDiscoveryTask2, partitionDiscoveryTask3); |
| 543 | + ExecutorService executorService = Executors.newFixedThreadPool(3); |
| 544 | + List<Future<?>> futures = new ArrayList<>(); |
| 545 | + for (PartitionManagementTask task : tasks) { |
| 546 | + futures.add(executorService.submit(task)); |
| 547 | + } |
| 548 | + for (Future<?> future : futures) { |
| 549 | + future.get(); |
| 550 | + } |
| 551 | + long skips = countSkipMessages(skipAppender); |
| 552 | + long discoveries = countDiscoveryEntries(skipAppender); |
| 553 | + assertEquals(4, skips + discoveries); |
| 554 | + assertTrue("at least one more task should have entered the work path during the race", discoveries >= 2); |
| 555 | + } finally { |
| 556 | + rootLoggerConfig.removeAppender(appenderName); |
| 557 | + skipAppender.stop(); |
538 | 558 | } |
539 | | - int successAfter = PartitionManagementTask.getCompletedAttempts(); |
540 | | - int skippedAfter = PartitionManagementTask.getSkippedAttempts(); |
541 | | - assertEquals(1, successAfter - successBefore); |
542 | | - assertEquals(2, skippedAfter - skippedBefore); |
543 | 559 | partitions = client.listPartitions(dbName, tableName, (short) -1); |
544 | 560 | assertEquals(4, partitions.size()); |
545 | 561 | } |
546 | 562 |
|
| 563 | + private static long countSkipMessages(ListAppender appender) { |
| 564 | + return appender.getEvents().stream() |
| 565 | + .map(e -> e.getMessage().getFormattedMessage()) |
| 566 | + .filter(m -> m.equals("Lock is held by some other partition discovery task. Skipping this attempt.")) |
| 567 | + .count(); |
| 568 | + } |
| 569 | + |
| 570 | + private static long countDiscoveryEntries(ListAppender appender) { |
| 571 | + return appender.getEvents().stream() |
| 572 | + .map(e -> e.getMessage().getFormattedMessage()) |
| 573 | + .filter(m -> m.equals("Found 1 candidate tables for partition discovery")) |
| 574 | + .count(); |
| 575 | + } |
| 576 | + |
547 | 577 | @Test |
548 | 578 | public void testPartitionRetention() throws TException, IOException, InterruptedException { |
549 | 579 | String dbName = "db7"; |
|
0 commit comments