Skip to content

Commit 82c3d83

Browse files
[python] Add conflict detection in shard update (#7630)
### Purpose This PR is a follow-up to #7323. PR #7323 introduced conflict detection for Python data evolution updates, but the shard-update path was not covered. As a result, when shard update and compact run concurrently, the shard update may commit successfully against a stale scan snapshot instead of failing fast. The problem only shows up later during read, with the error: `All files in a field merge split should have the same row count`.This PR extends the same conflict-detection coverage to the shard-update path. ### Tests run_compact_conflict_test in run_mixed_tests.sh
1 parent 22baf60 commit 82c3d83

13 files changed

Lines changed: 254 additions & 27 deletions

File tree

paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.paimon;
2020

21+
import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
22+
import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
2123
import org.apache.paimon.catalog.Catalog;
2224
import org.apache.paimon.catalog.CatalogContext;
2325
import org.apache.paimon.catalog.CatalogFactory;
@@ -48,9 +50,11 @@
4850
import org.apache.paimon.table.sink.BatchTableCommit;
4951
import org.apache.paimon.table.sink.BatchTableWrite;
5052
import org.apache.paimon.table.sink.BatchWriteBuilder;
53+
import org.apache.paimon.table.sink.CommitMessage;
5154
import org.apache.paimon.table.sink.InnerTableCommit;
5255
import org.apache.paimon.table.sink.StreamTableCommit;
5356
import org.apache.paimon.table.sink.StreamTableWrite;
57+
import org.apache.paimon.table.source.EndOfScanException;
5458
import org.apache.paimon.table.source.ReadBuilder;
5559
import org.apache.paimon.table.source.Split;
5660
import org.apache.paimon.table.source.TableRead;
@@ -937,6 +941,71 @@ protected GenericRow createRow3ColsWithKind(RowKind rowKind, Object... values) {
937941
return GenericRow.ofKind(rowKind, values[0], values[1], values[2]);
938942
}
939943

944+
/** Step 1: Write 5 base files for compact conflict test. */
945+
@Test
946+
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
947+
public void testCompactConflictWriteBase() throws Exception {
948+
Identifier id = identifier("compact_conflict_test");
949+
try {
950+
catalog.dropTable(id, true);
951+
} catch (Exception ignore) {
952+
}
953+
Schema schema =
954+
Schema.newBuilder()
955+
.column("f0", DataTypes.INT())
956+
.column("f1", DataTypes.STRING())
957+
.column("f2", DataTypes.STRING())
958+
.option(ROW_TRACKING_ENABLED.key(), "true")
959+
.option(DATA_EVOLUTION_ENABLED.key(), "true")
960+
.option(BUCKET.key(), "-1")
961+
.build();
962+
catalog.createTable(id, schema, false);
963+
964+
RowType fullType = schema.rowType().project(Arrays.asList("f0", "f1"));
965+
966+
for (int fileIdx = 0; fileIdx < 5; fileIdx++) {
967+
int startId = fileIdx * 200;
968+
FileStoreTable t = (FileStoreTable) catalog.getTable(id);
969+
BatchWriteBuilder builder = t.newBatchWriteBuilder();
970+
try (BatchTableWrite w = builder.newWrite().withWriteType(fullType)) {
971+
for (int i = 0; i < 200; i++) {
972+
w.write(
973+
GenericRow.of(
974+
startId + i, BinaryString.fromString("n" + (startId + i))));
975+
}
976+
builder.newCommit().commit(w.prepareCommit());
977+
}
978+
}
979+
LOG.info("compact_conflict_test: 5 base files written (200 rows each, total 1000)");
980+
}
981+
982+
/** Step 3: Run compact on compact_conflict_test table. */
983+
@Test
984+
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
985+
public void testCompactConflictRunCompact() throws Exception {
986+
Identifier id = identifier("compact_conflict_test");
987+
doDataEvolutionCompact((FileStoreTable) catalog.getTable(id));
988+
LOG.info("compact_conflict_test: compact done, 5 files merged into 1 (1000 rows)");
989+
}
990+
991+
private void doDataEvolutionCompact(FileStoreTable table) throws Exception {
992+
DataEvolutionCompactCoordinator coordinator =
993+
new DataEvolutionCompactCoordinator(table, false, false);
994+
List<CommitMessage> messages = new ArrayList<>();
995+
try {
996+
List<DataEvolutionCompactTask> tasks;
997+
while (!(tasks = coordinator.plan()).isEmpty()) {
998+
for (DataEvolutionCompactTask task : tasks) {
999+
messages.add(task.doCompact(table, "test-compact"));
1000+
}
1001+
}
1002+
} catch (EndOfScanException ignore) {
1003+
}
1004+
if (!messages.isEmpty()) {
1005+
table.newBatchWriteBuilder().newCommit().commit(messages);
1006+
}
1007+
}
1008+
9401009
private static String rowToStringWithStruct(InternalRow row, RowType type) {
9411010
StringBuilder build = new StringBuilder();
9421011
build.append(row.getRowKind().shortString()).append("[");

paimon-python/dev/run_mixed_tests.sh

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,32 @@ run_lumina_vector_test() {
290290
fi
291291
}
292292

293+
run_compact_conflict_test() {
294+
echo -e "${YELLOW}=== Running Compact Conflict Test (Java Write Base, Python Shard Update + Java Compact) ===${NC}"
295+
296+
cd "$PROJECT_ROOT"
297+
298+
# Step 1: Java writes 5 base files
299+
echo "Running Maven test for JavaPyE2ETest.testCompactConflictWriteBase..."
300+
if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testCompactConflictWriteBase -pl paimon-core -q -Drun.e2e.tests=true; then
301+
echo -e "${GREEN}✓ Java write base files completed successfully${NC}"
302+
else
303+
echo -e "${RED}✗ Java write base files failed${NC}"
304+
return 1
305+
fi
306+
307+
# Step 2-4: Python shard update (scan -> Java compact -> commit conflict detected)
308+
cd "$PAIMON_PYTHON_DIR"
309+
echo "Running Python test for JavaPyReadWriteTest.test_compact_conflict_shard_update..."
310+
if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest::test_compact_conflict_shard_update -v; then
311+
echo -e "${GREEN}✓ Python compact conflict test completed successfully${NC}"
312+
return 0
313+
else
314+
echo -e "${RED}✗ Python compact conflict test failed${NC}"
315+
return 1
316+
fi
317+
}
318+
293319
run_blob_alter_compact_test() {
294320
echo -e "${YELLOW}=== Running Blob Alter+Compact Test (Java Write+Alter+Compact, Python Read) ===${NC}"
295321

@@ -324,6 +350,7 @@ main() {
324350
local compressed_text_result=0
325351
local tantivy_fulltext_result=0
326352
local lumina_vector_result=0
353+
local compact_conflict_result=0
327354
local blob_alter_compact_result=0
328355

329356
# Detect Python version
@@ -407,6 +434,13 @@ main() {
407434

408435
echo ""
409436

437+
# Run compact conflict test (Java write+compact, Python read)
438+
if ! run_compact_conflict_test; then
439+
compact_conflict_result=1
440+
fi
441+
442+
echo ""
443+
410444
# Run blob alter+compact test (Java write+alter+compact, Python read)
411445
if ! run_blob_alter_compact_test; then
412446
blob_alter_compact_result=1
@@ -470,6 +504,12 @@ main() {
470504
echo -e "${RED}✗ Lumina Vector Index Test (Java Write, Python Read): FAILED${NC}"
471505
fi
472506

507+
if [[ $compact_conflict_result -eq 0 ]]; then
508+
echo -e "${GREEN}✓ Compact Conflict Test (Java Write+Compact, Python Read): PASSED${NC}"
509+
else
510+
echo -e "${RED}✗ Compact Conflict Test (Java Write+Compact, Python Read): FAILED${NC}"
511+
fi
512+
473513
if [[ $blob_alter_compact_result -eq 0 ]]; then
474514
echo -e "${GREEN}✓ Blob Alter+Compact Test (Java Write+Alter+Compact, Python Read): PASSED${NC}"
475515
else
@@ -481,7 +521,7 @@ main() {
481521
# Clean up warehouse directory after all tests
482522
cleanup_warehouse
483523

484-
if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && $btree_index_result -eq 0 && $compressed_text_result -eq 0 && $tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 && $blob_alter_compact_result -eq 0 ]]; then
524+
if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && $btree_index_result -eq 0 && $compressed_text_result -eq 0 && $tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 && $compact_conflict_result -eq 0 && $blob_alter_compact_result -eq 0 ]]; then
485525
echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability verified.${NC}"
486526
return 0
487527
else

paimon-python/pypaimon/read/plan.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
# limitations under the License.
1717
################################################################################
1818

19-
from dataclasses import dataclass
20-
from typing import List
19+
from dataclasses import dataclass, field
20+
from typing import List, Optional
2121

2222

2323
from pypaimon.read.split import Split
@@ -27,6 +27,7 @@
2727
class Plan:
2828
"""Implementation of Plan for native Python reading."""
2929
_splits: List[Split]
30+
snapshot_id: Optional[int] = field(default=None)
3031

3132
def splits(self) -> List[Split]:
3233
return self._splits

paimon-python/pypaimon/read/scanner/file_scanner.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import logging
1919
import os
2020
import time
21-
from typing import Callable, Dict, List, Optional, Set
21+
from typing import Callable, Dict, List, Optional, Set, Tuple
2222

2323
logger = logging.getLogger(__name__)
2424

@@ -40,6 +40,7 @@
4040
from pypaimon.read.scanner.primary_key_table_split_generator import \
4141
PrimaryKeyTableSplitGenerator
4242
from pypaimon.read.split import DataSplit
43+
from pypaimon.snapshot.snapshot import Snapshot
4344
from pypaimon.snapshot.snapshot_manager import SnapshotManager
4445
from pypaimon.table.bucket_mode import BucketMode
4546
from pypaimon.table.source.deletion_file import DeletionFile
@@ -165,7 +166,7 @@ class FileScanner:
165166
def __init__(
166167
self,
167168
table,
168-
manifest_scanner: Callable[[], List[ManifestFileMeta]],
169+
manifest_scanner: Callable[[], Tuple[List[ManifestFileMeta], Optional[Snapshot]]],
169170
predicate: Optional[Predicate] = None,
170171
limit: Optional[int] = None
171172
):
@@ -200,6 +201,8 @@ def __init__(
200201
self.data_evolution = options.data_evolution_enabled()
201202
self.deletion_vectors_enabled = options.deletion_vectors_enabled()
202203
self._global_index_result = None
204+
self._scanned_snapshot = None
205+
self._scanned_snapshot_id = None
203206

204207
def schema_fields_func(schema_id: int):
205208
return self.table.schema_manager.get_schema(schema_id).fields
@@ -216,7 +219,8 @@ def _deletion_files_map(self, entries: List[ManifestEntry]) -> Dict[tuple, Dict[
216219
bucket_files = set()
217220
for e in entries:
218221
bucket_files.add((tuple(e.partition.values), e.bucket))
219-
return self._scan_dv_index(self.snapshot_manager.get_latest_snapshot(), bucket_files)
222+
snapshot = self._scanned_snapshot if self._scanned_snapshot else self.snapshot_manager.get_latest_snapshot()
223+
return self._scan_dv_index(snapshot, bucket_files)
220224

221225
def scan(self) -> Plan:
222226
start_ms = time.time() * 1000
@@ -241,7 +245,7 @@ def scan(self) -> Plan:
241245
)
242246

243247
if not entries:
244-
return Plan([])
248+
return Plan([], snapshot_id=self._scanned_snapshot_id)
245249

246250
# Configure sharding if needed
247251
if self.idx_of_this_subtask is not None:
@@ -258,7 +262,7 @@ def scan(self) -> Plan:
258262
"File store scan plan completed in %d ms. Files size: %d",
259263
duration_ms, len(entries)
260264
)
261-
return Plan(splits)
265+
return Plan(splits, snapshot_id=self._scanned_snapshot_id)
262266

263267
def _create_data_evolution_split_generator(self):
264268
row_ranges = None
@@ -272,7 +276,9 @@ def _create_data_evolution_split_generator(self):
272276
if row_ranges is None and self.predicate is not None:
273277
row_ranges = _row_ranges_from_predicate(self.predicate)
274278

275-
manifest_files = self.manifest_scanner()
279+
manifest_files, snapshot = self.manifest_scanner()
280+
self._scanned_snapshot = snapshot
281+
self._scanned_snapshot_id = snapshot.id if snapshot else None
276282

277283
# Filter manifest files by row ranges if available
278284
if row_ranges is not None:
@@ -293,7 +299,9 @@ def _create_data_evolution_split_generator(self):
293299
)
294300

295301
def plan_files(self) -> List[ManifestEntry]:
296-
manifest_files = self.manifest_scanner()
302+
manifest_files, snapshot = self.manifest_scanner()
303+
self._scanned_snapshot = snapshot
304+
self._scanned_snapshot_id = snapshot.id if snapshot else None
297305
if len(manifest_files) == 0:
298306
return []
299307
return self.read_manifest_entries(manifest_files)

paimon-python/pypaimon/read/streaming_table_scan.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ def _filter_entries_for_shard(self, entries: List) -> List:
323323
def _create_initial_plan(self, snapshot: Snapshot) -> Plan:
324324
"""Create a Plan for the initial full scan of the latest snapshot."""
325325
def all_manifests():
326-
return self._manifest_list_manager.read_all(snapshot)
326+
return self._manifest_list_manager.read_all(snapshot), snapshot
327327

328328
starting_scanner = FileScanner(
329329
self.table,

paimon-python/pypaimon/read/table_scan.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,15 @@ def _create_file_scanner(self) -> FileScanner:
5959
earliest_snapshot = snapshot_manager.try_get_earliest_snapshot()
6060
latest_snapshot = snapshot_manager.get_latest_snapshot()
6161
if earliest_snapshot is None or latest_snapshot is None:
62-
return FileScanner(self.table, lambda: [])
62+
return FileScanner(self.table, lambda: ([], None))
6363
start_timestamp = int(ts[0])
6464
end_timestamp = int(ts[1])
6565
if start_timestamp >= end_timestamp:
6666
raise ValueError(
6767
"Ending timestamp %s should be >= starting timestamp %s." % (end_timestamp, start_timestamp))
6868
if (start_timestamp == end_timestamp or start_timestamp > latest_snapshot.time_millis
6969
or end_timestamp < earliest_snapshot.time_millis):
70-
return FileScanner(self.table, lambda: [])
70+
return FileScanner(self.table, lambda: ([], None))
7171

7272
starting_snapshot = snapshot_manager.earlier_or_equal_time_mills(start_timestamp)
7373
earliest_snapshot = snapshot_manager.try_get_earliest_snapshot()
@@ -84,8 +84,10 @@ def _create_file_scanner(self) -> FileScanner:
8484

8585
def incremental_manifest():
8686
snapshots_in_range = []
87+
end_snapshot = snapshot_manager.get_snapshot_by_id(end_id) if end_id >= 1 else None
8788
for snapshot_id in range(start_id + 1, end_id + 1):
8889
snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id)
90+
end_snapshot = snapshot
8991
if snapshot.commit_kind == "APPEND":
9092
snapshots_in_range.append(snapshot)
9193

@@ -94,7 +96,7 @@ def incremental_manifest():
9496
for snapshot in snapshots_in_range:
9597
manifest_files = manifest_list_manager.read_delta(snapshot)
9698
manifests.extend(manifest_files)
97-
return manifests
99+
return manifests, end_snapshot
98100

99101
return FileScanner(self.table, incremental_manifest, self.predicate, self.limit)
100102
elif options.contains(CoreOptions.SCAN_TAG_NAME): # Handle tag-based reading
@@ -104,7 +106,7 @@ def tag_manifest_scanner():
104106
tag_manager = self.table.tag_manager()
105107
tag = tag_manager.get_or_throw(tag_name)
106108
snapshot = tag.trim_to_snapshot()
107-
return manifest_list_manager.read_all(snapshot)
109+
return manifest_list_manager.read_all(snapshot), snapshot
108110

109111
return FileScanner(
110112
self.table,
@@ -115,7 +117,7 @@ def tag_manifest_scanner():
115117

116118
def all_manifests():
117119
snapshot = snapshot_manager.get_latest_snapshot()
118-
return manifest_list_manager.read_all(snapshot)
120+
return manifest_list_manager.read_all(snapshot), snapshot
119121

120122
return FileScanner(
121123
self.table,

paimon-python/pypaimon/tests/binary_row_test.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ def test_is_null_append(self):
117117

118118
def test_is_not_null_append(self):
119119
table = self.catalog.get_table('default.test_append')
120-
file_scanner = FileScanner(table, lambda: [])
120+
file_scanner = FileScanner(table, lambda: ([], None))
121121
latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
122122
manifest_files = file_scanner.manifest_list_manager.read_all(latest_snapshot)
123123
manifest_entries = file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
@@ -254,7 +254,7 @@ def test_append_multi_cols(self):
254254
table_write.close()
255255
table_commit.close()
256256

257-
file_scanner = FileScanner(table, lambda: [])
257+
file_scanner = FileScanner(table, lambda: ([], None))
258258
latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
259259
manifest_files = file_scanner.manifest_list_manager.read_all(latest_snapshot)
260260
manifest_entries = file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
@@ -293,7 +293,7 @@ def test_append_multi_cols(self):
293293
}
294294
self.assertEqual(expected_data, actual.to_pydict())
295295

296-
file_scanner = FileScanner(table, lambda: [])
296+
file_scanner = FileScanner(table, lambda: ([], None))
297297
latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
298298
manifest_files = file_scanner.manifest_list_manager.read_all(latest_snapshot)
299299
manifest_entries = file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
@@ -324,7 +324,7 @@ def _transform_manifest_entries(self, manifest_entries: List[ManifestEntry], tri
324324
trimmed_pk_fields)
325325

326326
def _overwrite_manifest_entry(self, table):
327-
file_scanner = FileScanner(table, lambda: [])
327+
file_scanner = FileScanner(table, lambda: ([], None))
328328
latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
329329
manifest_files = file_scanner.manifest_list_manager.read_all(latest_snapshot)
330330
manifest_entries = file_scanner.manifest_file_manager.read(manifest_files[0].file_name)

0 commit comments

Comments
 (0)