Skip to content

Commit 9a47f8c

Browse files
Added a leader readiness check
1 parent 19f683f commit 9a47f8c

1 file changed

Lines changed: 15 additions & 3 deletions

File tree

tests/src/test/java/com/datastax/oss/pulsar/functions/transforms/tests/AbstractDockerTest.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -595,10 +595,22 @@ private void deployFunction(
595595
new Gson().fromJson(userConfig, new TypeToken<Map<String, Object>>() {}.getType()))
596596
.build();
597597

598-
admin.functions().createFunction(functionConfig, null);
598+
for (int i = 0; i < 600; i++) {
599+
try {
600+
admin.functions().createFunction(functionConfig, null);
601+
break;
602+
} catch (PulsarAdminException e) {
603+
if (e.getMessage() != null && e.getMessage().contains("Leader not yet ready")) {
604+
log.info("Function worker leader not ready yet, retrying... ({}/600)", i + 1);
605+
Thread.sleep(500);
606+
} else {
607+
throw e;
608+
}
609+
}
610+
}
599611

600612
FunctionStatus functionStatus = null;
601-
for (int i = 0; i < 300; i++) {
613+
for (int i = 0; i < 600; i++) {
602614
functionStatus = admin.functions().getFunctionStatus("public", "default", functionName);
603615
if (functionStatus.getNumRunning() == 1) {
604616
break;
@@ -613,7 +625,7 @@ private void deployFunction(
613625
log.error("Function errored out " + f);
614626
}
615627
});
616-
Thread.sleep(100);
628+
Thread.sleep(500);
617629
}
618630

619631
if (functionStatus.getNumRunning() != 1) {

0 commit comments

Comments
 (0)