Skip to content

Commit b4d31d8

Browse files
jensens and claude committed
feat: add --upload-blobs for processing deferred blob manifests
Reads a TSV manifest (from --deferred-blobs) and uploads blobs to S3 with parallel workers and retry logic. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent b3a6eff commit b4d31d8

3 files changed

Lines changed: 242 additions & 1 deletion

File tree

src/zodb_convert/cli.py

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,13 @@ def parse_args(argv):
115115
args = parser.parse_args(argv)
116116

117117
# Require at least one source/destination specification
118-
if not args.config_file and not args.source_zope_conf and not args.dest_zope_conf:
118+
# (--upload-blobs only needs a destination, validated later)
119+
if (
120+
not args.config_file
121+
and not args.source_zope_conf
122+
and not args.dest_zope_conf
123+
and not args.upload_blobs
124+
):
119125
parser.error(
120126
"At least one of config_file, --source-zope-conf, or --dest-zope-conf is required."
121127
)
@@ -145,11 +151,80 @@ def _setup_logging(verbose):
145151
logging.getLogger("zodb-convert").setLevel(level)
146152

147153

154+
def _open_destination(args):
155+
"""Open only the destination storage from CLI args.
156+
157+
Returns (destination_storage, closables).
158+
"""
159+
from zodb_convert.config import open_storage_from_zope_conf
160+
from zodb_convert.config import open_storages_from_config
161+
162+
destination = None
163+
closables = []
164+
165+
if args.config_file:
166+
_cfg_source, cfg_dest = open_storages_from_config(args.config_file)
167+
if cfg_dest is not None:
168+
destination = cfg_dest
169+
# Close unused source if opened
170+
if _cfg_source is not None:
171+
closables.append(_cfg_source)
172+
173+
if args.dest_zope_conf:
174+
if destination is not None:
175+
raise ValueError(
176+
"Destination specified in both config file and --dest-zope-conf"
177+
)
178+
db = open_storage_from_zope_conf(args.dest_zope_conf, args.dest_db)
179+
destination = db.storage
180+
closables.append(db)
181+
182+
if destination is None:
183+
raise ValueError(
184+
"No destination storage configured. Use a config file or --dest-zope-conf."
185+
)
186+
187+
return destination, closables
188+
189+
148190
def main(argv=None):
149191
"""Main entry point for zodb-convert."""
150192
args = parse_args(argv if argv is not None else sys.argv[1:])
151193
_setup_logging(args.verbose)
152194

195+
if args.upload_blobs:
196+
from zodb_convert.manifest import upload_from_manifest
197+
198+
closables = []
199+
try:
200+
destination, closables = _open_destination(args)
201+
s3_client = getattr(destination, "_s3_client", None)
202+
if s3_client is None:
203+
log.error("Destination storage has no S3 client configured")
204+
return 1
205+
stats = upload_from_manifest(
206+
args.upload_blobs,
207+
s3_client,
208+
workers=args.workers or 8,
209+
)
210+
if stats["failed"]:
211+
log.error("%d blob upload(s) failed", stats["failed"])
212+
return 1
213+
return 0
214+
except KeyboardInterrupt:
215+
log.warning("Interrupted by user, aborting...")
216+
return 130
217+
except (ValueError, FileNotFoundError) as e:
218+
log.error("%s", e)
219+
sys.exit(1)
220+
except Exception as e:
221+
log.error("Upload failed: %s", e, exc_info=True)
222+
sys.exit(2)
223+
finally:
224+
for obj in closables:
225+
with contextlib.suppress(Exception):
226+
obj.close()
227+
153228
closables = []
154229
try:
155230
source, destination, closables = open_storages(args)

src/zodb_convert/manifest.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
"""Upload deferred blobs from a manifest file."""
2+
3+
from concurrent.futures import as_completed
4+
from concurrent.futures import ThreadPoolExecutor
5+
6+
import contextlib
7+
import logging
8+
import os
9+
import time
10+
11+
12+
logger = logging.getLogger(__name__)

# Defaults for the per-blob retry loop: number of upload attempts and the
# base delay (seconds) that is doubled after every failed attempt.
_DEFAULT_MAX_RETRIES = 3
_DEFAULT_RETRY_BASE_DELAY = 2.0


def _read_manifest(manifest_path):
    """Parse the manifest TSV into (blob_path, s3_key, zoid, size) tuples.

    Blank lines are ignored. Lines that do not have exactly four
    tab-separated fields, or whose zoid/size fields are not integers, are
    logged and skipped.
    """
    entries = []
    with open(manifest_path) as f:
        for raw in f:
            line = raw.strip()
            if not line:
                continue
            parts = line.split("\t")
            if len(parts) != 4:
                logger.warning("Skipping malformed manifest line: %s", line)
                continue
            blob_path, s3_key, zoid_str, size_str = parts
            try:
                entry = (blob_path, s3_key, int(zoid_str), int(size_str))
            except ValueError:
                # Same treatment as a wrong field count: one bad line must
                # not abort the whole upload run.
                logger.warning("Skipping malformed manifest line: %s", line)
                continue
            entries.append(entry)
    return entries


def upload_from_manifest(
    manifest_path,
    s3_client,
    workers=8,
    max_retries=_DEFAULT_MAX_RETRIES,
    retry_base_delay=_DEFAULT_RETRY_BASE_DELAY,
    cleanup=False,
):
    """Read manifest TSV and upload blobs to S3.

    Each manifest line holds four tab-separated fields: local blob path,
    S3 key, zoid (integer) and size (integer). Every entry is uploaded with
    ``s3_client.upload_file(blob_path, s3_key)`` on a thread pool of
    ``workers`` threads.

    A failed upload is retried up to ``max_retries`` attempts in total
    (at least one attempt is always made), sleeping
    ``retry_base_delay * 2**attempt`` seconds between attempts. With
    ``cleanup=True`` the local blob file is removed after a successful
    upload; removal errors are ignored.

    Returns dict with counts: uploaded, failed, skipped. ``skipped`` counts
    entries whose blob file does not exist; malformed manifest lines are
    only logged and are not counted.
    """
    entries = _read_manifest(manifest_path)
    logger.info("Manifest: %d blob(s) to upload", len(entries))

    # Guard against max_retries <= 0, which would otherwise report every
    # blob as failed without a single upload attempt.
    attempts = max(1, max_retries)

    def _upload_one(blob_path, s3_key, zoid, size):
        # ``size`` comes from the manifest but is not needed for the upload.
        if not os.path.exists(blob_path):
            logger.warning(
                "Blob file missing, skipping: %s (oid=0x%016x)", blob_path, zoid
            )
            return "skipped"
        last_exc = None
        for attempt in range(attempts):
            try:
                s3_client.upload_file(blob_path, s3_key)
            except Exception as exc:
                last_exc = exc
                if attempt < attempts - 1:
                    delay = retry_base_delay * (2**attempt)
                    logger.warning(
                        "Upload oid=0x%016x attempt %d/%d failed (%s), "
                        "retrying in %.0fs ...",
                        zoid,
                        attempt + 1,
                        attempts,
                        exc,
                        delay,
                    )
                    time.sleep(delay)
            else:
                if cleanup:
                    with contextlib.suppress(OSError):
                        os.unlink(blob_path)
                return "uploaded"
        logger.error(
            "Upload oid=0x%016x FAILED after %d attempts: %s",
            zoid,
            attempts,
            last_exc,
        )
        return "failed"

    counts = {"uploaded": 0, "failed": 0, "skipped": 0}

    t0 = time.time()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(_upload_one, *entry) for entry in entries]
        for fut in as_completed(futures):
            counts[fut.result()] += 1

    elapsed = time.time() - t0
    logger.info(
        "Manifest upload complete: %d uploaded, %d failed, %d skipped (%.1fs)",
        counts["uploaded"],
        counts["failed"],
        counts["skipped"],
        elapsed,
    )
    return counts

tests/test_manifest.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
from unittest.mock import MagicMock
2+
from zodb_convert.manifest import upload_from_manifest
3+
4+
5+
class TestUploadFromManifest:
    """Tests for ``upload_from_manifest`` driven by a mocked S3 client."""

    def test_uploads_all_entries(self, tmp_path):
        # Three existing blobs listed in the manifest are all uploaded.
        s3 = MagicMock()
        manifest = tmp_path / "manifest.tsv"

        entries = []
        for i in range(3):
            blob = tmp_path / f"blob{i}"
            blob.write_bytes(f"data{i}".encode())
            entries.append(f"{blob}\tblobs/{i}.blob\t{i}\t5")
        manifest.write_text("\n".join(entries) + "\n")

        stats = upload_from_manifest(str(manifest), s3, workers=2)

        assert s3.upload_file.call_count == 3
        assert stats["uploaded"] == 3
        assert stats["failed"] == 0
        assert stats["skipped"] == 0

    def test_cleans_temp_files_after_upload(self, tmp_path):
        # cleanup=True removes the local blob once its upload succeeded.
        s3 = MagicMock()
        manifest = tmp_path / "manifest.tsv"
        blob = tmp_path / "blob0"
        blob.write_bytes(b"data")
        manifest.write_text(f"{blob}\tblobs/0.blob\t0\t4\n")

        upload_from_manifest(str(manifest), s3, workers=1, cleanup=True)

        assert s3.upload_file.call_count == 1
        assert not blob.exists()

    def test_retries_and_reports_failures(self, tmp_path):
        # A permanently failing upload is attempted max_retries times and
        # then reported as failed. retry_base_delay=0 keeps the test fast.
        s3 = MagicMock()
        s3.upload_file.side_effect = Exception("permanent")
        manifest = tmp_path / "manifest.tsv"
        blob = tmp_path / "blob0"
        blob.write_bytes(b"data")
        manifest.write_text(f"{blob}\tblobs/0.blob\t0\t4\n")

        stats = upload_from_manifest(
            str(manifest),
            s3,
            workers=1,
            max_retries=3,
            retry_base_delay=0,
            cleanup=True,
        )

        assert s3.upload_file.call_count == 3
        assert stats["failed"] == 1
        assert stats["uploaded"] == 0
        # A blob that never made it to S3 must not be cleaned up.
        assert blob.exists()

    def test_recovers_after_transient_failure(self, tmp_path):
        # The first attempt raises, the second succeeds: counted as uploaded.
        s3 = MagicMock()
        s3.upload_file.side_effect = [Exception("transient"), None]
        manifest = tmp_path / "manifest.tsv"
        blob = tmp_path / "blob0"
        blob.write_bytes(b"data")
        manifest.write_text(f"{blob}\tblobs/0.blob\t0\t4\n")

        stats = upload_from_manifest(
            str(manifest),
            s3,
            workers=1,
            max_retries=2,
            retry_base_delay=0,
        )

        assert s3.upload_file.call_count == 2
        assert stats["uploaded"] == 1
        assert stats["failed"] == 0

    def test_skips_missing_files(self, tmp_path):
        # An entry whose blob file does not exist is skipped without upload.
        s3 = MagicMock()
        manifest = tmp_path / "manifest.tsv"
        manifest.write_text("/nonexistent/blob\tblobs/0.blob\t0\t4\n")

        stats = upload_from_manifest(str(manifest), s3, workers=1)

        assert stats["skipped"] == 1
        assert s3.upload_file.call_count == 0

0 commit comments

Comments
 (0)