Skip to content

Commit aa983c5

Browse files
authored
Merge pull request #3 from bluedynamics/feat/incremental-parallel-import
Pass start_tid through parallel delegation
2 parents 7a69895 + 952590b commit aa983c5

3 files changed

Lines changed: 56 additions & 20 deletions

File tree

CHANGES.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Changelog
22

3+
## 1.0.0b9
4+
5+
- Support `--incremental -w N` together. Previously, `--incremental`
6+
forced a fallback to sequential copy even when `-w` was specified.
7+
`start_tid` is now passed through to the destination's parallel
8+
`copyTransactionsFrom()` for resumable parallel imports.
9+
310
## 1.0.0b8
411

512
- Remove source transaction counting (saves a full iteration over 200k+

src/zodb_convert/copier.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def get_incremental_start_tid(source, destination):
8181
return p64(last_valid_int + 1)
8282

8383

84-
def _try_parallel_delegation(source, destination, workers):
84+
def _try_parallel_delegation(source, destination, workers, start_tid=None):
8585
"""Try delegating to the destination's parallel copyTransactionsFrom.
8686
8787
Returns (txn_count, obj_count, blob_count) on success, or None if the
@@ -93,19 +93,25 @@ def _try_parallel_delegation(source, destination, workers):
9393
log.info("Destination has no copyTransactionsFrom, using generic copier.")
9494
return None
9595

96+
kwargs = {"workers": workers}
97+
if start_tid is not None:
98+
kwargs["start_tid"] = start_tid
99+
96100
try:
97-
log.info("Delegating to destination.copyTransactionsFrom(workers=%d).", workers)
98-
copy_method(source, workers=workers)
101+
log.info(
102+
"Delegating to destination.copyTransactionsFrom(%s).",
103+
", ".join(f"{k}={v!r}" for k, v in kwargs.items()),
104+
)
105+
copy_method(source, **kwargs)
99106
except TypeError:
100-
# Destination's copyTransactionsFrom doesn't accept 'workers'.
107+
# Destination's copyTransactionsFrom doesn't accept these kwargs.
101108
log.info(
102-
"Destination doesn't support parallel copy (no workers parameter), "
109+
"Destination doesn't support parallel copy, "
103110
"falling back to generic sequential copier."
104111
)
105112
return None
106113

107114
# Destination handled everything including its own progress logging.
108-
# Exact per-object counts are unavailable; return None as sentinel.
109115
return None, None, None
110116

111117

@@ -125,8 +131,8 @@ def copy_transactions(
125131
126132
Returns (txn_count, obj_count, blob_count).
127133
"""
128-
if workers > 1 and not dry_run and start_tid is None:
129-
result = _try_parallel_delegation(source, destination, workers)
134+
if workers > 1 and not dry_run:
135+
result = _try_parallel_delegation(source, destination, workers, start_tid)
130136
if result is not None:
131137
return result
132138
# Fall through to generic sequential copier.

tests/test_copier.py

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,21 @@ def test_delegation_success(self):
326326
assert result == (None, None, None)
327327
dest.copyTransactionsFrom.assert_called_once_with(source, workers=4)
328328

329+
def test_delegation_with_start_tid(self):
330+
"""Delegation passes start_tid when set."""
331+
from ZODB.utils import p64
332+
333+
source = MagicMock()
334+
dest = MagicMock()
335+
dest.copyTransactionsFrom = MagicMock()
336+
337+
start = p64(100)
338+
result = _try_parallel_delegation(source, dest, workers=4, start_tid=start)
339+
assert result == (None, None, None)
340+
dest.copyTransactionsFrom.assert_called_once_with(
341+
source, workers=4, start_tid=start
342+
)
343+
329344
def test_delegation_no_method(self):
330345
"""Destination has no copyTransactionsFrom at all."""
331346
source = MagicMock()
@@ -343,38 +358,46 @@ def test_delegation_no_workers_support(self):
343358
result = _try_parallel_delegation(source, dest, workers=2)
344359
assert result is None
345360

361+
def test_delegation_start_tid_not_supported(self):
362+
"""Destination doesn't support start_tid -- falls back."""
363+
from ZODB.utils import p64
364+
365+
source = MagicMock()
366+
dest = MagicMock()
367+
dest.copyTransactionsFrom = MagicMock(side_effect=TypeError("unexpected kwarg"))
368+
369+
result = _try_parallel_delegation(source, dest, workers=2, start_tid=p64(100))
370+
assert result is None
371+
346372
def test_copy_transactions_delegates_when_workers_gt_1(
347373
self, populated_source, dest_filestorage
348374
):
349375
"""copy_transactions with workers>1 tries delegation, falls back on TypeError."""
350-
# FileStorage.copyTransactionsFrom doesn't accept workers → TypeError fallback
351376
txn_count, obj_count, _blob_count = copy_transactions(
352377
populated_source, dest_filestorage, workers=2
353378
)
354-
# Should have fallen back to sequential copy and succeeded
355379
assert txn_count == 4 # initial root + 3 explicit
356380
assert obj_count > 0
357381

358-
def test_copy_transactions_skips_delegation_on_dry_run(
382+
def test_copy_transactions_delegates_with_start_tid(
359383
self, populated_source, dest_filestorage
360384
):
361-
"""workers>1 is ignored when dry_run=True."""
385+
"""copy_transactions with workers>1 and start_tid delegates both."""
386+
from ZODB.utils import p64
387+
362388
txn_count, obj_count, _blob_count = copy_transactions(
363-
populated_source, dest_filestorage, dry_run=True, workers=4
389+
populated_source, dest_filestorage, start_tid=p64(0), workers=4
364390
)
365391
assert txn_count == 4
366392
assert obj_count > 0
367-
# Destination should still be empty (dry run)
368-
assert storage_has_data(dest_filestorage) is False
369393

370-
def test_copy_transactions_skips_delegation_on_incremental(
394+
def test_copy_transactions_skips_delegation_on_dry_run(
371395
self, populated_source, dest_filestorage
372396
):
373-
"""workers>1 is ignored when start_tid is set."""
374-
from ZODB.utils import p64
375-
397+
"""workers>1 is ignored when dry_run=True."""
376398
txn_count, obj_count, _blob_count = copy_transactions(
377-
populated_source, dest_filestorage, start_tid=p64(0), workers=4
399+
populated_source, dest_filestorage, dry_run=True, workers=4
378400
)
379401
assert txn_count == 4
380402
assert obj_count > 0
403+
assert storage_has_data(dest_filestorage) is False

0 commit comments

Comments
 (0)