Skip to content

Commit e57e22d

Browse files
Merge pull request #9801 from mr-raj12/pack-files-compact
compact: free space per pack with --threshold
2 parents 13fa623 + 4a28995 commit e57e22d

2 files changed

Lines changed: 190 additions & 64 deletions

File tree

src/borg/archiver/compact_cmd.py

Lines changed: 128 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from collections import defaultdict
12
from pathlib import Path
23

34
from ._common import with_repository
@@ -19,7 +20,7 @@
1920

2021

2122
class ArchiveGarbageCollector:
22-
def __init__(self, repository, manifest, *, stats, iec):
23+
def __init__(self, repository, manifest, *, stats, iec, threshold, dry_run=False):
2324
self.repository = repository
2425
assert isinstance(repository, Repository)
2526
self.manifest = manifest
@@ -28,7 +29,9 @@ def __init__(self, repository, manifest, *, stats, iec):
2829
self.total_size = None # overall size of source file content data written to all archives
2930
self.archives_count = None # number of archives
3031
self.stats = stats # compute repo space usage before/after - lists all repo objects, can be slow.
32+
self.threshold = threshold # rewrite a mixed pack only when its wasted-bytes fraction reaches this percent
3133
self.iec = iec # formats statistics using IEC units (1KiB = 1024B)
34+
self.dry_run = dry_run
3235

3336
@property
3437
def repository_size(self):
@@ -43,20 +46,17 @@ def garbage_collect(self):
4346
logger.info("Computing object IDs used by archives...")
4447
(self.missing_chunks, self.total_files, self.total_size, self.archives_count) = self.analyze_archives()
4548
self.report_and_delete()
46-
self.save_chunk_index()
47-
self.cleanup_files_cache()
49+
if not self.dry_run:
50+
self.save_chunk_index()
51+
self.cleanup_files_cache()
4852
logger.info("Finished compaction / garbage collection...")
4953

5054
def get_repository_chunks(self) -> ChunkIndex:
5155
"""return a chunks index"""
52-
if self.stats:
53-
# slow but thorough: scan the pack headers for real sizes/locations and to catch objects
54-
# missing from the cached index. Start unused (F_NONE); analyze_archives marks used ones.
55-
logger.info("Getting object IDs present in the repository...")
56-
chunks = build_chunkindex_from_repo(self.repository, disable_caches=True, init_flags=ChunkIndex.F_NONE)
57-
else: # faster: rely on existing chunks index (with flags F_NONE and size 0).
58-
logger.info("Getting object IDs from cached chunks index...")
59-
chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True)
56+
# The cached index already has each object's obj_size and starts entries as F_NONE, so it
57+
# serves both GC and --stats; no need to force the slow pack-header scan just to get sizes.
58+
logger.info("Getting object IDs from the cached chunks index...")
59+
chunks = build_chunkindex_from_repo(self.repository, cache_immediately=not self.dry_run)
6060
return chunks
6161

6262
def save_chunk_index(self):
@@ -127,7 +127,6 @@ def use_it(id):
127127
)
128128
total_size, total_files = 0, 0
129129
for i, info in enumerate(archive_infos):
130-
pi.show(i)
131130
logger.info(
132131
f"Analyzing archive {info.name} {info.ts.astimezone()} {bin_to_hex(info.id)} ({i + 1}/{num_archives})"
133132
)
@@ -145,6 +144,7 @@ def use_it(id):
145144
for id, size in item.chunks:
146145
total_size += size # original, uncompressed file content size
147146
use_it(id)
147+
pi.show(i + 1) # report after each archive, so the last one lands on 100%
148148
pi.finish()
149149
return missing_chunks, total_files, total_size, num_archives
150150

@@ -154,41 +154,18 @@ def report_and_delete(self):
154154
for id in sorted(self.missing_chunks):
155155
logger.debug(f"Missing object {bin_to_hex(id)}")
156156
set_ec(EXIT_ERROR)
157-
158-
logger.info("Cleaning archives directory from soft-deleted archives...")
159-
archive_infos = self.manifest.archives.list(sort_by=["ts"], deleted=True)
160-
for archive_info in archive_infos:
161-
name, id, hex_id = archive_info.name, archive_info.id, bin_to_hex(archive_info.id)
162-
try:
163-
self.manifest.archives.nuke_by_id(id)
164-
except self.repository.ObjectNotFound:
165-
logger.warning(f"Soft-deleted archive {name} {hex_id} not found.")
157+
if not self.dry_run: # nuking removes the soft-deleted archives from the archives directory; skip on a dry run
158+
logger.info("Cleaning archives directory from soft-deleted archives...")
159+
archive_infos = self.manifest.archives.list(sort_by=["ts"], deleted=True)
160+
for archive_info in archive_infos:
161+
name, id, hex_id = archive_info.name, archive_info.id, bin_to_hex(archive_info.id)
162+
try:
163+
self.manifest.archives.nuke_by_id(id)
164+
except self.repository.ObjectNotFound:
165+
logger.warning(f"Soft-deleted archive {name} {hex_id} not found.")
166166

167167
repo_size_before = self.repository_size
168-
logger.info("Determining unused objects...")
169-
unused = set()
170-
for id, entry in self.chunks.iteritems():
171-
if not (entry.flags & ChunkIndex.F_USED):
172-
unused.add(id)
173-
logger.info(f"Deleting {len(unused)} unused objects...")
174-
if unused:
175-
# Before deleting any repository object, invalidate all centrally cached chunk indexes.
176-
# Otherwise, if we get interrupted within the deletion loop, the still-existing index/*
177-
# would claim that already-deleted objects are still present. A later "borg create" would then
178-
# trust that stale index, not re-upload the affected chunks and silently create an archive with
179-
# dangling object references (see issue #9748). By removing the cached indexes first, an
180-
# interruption is conservative: clients must rebuild the index from actual repository contents
181-
# and will re-upload any deleted data. save_chunk_index() writes a fresh, valid index afterwards.
182-
delete_chunkindex_from_repo(self.repository)
183-
pi = ProgressIndicatorPercent(
184-
total=len(unused), msg="Deleting unused objects %3.1f%%", step=0.1, msgid="compact.report_and_delete"
185-
)
186-
for i, id in enumerate(unused):
187-
pi.show(i)
188-
# N=1: the chunk is alone in its pack, so dropping the pack frees just it; N>1 needs compaction.
189-
self.repository.store_delete("packs/" + bin_to_hex(self.chunks[id].pack_id))
190-
del self.chunks[id]
191-
pi.finish()
168+
self.compact_packs()
192169
repo_size_after = self.repository_size
193170

194171
count = len(self.chunks)
@@ -202,20 +179,104 @@ def report_and_delete(self):
202179
f"Repository size is {format_file_size(repo_size_after, precision=0, iec=self.iec)} "
203180
f"in {count} objects."
204181
)
205-
logger.info(
206-
f"Compaction saved "
207-
f"{format_file_size(repo_size_before - repo_size_after, precision=0, iec=self.iec)}."
208-
)
182+
if not self.dry_run: # nothing was deleted on a dry run, so before == after
183+
logger.info(
184+
f"Compaction saved "
185+
f"{format_file_size(repo_size_before - repo_size_after, precision=0, iec=self.iec)}."
186+
)
209187
else:
210188
logger.info(f"Repository has data stored in {count} objects.")
211189

190+
def compact_packs(self):
191+
"""Free space one pack at a time (the store can only delete whole packs).
192+
193+
analyze_archives() has flagged the used objects F_USED. Per pack:
194+
195+
- all objects unused -> delete the pack.
196+
- all objects used -> keep it.
197+
- mixed -> rewrite only if the unused bytes reach --threshold percent: copy the
198+
used objects into a new pack via compact_pack and drop the old one.
199+
Below the threshold keep the pack, so we don't rewrite a large pack
200+
to reclaim little.
201+
202+
Two passes bound the memory use: the first keeps only per-pack byte counts to pick the packs
203+
to change, the second collects object ids for just those packs, not the whole index.
204+
"""
205+
# Pass 1: one index scan, keep only per-pack byte tallies (two ints per pack, no id lists).
206+
pack_total, pack_unused = defaultdict(int), defaultdict(int)
207+
for id, entry in self.chunks.iteritems():
208+
pid = entry.pack_id
209+
pack_total[pid] += entry.obj_size
210+
if not (entry.flags & ChunkIndex.F_USED):
211+
pack_unused[pid] += entry.obj_size
212+
213+
# decide each pack's fate from the tallies
214+
drop_packs, rewrite_packs = set(), set()
215+
for pid, total in pack_total.items():
216+
unused = pack_unused.get(pid, 0) # .get, not [pid]: don't insert all-used packs into the dict
217+
if unused == 0:
218+
continue # all used -> leave alone
219+
if unused == total:
220+
drop_packs.add(pid) # all unused -> drop the whole file
221+
elif 100 * unused / total >= self.threshold:
222+
rewrite_packs.add(pid) # mixed and wasteful enough -> copy survivors forward
223+
# else: mixed but below threshold -> leave alone
224+
if self.dry_run:
225+
freed = sum(pack_unused[pid] for pid in drop_packs) + sum(pack_unused[pid] for pid in rewrite_packs)
226+
logger.info(
227+
f"Would free {format_file_size(freed, iec=self.iec)} "
228+
f"by dropping {len(drop_packs)} packs and rewriting {len(rewrite_packs)} packs."
229+
)
230+
return # dry run: report only, change nothing
231+
if not drop_packs and not rewrite_packs:
232+
logger.info("Deleting 0 unused objects...")
233+
return # nothing to reclaim; do not touch the cached chunk indexes
234+
235+
# crash-safety (#9748): invalidate cached chunk indexes before the first store change
236+
delete_chunkindex_from_repo(self.repository)
237+
238+
# Pass 2: collect object ids only for the affected packs (a subset, not the whole index)
239+
keep = {pid: set() for pid in rewrite_packs} # survivors to copy forward, per pack
240+
drop = {pid: set() for pid in rewrite_packs} # unused objects in those same packs
241+
forget = [] # ids living in fully-unused packs we delete outright
242+
for id, entry in self.chunks.iteritems():
243+
pid = entry.pack_id
244+
if pid in rewrite_packs:
245+
(keep if entry.flags & ChunkIndex.F_USED else drop)[pid].add(id)
246+
elif pid in drop_packs:
247+
forget.append(id)
248+
249+
# count what we remove: every object of a dropped pack, plus the unused objects cut from
250+
# rewritten packs. unused objects in below-threshold packs stay, so they don't count.
251+
deleted = len(forget) + sum(len(ids) for ids in drop.values())
252+
logger.info(f"Deleting {deleted} unused objects...")
253+
pi = ProgressIndicatorPercent(
254+
total=len(drop_packs) + len(rewrite_packs),
255+
msg="Compacting packs %3.1f%%",
256+
step=0.1,
257+
msgid="compact.compact_packs",
258+
)
259+
progress = 0
260+
for pid in drop_packs:
261+
self.repository.store_delete("packs/" + bin_to_hex(pid))
262+
progress += 1
263+
pi.show(progress) # report after the work, so the final pack lands on 100%
264+
for id in forget:
265+
del self.chunks[id] # their pack file is gone, so drop their index entries too
266+
for pid in rewrite_packs:
267+
self.repository.compact_pack(pid, keep_ids=keep[pid], drop_ids=drop[pid]) # helper owns index update
268+
progress += 1
269+
pi.show(progress)
270+
pi.finish()
271+
212272

213273
class CompactMixIn:
214274
@with_repository(exclusive=True, compatibility=(Manifest.Operation.DELETE,))
215275
def do_compact(self, args, repository, manifest):
216276
"""Collects garbage in the repository."""
217-
if not args.dry_run: # support --dry-run to simplify scripting
218-
ArchiveGarbageCollector(repository, manifest, stats=args.stats, iec=args.iec).garbage_collect()
277+
ArchiveGarbageCollector(
278+
repository, manifest, stats=args.stats, iec=args.iec, threshold=args.threshold, dry_run=args.dry_run
279+
).garbage_collect()
219280

220281
def build_parser_compact(self, subparsers, common_parser, mid_common_parser):
221282
from ._common import process_epilog
@@ -251,22 +312,27 @@ def build_parser_compact(self, subparsers, common_parser, mid_common_parser):
251312
seeing fatal errors when creating backups or when archives are missing in
252313
``borg repo-list``).
253314
254-
When using the ``--stats`` option, borg will internally list all repository
255-
objects to determine their existence and stored size. It will build a fresh
256-
chunks index from that information and cache it in the repository. For some
257-
types of repositories, this might be very slow. It will tell you the sum of
258-
stored object sizes, before and after compaction.
259-
260-
Without ``--stats``, borg will rely on the cached chunks index to determine
261-
existing object IDs (but there is no stored size information in the index,
262-
thus it cannot compute before/after compaction size statistics).
315+
With ``--stats``, borg additionally reports the sum of stored object sizes
316+
before and after compaction.
263317
"""
264318
)
265319
subparser = ArgumentParser(parents=[common_parser], description=self.do_compact.__doc__, epilog=compact_epilog)
266320
subparsers.add_subcommand("compact", subparser, help="compact the repository")
267321
subparser.add_argument(
268-
"-n", "--dry-run", dest="dry_run", action="store_true", help="do not change the repository"
322+
"-n",
323+
"--dry-run",
324+
dest="dry_run",
325+
action="store_true",
326+
help="do not change the repository, just show what compact would free",
327+
)
328+
subparser.add_argument(
329+
"-s", "--stats", dest="stats", action="store_true", help="print repository size statistics"
269330
)
270331
subparser.add_argument(
271-
"-s", "--stats", dest="stats", action="store_true", help="print statistics (might be much slower)"
332+
"--threshold",
333+
metavar="PERCENT",
334+
dest="threshold",
335+
type=int,
336+
default=10,
337+
help="rewrite a pack when at least PERCENT of its bytes are unused (default: 10)",
272338
)

src/borg/testsuite/archiver/compact_cmd_test.py

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@
44
import pytest
55

66
from ...constants import * # NOQA
7-
from ...helpers import get_cache_dir
7+
from ...helpers import get_cache_dir, bin_to_hex
8+
from ...hashindex import ChunkIndex
9+
from ...repository import Repository
810
from ...cache import files_cache_name, discover_files_cache_names, list_chunkindex_hashes
911
from ...manifest import Manifest
1012
from . import cmd, create_regular_file, create_src_archive, generate_archiver_tests, open_repository, RK_ENCRYPTION
1113
from . import changedir
14+
from ..repository_test import H, fchunk, pdchunk
1215

1316
pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary") # NOQA
1417

@@ -119,7 +122,7 @@ def test_compact_interrupted_does_not_poison_chunk_index(archivers, request, mon
119122
repository = open_repository(archiver)
120123
with repository:
121124
manifest = Manifest.load(repository, (Manifest.Operation.DELETE,))
122-
gc = ArchiveGarbageCollector(repository, manifest, stats=stats, iec=False)
125+
gc = ArchiveGarbageCollector(repository, manifest, stats=stats, iec=False, threshold=40.0)
123126

124127
def interrupt():
125128
raise KeyboardInterrupt("simulated interruption before save_chunk_index")
@@ -143,6 +146,63 @@ def interrupt():
143146
assert fd.read() == content
144147

145148

149+
def test_compact_packs_respects_threshold(tmp_path):
150+
# Two multi-object packs in one repo, then pack-level compaction at a 40% threshold. The pack that
151+
# wastes 2/3 of its bytes is rewritten down to its single survivor (and its old file deleted); the
152+
# pack that wastes only 1/3 stays untouched, since copying its survivors to reclaim that little is
153+
# not worth it. This covers the rewrite, leave-alone and keep/drop split that N=1 can't reach.
154+
from ...archiver.compact_cmd import ArchiveGarbageCollector
155+
156+
location = os.fspath(tmp_path / "repo")
157+
with Repository(location, exclusive=True, create=True) as repository:
158+
repository._pack_writer.max_count = 4 # buffer several objects, so each flush() writes one pack
159+
for i in range(3): # H0..H2 -> wasteful pack
160+
repository.put(H(i), fchunk(f"DATA{i}".encode(), chunk_id=H(i)))
161+
repository.flush()
162+
for i in range(3, 6): # H3..H5 -> frugal pack
163+
repository.put(H(i), fchunk(f"DATA{i}".encode(), chunk_id=H(i)))
164+
repository.flush()
165+
166+
wasteful_pack = repository.chunks[H(0)].pack_id
167+
frugal_pack = repository.chunks[H(3)].pack_id
168+
# wasteful pack keeps only H0 (2/3 wasted), frugal pack keeps H3 and H4 (1/3 wasted)
169+
used = {H(0), H(3), H(4)}
170+
for i in range(6):
171+
entry = repository.chunks[H(i)]
172+
flags = ChunkIndex.F_USED if H(i) in used else ChunkIndex.F_NONE
173+
repository.chunks[H(i)] = entry._replace(flags=flags)
174+
175+
gc = ArchiveGarbageCollector(repository, manifest=None, stats=False, iec=False, threshold=40)
176+
gc.chunks = repository.chunks
177+
gc.compact_packs()
178+
179+
# wasteful pack was rewritten: the survivor reads back, the dropped objects and the old file are gone
180+
assert pdchunk(repository.get(H(0))) == b"DATA0"
181+
assert repository.get(H(1), raise_missing=False) is None
182+
assert repository.get(H(2), raise_missing=False) is None
183+
# frugal pack stayed below threshold: untouched, every object (even the unused H5) still present
184+
for i in range(3, 6):
185+
assert pdchunk(repository.get(H(i))) == f"DATA{i}".encode()
186+
pack_names = [info.name for info in repository.store_list("packs")]
187+
assert bin_to_hex(wasteful_pack) not in pack_names
188+
assert bin_to_hex(frugal_pack) in pack_names
189+
190+
191+
def test_compact_dry_run_reports_and_changes_nothing(archivers, request):
192+
# --dry-run prints the would-free estimate (issue #9379) and must not touch the repository.
193+
archiver = request.getfixturevalue(archivers)
194+
cmd(archiver, "repo-create", RK_ENCRYPTION)
195+
create_src_archive(archiver, "archive")
196+
cmd(archiver, "delete", "-a", "archive")
197+
198+
out = cmd(archiver, "compact", "--dry-run", "-v", exit_code=0)
199+
assert "Would free" in out # the estimate is reported
200+
201+
# proof nothing was removed: a real compact afterwards still finds the unused objects to delete
202+
out2 = cmd(archiver, "compact", "-v", exit_code=0)
203+
assert "Deleting 0 unused objects" not in out2
204+
205+
146206
def test_compact_files_cache_cleanup(archivers, request):
147207
"""Test that files cache files for deleted archives are removed during compact."""
148208
archiver = request.getfixturevalue(archivers)

0 commit comments

Comments
 (0)