Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/borg/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -1842,9 +1842,9 @@ def verify_data(self):
if defect_chunks:
if self.repair:
# We would remove the defect chunks here, but single-object delete is not implemented
# yet (Repository.delete is a no-op: dropping a chunk means dropping its whole pack,
# which at N>1 takes good chunks with it). So we recheck each chunk and only report
# the ones that keep failing; real removal will come via compact once delete works at N>1.
# yet (Repository.delete is a no-op: a pack holds multiple objects, so dropping the
# whole pack to drop one chunk would take the good ones with it). So we recheck each
# chunk and only report the ones that keep failing; real removal will come via compact.
logger.warning("Found defect chunks. They can not be removed yet and are only reported.")
for defect_chunk in defect_chunks:
# remote repo (ssh): retry might help for strange network / NIC / RAM errors
Expand All @@ -1858,10 +1858,10 @@ def verify_data(self):
self.repo_objs.parse(defect_chunk, encrypted_data, decompress=True, ro_type=ROBJ_DONTCARE)
except IntegrityErrorBase:
# failed twice -> we would like to get rid of this defect chunk. We must not
# drop its whole pack here: at N>1 the pack holds other, good chunks too.
# drop its whole pack here: the pack holds other, good chunks too.
# Repository.delete is a no-op for now (it just logs); real single-object
# removal will happen via compact.
# TODO: actually remove the defect chunk once delete works at N>1.
# TODO: actually remove the defect chunk once single-object delete exists.
self.repository.delete(defect_chunk)
else:
logger.warning("chunk %s not deleted, did not consistently fail.", bin_to_hex(defect_chunk))
Expand Down
17 changes: 5 additions & 12 deletions src/borg/archiver/debug_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from ..helpers.argparsing import ArgumentParser
from ..manifest import Manifest
from ..platform import get_process_id
from ..repository import Repository, LIST_SCAN_LIMIT, StoreObjectNotFound, repo_lister
from ..repository import Repository, LIST_SCAN_LIMIT, repo_lister
from ..repoobj import RepoObj

from ._common import with_repository, Highlander
Expand Down Expand Up @@ -293,19 +293,12 @@ def do_debug_delete_obj(self, args, repository):
except ValueError:
print("object id %s is invalid." % hex_id)
else:
entry = repository.chunks.get(id)
if entry is None:
try:
repository.delete(id)
except Repository.ObjectNotFound:
print("object %s not found." % hex_id)
else:
# N=1: one chunk per pack, so dropping the pack removes just this object; N>1 needs compaction.
try:
repository.store_delete("packs/" + bin_to_hex(entry.pack_id))
except StoreObjectNotFound:
# index points at an already-gone pack (stale entry)
print("object %s not found." % hex_id)
else:
del repository.chunks[id]
print("object %s deleted." % hex_id)
print("object %s deleted." % hex_id)
print("Done.")

def do_debug_convert_profile(self, args):
Expand Down
2 changes: 1 addition & 1 deletion src/borg/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ def build_chunkindex_from_repo(
# Read each pack's object headers with borgstore range requests, fetching only the fixed-size
# headers and skipping the (much larger) encrypted payloads. Don't call Repository.list() here:
# it iterates this same index we are building, so it would recurse. The headers also give each
# object's real (chunk_id, offset, size), so this is not limited to one object per pack.
# object's real (chunk_id, offset, size), so every object in a pack is indexed individually.
for info in repository.store_list("packs"):
# PackReader uses the store directly, so refresh the lock here; a full rebuild can be slow.
repository._lock_refresh()
Expand Down
2 changes: 1 addition & 1 deletion src/borg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ def create(self, name, id, ts, *, overwrite=False):
if isinstance(ts, datetime):
ts = ts.isoformat(timespec="microseconds")
assert isinstance(ts, str)
# flush buffered packs first: the pointer must not reference objects still buffered at N>1.
# flush buffered packs first: the pointer must not reference objects still sitting in the pack writer.
self.repository.flush()
# we only create a directory entry, its name points to the archive item:
self.repository.store_store(f"archives/{bin_to_hex(id)}", b"")
Expand Down
61 changes: 40 additions & 21 deletions src/borg/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -736,10 +736,10 @@ def get(self, id, read_data=True, raise_missing=True):
raise self.ObjectNotFound(id, str(self._location))
return None
if entry.pack_id == UNKNOWN_BYTES32:
# chunk is buffered in PackWriter, not yet flushed to a pack. at N=1 put() flushes
# immediately, so reaching here points at a flush / index-update ordering bug, not a
# genuinely missing object. this is a code bug, so we crash loudly regardless of
# raise_missing instead of pretending the object is absent.
# chunk is buffered in PackWriter, not yet flushed to a pack. Everything must be flushed
# before it can be read back, so reaching here points at a flush / index-update ordering
# bug, not a genuinely missing object. this is a code bug, so we crash loudly regardless
# of raise_missing instead of pretending the object is absent.
raise self.PackLocationUnknown(id, str(self._location))
pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size
id_hex = bin_to_hex(id)
Expand All @@ -753,10 +753,10 @@ def get(self, id, read_data=True, raise_missing=True):
hdr_size = RepoObj.obj_header.size
extra_size = 1024 - hdr_size # load a bit more, 1024b, reduces round trips
load_size = hdr_size + extra_size
# keep the read inside this object: at N>1 a pack holds neighbouring objects, so
# don't pull bytes past obj_size into the next one. (an overshoot would be harmless
# -- parse_meta uses the header's length and ignores trailing bytes -- this is just
# tidy.) obj_size comes from the same index we already route with.
# keep the read inside this object: a pack holds neighbouring objects, so don't pull
# bytes past obj_size into the next one. (an overshoot would be harmless -- parse_meta
# uses the header's length and ignores trailing bytes -- this is just tidy.) obj_size
# comes from the same index we already route with.
load_size = min(load_size, obj_size)
obj = self.store.load(key, offset=obj_offset, size=load_size)
hdr = obj[0:hdr_size]
Expand Down Expand Up @@ -787,9 +787,9 @@ def get_many(self, ids, read_data=True, raise_missing=True):
def put(self, id, data):
"""put a repo object

Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples for
every chunk written to disk this call. At max_count=1 this is always
one entry.
Buffers the chunk in the pack writer. When the chunk fills the pack and
triggers a flush, returns a list of (chunk_id, pack_id, obj_offset, obj_size)
tuples, one per chunk written to disk by that flush; otherwise returns None.
"""
self._lock_refresh()
data_size = len(data)
Expand All @@ -799,22 +799,39 @@ def put(self, id, data):
return self._pack_writer.add(id, data)

def delete(self, id):
"""delete a repo object"""
"""Delete a single repo object by rewriting its pack without it.

A pack holds several objects, so we can not just drop the whole pack: that would take the
target's innocent neighbours with it. Route through compact_pack, which copies the survivors
into a new pack and repoints them in the chunk index. Rewriting a whole pack to remove one
object is slow, but delete is only used by special-purpose callers (tests, the debug command,
check --repair), never on a hot path, so the cost does not matter.
"""
self._lock_refresh()
# We can not remove one object by dropping its whole pack without losing the pack's other
# objects; real removal is store_delete at the pack level (compact). For now just check the
# object exists (ObjectNotFound contract), log, and do nothing.
# TODO: delete a single object once a pack can hold more than one (N>1).
from .cache import write_chunkindex_to_repo

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a circular import issue if you import this on module level?


entry = self.chunks.get(id)
if entry is None:
raise self.ObjectNotFound(id, str(self._location))
logger.warning("ignoring deletion of %s in %s", bin_to_hex(id), bin_to_hex(entry.pack_id))
pack_id = entry.pack_id
# keep every other object in the same pack; compact_pack rewrites the pack without the target.
# work off the live index (each entry already carries its offset/size after flush): do not
# rebuild from the packs here, that could not tell the target apart from stale duplicate copies
# of the same id left in other packs by re-puts (build keeps an arbitrary one of them).
keep_ids = {cid for cid, e in self.chunks.iteritems() if e.pack_id == pack_id}
keep_ids.discard(id)
self.compact_pack(pack_id, keep_ids=keep_ids, drop_ids={id})
# persist a full index so a following borg process sees the deletion; close() only writes new
# entries incrementally and would not record the removal.
write_chunkindex_to_repo(self, self.chunks, incremental=False, force_write=True, delete_other=True)

def compact_pack(self, pack_id, *, keep_ids: set, drop_ids: set):
"""Rewrite pack <pack_id>, keeping <keep_ids> and dropping <drop_ids>, then delete the old pack.

keep_ids and drop_ids are sets of chunk ids that must together cover the whole pack (asserted:
their ranges tile it with no gap or overlap, and their intersection is empty). Kept objects are
keep_ids and drop_ids are sets of chunk ids that must together be every object of the pack, as
recorded in the chunk index. The caller guarantees that completeness (both callers build the two
sets by iterating the index for this pack_id); here we only assert their ranges tile contiguously
from offset 0 with no gap or overlap, and that their intersection is empty. Kept objects are
copied into a new pack via store.defrag and repointed in the chunk index; dropped objects' index
entries are removed.

Expand All @@ -838,15 +855,17 @@ def compact_pack(self, pack_id, *, keep_ids: set, drop_ids: set):
located.append((entry.obj_offset, obj_id, entry.obj_size, keep))
located.sort()

# keep + drop must tile the whole pack; collect the objects to keep in the same pass.
# keep + drop tile the pack contiguously from offset 0; collect the objects to keep in the same
# pass. we do not cross-check against the pack's on-disk size: that needs a store.info() HEAD,
# which the stdio rest backend mishandles (it blocks reading a body the HEAD response never
# carries), and the caller already guarantees the two sets are the pack's complete object set.
Comment on lines +859 to +861

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds like it should be in a issue on the borgstore repository.

kept = [] # (obj_offset, obj_id, obj_size), offset-ordered
covered = 0
for offset, obj_id, size, keep in located:
assert offset == covered, f"gap or overlap in pack {bin_to_hex(pack_id)} at offset {covered}"
covered += size
if keep:
kept.append((offset, obj_id, size))
assert covered == self.store.info(pack_key).size, f"pack {bin_to_hex(pack_id)} not fully covered"

for drop_id in drop_ids: # remove dropped objects from the index; their bytes are not copied forward
del self.chunks[drop_id]
Expand Down
14 changes: 2 additions & 12 deletions src/borg/testsuite/archiver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from ...constants import * # NOQA
from ...helpers import Location, umount
from ...helpers import EXIT_SUCCESS
from ...helpers import bin_to_hex
from ...helpers import init_ec_warnings
from ...logger import flush_logging
from ...manifest import Manifest
Expand Down Expand Up @@ -181,17 +180,8 @@ def open_archive(repo_path, name):


def delete_chunk(repository, id):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guess you just implemented Repository.delete here?

It is not very fast, but considering it is rarely ever used, this is not a problem.

"""Drop the pack holding chunk `id` (test damage helper).

Repository.delete is a no-op now, so tests that need a chunk to really vanish drop its whole
pack at the store level. Works at N=1 (one chunk per pack). The pack is resolved through the
chunk index, since the pack file name is the pack_id, which need not equal the chunk_id.
"""
entry = repository.chunks.get(id)
if entry is not None:
repository.store_delete("packs/" + bin_to_hex(entry.pack_id))
else:
raise Repository.ObjectNotFound(id, repository)
"""Remove a single chunk from the repo, leaving the rest of its pack intact (test damage helper)."""
repository.delete(id)
Comment on lines 182 to +184

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a special reason why you kept this wrapper?



def open_repository(archiver):
Expand Down
22 changes: 3 additions & 19 deletions src/borg/testsuite/archiver/check_cmd_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,6 @@ def test_date_matching(archivers, request):
assert archive not in output


@pytest.mark.skip(
reason="TODO: a non-repair check verifies index and packs by sha256, then runs the archive checks "
"(--archives-only) against that verified index instead of rebuilding it from the packs. A real missing "
"chunk would be a corrupted pack (caught by the sha256 pack check) or a borg index bug; detecting this "
"artificial one needs the index rebuild that --repair does. Rework with the index/repair redesign, refs #8572."
)
def test_missing_file_chunk(archivers, request):
archiver = request.getfixturevalue(archivers)
check_cmd_setup(archiver)
Expand Down Expand Up @@ -199,11 +193,6 @@ def test_missing_file_chunk(archivers, request):
assert "Missing file chunk detected" not in output


@pytest.mark.skip(
reason="TODO: a non-repair check verifies index and packs by sha256 and uses that verified index (it "
"does not rebuild it); the index still lists chunks whose pack was removed here, so reading them raises "
"ObjectNotFound instead of being reported as missing. Needs the index/repair redesign, refs #8572."
)
def test_missing_archive_item_chunk(archivers, request):
archiver = request.getfixturevalue(archivers)
check_cmd_setup(archiver)
Expand All @@ -215,11 +204,6 @@ def test_missing_archive_item_chunk(archivers, request):
cmd(archiver, "check", exit_code=0)


@pytest.mark.skip(
reason="TODO: a non-repair check verifies index and packs by sha256 and uses that verified index (it "
"does not rebuild it); the index still lists chunks whose pack was removed here, so reading them raises "
"ObjectNotFound instead of being reported as missing. Needs the index/repair redesign, refs #8572."
)
def test_missing_archive_metadata(archivers, request):
archiver = request.getfixturevalue(archivers)
check_cmd_setup(archiver)
Expand Down Expand Up @@ -308,7 +292,7 @@ def test_manifest_rebuild_corrupted_chunk(archivers, request):
# framing rather than the authenticated payload, which made the repair flaky on Windows.
corrupted_chunk = corrupt(chunk, len(chunk) // 2)
repository.put(archive.id, corrupted_chunk)
repository.flush() # N>1: make the put durable before close()/the check below
repository.flush() # make the put durable before close()/the check below
cmd(archiver, "check", exit_code=1)
output = cmd(archiver, "check", "-v", "--repair", exit_code=0)
assert "archive2" in output
Expand Down Expand Up @@ -367,7 +351,7 @@ def test_spoofed_archive(archivers, request):
ro_type=ROBJ_FILE_STREAM, # a real archive is stored with ROBJ_ARCHIVE_META
),
)
repository.flush() # N>1: make the put durable before close()/the check below
repository.flush() # make the put durable before close()/the check below
cmd(archiver, "check", exit_code=1)
cmd(archiver, "check", "--repair", "--debug", exit_code=0)
output = cmd(archiver, "repo-list")
Expand All @@ -386,7 +370,7 @@ def test_extra_chunks(archivers, request):
key = b"01234567890123456789012345678901"
chunk = fchunk(b"xxxx", chunk_id=key)
repository.put(key, chunk)
repository.flush() # N>1: make the put durable before close()/the check below
repository.flush() # make the put durable before close()/the check below
cmd(archiver, "check", "-v", exit_code=0) # check does not deal with orphans anymore


Expand Down
2 changes: 1 addition & 1 deletion src/borg/testsuite/archiver/compact_cmd_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def test_compact_packs_respects_threshold(tmp_path):
# Two multi-object packs in one repo, then pack-level compaction at a 40% threshold. The pack that
# wastes 2/3 of its bytes is rewritten down to its single survivor (and its old file deleted); the
# pack that wastes only 1/3 stays untouched, since copying its survivors to reclaim that little is
# not worth it. This covers the rewrite, leave-alone and keep/drop split that N=1 can't reach.
# not worth it. This covers the rewrite, leave-alone and keep/drop split unique to multi-object packs.
from ...archiver.compact_cmd import ArchiveGarbageCollector

location = os.fspath(tmp_path / "repo")
Expand Down
1 change: 1 addition & 0 deletions src/borg/testsuite/archiver/debug_cmds_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def test_debug_put_get_delete_obj(archivers, request):
output = cmd(archiver, "debug", "delete-obj", id_hash)
assert "deleted" in output

# the object is gone now: deleting it again reports it is not there
output = cmd(archiver, "debug", "delete-obj", id_hash)
assert "not found" in output

Expand Down
2 changes: 1 addition & 1 deletion src/borg/testsuite/cache_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def repository(self, tmpdir):
self.repository_location = os.path.join(str(tmpdir), "repository")
with Repository(self.repository_location, exclusive=True, create=True) as repository:
repository.put(H(1), b"1234")
repository.flush() # N>1: the lone put would stay buffered; make it durable
repository.flush() # the lone put would stay buffered; make it durable
yield repository
repository.flush() # flush anything a test buffered via the cache before close()

Expand Down
2 changes: 1 addition & 1 deletion src/borg/testsuite/hashindex_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def test_chunkindex_update_pack_info():
assert chunks[x2].obj_offset == UNKNOWN_INT32

pack_id = H2(3)
# Both chunks land in the same pack (N>1 scenario): batch update in one call.
# Both chunks land in the same pack: batch update in one call.
chunks.update_pack_info([(x1, pack_id, 0, 50), (x2, pack_id, 50, 60)])
# Location fields updated; flags and size must be unchanged.
assert chunks[x1] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=10, pack_id=pack_id, obj_offset=0, obj_size=50)
Expand Down
Loading
Loading