Skip to content

Commit 0747b60

Browse files
authored
Kafka Connect: Terminate commits on coordinator stop (#10814)
1 parent 5439cbd commit 0747b60

3 files changed

Lines changed: 22 additions & 16 deletions

File tree

kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.iceberg.util.Tasks;
5454
import org.apache.iceberg.util.ThreadPools;
5555
import org.apache.kafka.clients.admin.MemberDescription;
56+
import org.apache.kafka.connect.errors.ConnectException;
5657
import org.apache.kafka.connect.sink.SinkTaskContext;
5758
import org.slf4j.Logger;
5859
import org.slf4j.LoggerFactory;
@@ -71,6 +72,7 @@ class Coordinator extends Channel {
7172
private final String snapshotOffsetsProp;
7273
private final ExecutorService exec;
7374
private final CommitState commitState;
75+
private volatile boolean terminated;
7476

7577
Coordinator(
7678
Catalog catalog,
@@ -218,6 +220,10 @@ private void commitToTable(
218220
.filter(distinctByKey(deleteFile -> deleteFile.path().toString()))
219221
.collect(Collectors.toList());
220222

223+
if (terminated) {
224+
throw new ConnectException("Coordinator is terminated, commit aborted");
225+
}
226+
221227
if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
222228
LOG.info("Nothing to commit to table {}, skipping", tableIdentifier);
223229
} else {
@@ -296,19 +302,18 @@ private Map<Integer, Long> lastCommittedOffsetsForTable(Table table, String bran
296302
return ImmutableMap.of();
297303
}
298304

299-
@Override
300-
void stop() {
305+
void terminate() {
306+
this.terminated = true;
307+
301308
exec.shutdownNow();
302309

303-
// ensure coordinator tasks are shut down, else cause the sink worker to fail
310+
// wait for coordinator termination, else cause the sink task to fail
304311
try {
305312
if (!exec.awaitTermination(1, TimeUnit.MINUTES)) {
306-
throw new RuntimeException("Timed out waiting for coordinator shutdown");
313+
throw new ConnectException("Timed out waiting for coordinator shutdown");
307314
}
308315
} catch (InterruptedException e) {
309-
throw new RuntimeException("Interrupted while waiting for coordinator shutdown", e);
316+
throw new ConnectException("Interrupted while waiting for coordinator shutdown", e);
310317
}
311-
312-
super.stop();
313318
}
314319
}

kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CoordinatorThread.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class CoordinatorThread extends Thread {
2525
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorThread.class);
2626
private static final String THREAD_NAME = "iceberg-coord";
2727

28-
private Coordinator coordinator;
28+
private final Coordinator coordinator;
2929
private volatile boolean terminated;
3030

3131
CoordinatorThread(Coordinator coordinator) {
@@ -39,15 +39,15 @@ public void run() {
3939
coordinator.start();
4040
} catch (Exception e) {
4141
LOG.error("Coordinator error during start, exiting thread", e);
42-
terminated = true;
42+
this.terminated = true;
4343
}
4444

4545
while (!terminated) {
4646
try {
4747
coordinator.process();
4848
} catch (Exception e) {
4949
LOG.error("Coordinator error during process, exiting thread", e);
50-
terminated = true;
50+
this.terminated = true;
5151
}
5252
}
5353

@@ -56,14 +56,14 @@ public void run() {
5656
} catch (Exception e) {
5757
LOG.error("Coordinator error during stop, ignoring", e);
5858
}
59-
coordinator = null;
6059
}
6160

6261
boolean isTerminated() {
6362
return terminated;
6463
}
6564

6665
void terminate() {
67-
terminated = true;
66+
this.terminated = true;
67+
coordinator.terminate();
6868
}
6969
}

kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.apache.iceberg.types.Types.TimestampType;
6262
import org.apache.iceberg.util.DateTimeUtil;
6363
import org.apache.kafka.connect.data.Struct;
64+
import org.apache.kafka.connect.errors.ConnectException;
6465

6566
class RecordConverter {
6667

@@ -421,7 +422,7 @@ protected LocalDate convertDateValue(Object value) {
421422
int days = (int) (((Date) value).getTime() / 1000 / 60 / 60 / 24);
422423
return DateTimeUtil.dateFromDays(days);
423424
}
424-
throw new RuntimeException("Cannot convert date: " + value);
425+
throw new ConnectException("Cannot convert date: " + value);
425426
}
426427

427428
@SuppressWarnings("JavaUtilDate")
@@ -437,7 +438,7 @@ protected LocalTime convertTimeValue(Object value) {
437438
long millis = ((Date) value).getTime();
438439
return DateTimeUtil.timeFromMicros(millis * 1000);
439440
}
440-
throw new RuntimeException("Cannot convert time: " + value);
441+
throw new ConnectException("Cannot convert time: " + value);
441442
}
442443

443444
protected Temporal convertTimestampValue(Object value, TimestampType type) {
@@ -461,7 +462,7 @@ private OffsetDateTime convertOffsetDateTime(Object value) {
461462
} else if (value instanceof Date) {
462463
return DateTimeUtil.timestamptzFromMicros(((Date) value).getTime() * 1000);
463464
}
464-
throw new RuntimeException(
465+
throw new ConnectException(
465466
"Cannot convert timestamptz: " + value + ", type: " + value.getClass());
466467
}
467468

@@ -489,7 +490,7 @@ private LocalDateTime convertLocalDateTime(Object value) {
489490
} else if (value instanceof Date) {
490491
return DateTimeUtil.timestampFromMicros(((Date) value).getTime() * 1000);
491492
}
492-
throw new RuntimeException(
493+
throw new ConnectException(
493494
"Cannot convert timestamp: " + value + ", type: " + value.getClass());
494495
}
495496

0 commit comments

Comments
 (0)