Skip to content

Commit 3d6abab

Browse files
utafraliluoyuxia
andauthored
[lake] Handle coordinator disconnect in TieringSourceEnumerator (#3107)
--------- Co-authored-by: luoyuxia <luoyuxia@alumni.sjtu.edu.cn>
1 parent 32f66f3 commit 3d6abab

2 files changed

Lines changed: 21 additions & 2 deletions

File tree

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.fluss.rpc.messages.PbLakeTieringStats;
4343
import org.apache.fluss.rpc.messages.PbLakeTieringTableInfo;
4444
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
45+
import org.apache.fluss.utils.ExceptionUtils;
4546

4647
import org.apache.flink.api.connector.source.ReaderInfo;
4748
import org.apache.flink.api.connector.source.SourceEvent;
@@ -352,10 +353,11 @@ protected void handleTableTieringReachMaxDuration(
352353
}
353354
}
354355

355-
private void generateAndAssignSplits(
356+
@VisibleForTesting
357+
void generateAndAssignSplits(
356358
@Nullable Tuple3<Long, Long, TablePath> tieringTable, Throwable throwable) {
357359
if (throwable != null) {
358-
LOG.warn("Failed to request tiering table, will retry later.", throwable);
360+
ExceptionUtils.rethrow(throwable);
359361
}
360362
if (tieringTable != null) {
361363
generateTieringSplits(tieringTable);

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.config.ConfigOptions;
2121
import org.apache.fluss.config.Configuration;
22+
import org.apache.fluss.exception.NetworkException;
2223
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
2324
import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
2425
import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
@@ -37,6 +38,7 @@
3738
import org.apache.flink.api.connector.source.SourceEvent;
3839
import org.apache.flink.api.connector.source.SplitsAssignment;
3940
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
41+
import org.apache.flink.util.FlinkRuntimeException;
4042
import org.junit.jupiter.api.BeforeAll;
4143
import org.junit.jupiter.api.BeforeEach;
4244
import org.junit.jupiter.api.Test;
@@ -55,6 +57,7 @@
5557
import static org.apache.fluss.config.ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE;
5658
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
5759
import static org.assertj.core.api.Assertions.assertThat;
60+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5861

5962
/** Unit tests for {@link TieringSourceEnumerator} and {@link TieringSplitGenerator}. */
6063
class TieringSourceEnumeratorTest extends TieringTestBase {
@@ -730,6 +733,20 @@ private TieringSourceEnumerator createTieringSourceEnumerator(
730733
return new TieringSourceEnumerator(flussConf, context, 500);
731734
}
732735

736+
@Test
737+
void testNetworkErrorInHeartbeatTriggersFailover() throws Exception {
738+
try (FlussMockSplitEnumeratorContext<TieringSplit> context =
739+
new FlussMockSplitEnumeratorContext<>(1)) {
740+
TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context);
741+
FlinkRuntimeException networkError =
742+
new FlinkRuntimeException(
743+
"Failed to wait heartbeat response due to ",
744+
new NetworkException("coordinator disconnected"));
745+
assertThatThrownBy(() -> enumerator.generateAndAssignSplits(null, networkError))
746+
.isSameAs(networkError);
747+
}
748+
}
749+
733750
@Test
734751
void testTableReachMaxTieringDuration() throws Throwable {
735752
TablePath tablePath = TablePath.of(DEFAULT_DB, "tiering-max-duration-test-log-table");

0 commit comments

Comments
 (0)