Skip to content

Commit 64999e8

Browse files
authored
Fix initial load completion tracking race condition (#6711)
Move the completion GlobalState creation before the task partition creation loop in performInitialLoad(). Previously, the completion key was created after all partitions, allowing workers to finish and call incrementSnapshotCompletionCount() before the key existed. Those increments were silently lost, causing waitForSnapshotComplete() to never reach the expected total. Verify that GlobalState (completion tracking) is created before InitialLoadTaskPartition, ensuring workers can report completion as soon as they acquire a partition. Fix LeaderSchedulerTest to pass shuffle parameters to updated constructor Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
1 parent fda3957 commit 64999e8

2 files changed

Lines changed: 142 additions & 3 deletions

File tree

data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/leader/LeaderScheduler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ private void performInitialLoad() {
157157
final long snapshotId = table.currentSnapshot().snapshotId();
158158
LOG.info("Starting initial load for table {} at snapshot {}", tableName, snapshotId);
159159

160+
final String completionKey = SNAPSHOT_COMPLETION_PREFIX + "initial-" + snapshotId;
161+
sourceCoordinator.createPartition(new GlobalState(completionKey,
162+
Map.of("total", 0, "completed", 0)));
163+
160164
final TableScan scan = table.newScan().useSnapshot(snapshotId);
161165
int taskCount = 0;
162166

@@ -180,9 +184,6 @@ private void performInitialLoad() {
180184
taskCount, tableName, snapshotId);
181185

182186
// Wait for all initial load partitions to complete
183-
final String completionKey = SNAPSHOT_COMPLETION_PREFIX + "initial-" + snapshotId;
184-
sourceCoordinator.createPartition(new GlobalState(completionKey,
185-
Map.of("total", taskCount, "completed", 0)));
186187
waitForSnapshotComplete(completionKey, taskCount);
187188

188189
// Set lastProcessedSnapshotId so CDC starts from this snapshot
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.iceberg.leader;
12+
13+
import org.apache.iceberg.DataFile;
14+
import org.apache.iceberg.FileScanTask;
15+
import org.apache.iceberg.Snapshot;
16+
import org.apache.iceberg.Table;
17+
import org.apache.iceberg.TableScan;
18+
import org.apache.iceberg.io.CloseableIterable;
19+
import org.junit.jupiter.api.BeforeEach;
20+
import org.junit.jupiter.api.Test;
21+
import org.junit.jupiter.api.extension.ExtendWith;
22+
import org.mockito.InOrder;
23+
import org.mockito.Mock;
24+
import org.mockito.junit.jupiter.MockitoExtension;
25+
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
26+
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
27+
import org.opensearch.dataprepper.plugins.source.iceberg.TableConfig;
28+
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.GlobalState;
29+
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.InitialLoadTaskPartition;
30+
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.LeaderPartition;
31+
import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleConfig;
32+
import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleStorage;
33+
import org.opensearch.dataprepper.plugins.certificate.model.Certificate;
34+
35+
import java.time.Duration;
36+
import java.util.List;
37+
import java.util.Map;
38+
import java.util.Optional;
39+
import java.util.concurrent.CountDownLatch;
40+
import java.util.concurrent.TimeUnit;
41+
import static org.hamcrest.MatcherAssert.assertThat;
42+
import static org.hamcrest.Matchers.is;
43+
import static org.mockito.ArgumentMatchers.any;
44+
import static org.mockito.ArgumentMatchers.argThat;
45+
import static org.mockito.Mockito.doAnswer;
46+
import static org.mockito.Mockito.inOrder;
47+
import static org.mockito.Mockito.mock;
48+
import static org.mockito.Mockito.when;
49+
50+
@ExtendWith(MockitoExtension.class)
51+
class LeaderSchedulerTest {
52+
53+
@Mock
54+
private EnhancedSourceCoordinator sourceCoordinator;
55+
@Mock
56+
private Table table;
57+
@Mock
58+
private TableConfig tableConfig;
59+
@Mock
60+
private ShuffleStorage shuffleStorage;
61+
@Mock
62+
private ShuffleConfig shuffleConfig;
63+
@Mock
64+
private Certificate certificate;
65+
66+
private LeaderScheduler leaderScheduler;
67+
68+
private static final String TABLE_NAME = "db.test_table";
69+
private static final long SNAPSHOT_ID = 123L;
70+
71+
@BeforeEach
72+
void setUp() {
73+
leaderScheduler = new LeaderScheduler(
74+
sourceCoordinator,
75+
Map.of(TABLE_NAME, tableConfig),
76+
Duration.ofSeconds(1),
77+
Map.of(TABLE_NAME, table),
78+
shuffleStorage, shuffleConfig, certificate);
79+
}
80+
81+
@Test
82+
void completionTrackingState_isCreated_beforeTaskPartitions_soWorkersCanReportCompletion() throws Exception {
83+
// Arrange: LeaderPartition with initialized=false
84+
final LeaderPartition leaderPartition = new LeaderPartition();
85+
when(sourceCoordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE))
86+
.thenReturn(Optional.of(leaderPartition));
87+
88+
// Arrange: Table with one snapshot
89+
when(tableConfig.isDisableExport()).thenReturn(false);
90+
final Snapshot snapshot = mock(Snapshot.class);
91+
when(snapshot.snapshotId()).thenReturn(SNAPSHOT_ID);
92+
when(table.currentSnapshot()).thenReturn(snapshot);
93+
94+
// Arrange: TableScan returns one FileScanTask
95+
final TableScan tableScan = mock(TableScan.class);
96+
when(table.newScan()).thenReturn(tableScan);
97+
when(tableScan.useSnapshot(SNAPSHOT_ID)).thenReturn(tableScan);
98+
99+
final FileScanTask fileScanTask = mock(FileScanTask.class);
100+
final DataFile dataFile = mock(DataFile.class);
101+
when(dataFile.location()).thenReturn("s3://bucket/data/file.parquet");
102+
when(dataFile.recordCount()).thenReturn(100L);
103+
when(fileScanTask.file()).thenReturn(dataFile);
104+
105+
final CloseableIterable<FileScanTask> scanTasks = CloseableIterable.withNoopClose(List.of(fileScanTask));
106+
when(tableScan.planFiles()).thenReturn(scanTasks);
107+
108+
// Arrange: waitForSnapshotComplete returns immediately (completed >= total)
109+
final String completionKey = "snapshot-completion-initial-" + SNAPSHOT_ID;
110+
final GlobalState completedState = new GlobalState(completionKey,
111+
Map.of("total", 1, "completed", 1));
112+
when(sourceCoordinator.getPartition(completionKey))
113+
.thenReturn(Optional.of(completedState));
114+
115+
// Use a latch to know when performInitialLoad has finished
116+
final CountDownLatch initialLoadDone = new CountDownLatch(1);
117+
doAnswer(invocation -> {
118+
// saveProgressStateForPartition is called after waitForSnapshotComplete returns
119+
initialLoadDone.countDown();
120+
return null;
121+
}).when(sourceCoordinator).saveProgressStateForPartition(any(LeaderPartition.class), any(Duration.class));
122+
123+
// Act: run in a thread
124+
final Thread thread = new Thread(leaderScheduler);
125+
thread.start();
126+
assertThat("performInitialLoad should complete within timeout",
127+
initialLoadDone.await(5, TimeUnit.SECONDS), is(true));
128+
thread.interrupt();
129+
thread.join(2000);
130+
131+
// Assert: GlobalState (completion tracking) is created BEFORE InitialLoadTaskPartition
132+
final InOrder inOrder = inOrder(sourceCoordinator);
133+
inOrder.verify(sourceCoordinator).createPartition(argThat(
134+
(EnhancedSourcePartition<?> p) -> p instanceof GlobalState));
135+
inOrder.verify(sourceCoordinator).createPartition(argThat(
136+
(EnhancedSourcePartition<?> p) -> p instanceof InitialLoadTaskPartition));
137+
}
138+
}

0 commit comments

Comments
 (0)