Skip to content

Commit 52fdfc7

Browse files
Merge pull request #9828 from mr-raj12/chunkindex-pending-flag
hashindex: track unresolved pack location with F_PENDING flag
2 parents 2ae0420 + 1206808 commit 52fdfc7

6 files changed

Lines changed: 79 additions & 38 deletions

File tree

src/borg/archive.py

Lines changed: 1 addition & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,6 @@
2929
from .crypto.low_level import IntegrityError as IntegrityErrorBase
3030
from .helpers import BackupError, BackupRaceConditionError, BackupItemExcluded
3131
from .helpers import BackupOSError, BackupPermissionError, BackupFileNotFoundError, BackupIOError
32-
from .hashindex import ChunkIndex, ChunkIndexEntry
3332
from .helpers import HardLinkManager
3433
from .helpers import ChunkIteratorFileWrapper, open_item
3534
from .helpers import Error, IntegrityError, set_ec
@@ -1968,13 +1967,7 @@ def add_reference(id_, size, cdata):
19681967
# either we already have this chunk in repo and chunks index or we add it now
19691968
if id_ not in self.chunks:
19701969
assert cdata is not None
1971-
self.chunks[id_] = ChunkIndexEntry(
1972-
flags=ChunkIndex.F_USED,
1973-
size=size,
1974-
pack_id=UNKNOWN_BYTES32,
1975-
obj_offset=UNKNOWN_INT32,
1976-
obj_size=UNKNOWN_INT32,
1977-
)
1970+
self.chunks.add(id_, size)
19781971
if self.repair:
19791972
pack_results = self.repository.put(id_, cdata)
19801973
self.chunks.update_pack_info(pack_results)

src/borg/constants.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,8 @@
5555
# Grep for UNKNOWN_INT32 to find every site that still needs updating.
5656
UNKNOWN_INT32 = 0xFFFFFFFF
5757

58-
# Placeholder for pack_id (32-byte field) when the value is not yet known.
58+
# Filler for the pack_id (32-byte field) while the real value is unknown. Never interpreted;
59+
# an unresolved pack location is tracked by the ChunkIndex.F_PENDING flag.
5960
UNKNOWN_BYTES32 = b"\xff" * 32
6061

6162
# MAX_OBJECT_SIZE = MAX_DATA_SIZE + len(PUT header)

src/borg/hashindex.pyx

Lines changed: 14 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,7 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
5252
# user flags:
5353
F_USED = 2 ** 0 # chunk is used/referenced
5454
F_COMPRESS = 2 ** 1 # chunk shall get (re-)compressed
55+
F_PENDING = 2 ** 2 # pack location (pack_id, obj_offset, obj_size) not resolved yet.
5556
# system flags (internal use, always 0 to user, not changeable by user):
5657
F_NEW = 2 ** 24 # a new chunk that is not present in repo index/* yet.
5758

@@ -81,8 +82,11 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
8182
else:
8283
flags = v.flags | self.F_USED
8384
assert v.size == 0 or v.size == size
85+
# F_PENDING marks the pack location (pack_id, obj_offset, obj_size) as not yet set.
86+
# Re-adding a chunk resets it to UNKNOWN/pending, dropping any prior location until the next flush().
8487
self[key] = ChunkIndexEntry(
85-
flags=flags, size=size, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
88+
flags=flags | self.F_PENDING, size=size,
89+
pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
8690
)
8791

8892
def __getitem__(self, key):
@@ -109,12 +113,19 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
109113
self.ht[key] = value._replace(flags=system_flags | user_flags)
110114

111115
def update_pack_info(self, pack_results):
112-
"""Update the on-disk location fields for a list of (chunk_id, pack_id, obj_offset, obj_size) tuples."""
116+
"""Set pack_id, obj_offset and obj_size from a list of (chunk_id, pack_id, obj_offset, obj_size)
117+
tuples and clear F_PENDING."""
113118
if not pack_results:
114119
return
115120
for chunk_id, pack_id, obj_offset, obj_size in pack_results:
116121
existing = self[chunk_id]
117-
self[chunk_id] = existing._replace(pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size)
122+
self[chunk_id] = existing._replace(
123+
flags=existing.flags & ~self.F_PENDING, pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size
124+
)
125+
126+
def is_pending(self, key):
127+
"""Return whether the chunk's pack location (pack_id, obj_offset, obj_size) is unresolved."""
128+
return bool(self[key].flags & self.F_PENDING)
118129

119130
def clear_new(self):
120131
"""Clears the F_NEW flag of all items."""

src/borg/repository.py

Lines changed: 10 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -106,9 +106,9 @@ def build_rest_backend(location):
106106
class PackWriter:
107107
"""Buffers chunks into a pack file and writes it to the store when full.
108108
109-
add() buffers a (chunk_id, cdata) pair and marks the chunk pending in the
110-
ChunkIndex (pack_id=UNKNOWN_BYTES32); flush() writes the pack and sets each
111-
entry's real pack_id and obj_offset.
109+
add() buffers a (chunk_id, cdata) pair and marks the chunk pending (F_PENDING);
110+
flush() writes the pack and sets each entry's pack_id, obj_offset and obj_size,
111+
clearing F_PENDING.
112112
113113
The ChunkIndex comes from the repository, or from an explicit chunks index when
114114
there is no repository (see the chunks property).
@@ -142,8 +142,6 @@ def chunks(self):
142142
def add(self, chunk_id, cdata):
143143
"""Buffer a chunk. Returns flush results if the pack is now full, else None."""
144144
self.chunks.add(chunk_id, 0) # size: plaintext chunk size, set by the cache layer
145-
# obj_size is final; pack_id and obj_offset get their real values in flush().
146-
self.chunks.update_pack_info([(chunk_id, UNKNOWN_BYTES32, 0, len(cdata))])
147145
self._pieces.append((chunk_id, cdata))
148146
self._size += len(cdata)
149147
if (self.max_count is not None and len(self._pieces) >= self.max_count) or (
@@ -179,22 +177,19 @@ def flush(self):
179177
offset += obj_size
180178

181179
key = "packs/" + bin_to_hex(pack_id)
182-
# ids this flush pre-marked in the index via add() (pack_id still UNKNOWN_BYTES32).
183180
pending_ids = [chunk_id for chunk_id, _ in self._pieces]
184181
try:
185182
self.store.store(key, pack_data)
186183
except Exception:
187-
# The pack is not stored, so drop the pending entries: keeping them would let
188-
# seen_chunk() dedup later chunks against bytes that were never written.
184+
# the pack was not stored: drop the index entries for its chunks.
189185
for chunk_id in pending_ids:
190-
entry = self.chunks.get(chunk_id)
191-
if entry is not None and entry.pack_id == UNKNOWN_BYTES32:
186+
if chunk_id in self.chunks: # a chunk_id may appear more than once in this pack
192187
del self.chunks[chunk_id]
193188
raise
194189
finally:
195190
self._pieces = [] # cleared on success and on failure
196191
self._size = 0
197-
self.chunks.update_pack_info(results) # replace UNKNOWN_BYTES32 with real pack_id
192+
self.chunks.update_pack_info(results) # set the real location and clear F_PENDING
198193
return results
199194

200195

@@ -711,7 +706,7 @@ def list(self, limit=None, marker=None):
711706
collect = marker is None
712707
result = []
713708
for chunk_id, entry in self.chunks.iteritems():
714-
if entry.pack_id == UNKNOWN_BYTES32:
709+
if self.chunks.is_pending(chunk_id):
715710
continue # buffered in PackWriter, not flushed to a pack yet
716711
if collect:
717712
result.append((chunk_id, entry.obj_size))
@@ -728,11 +723,9 @@ def get(self, id, read_data=True, raise_missing=True):
728723
if raise_missing:
729724
raise self.ObjectNotFound(id, str(self._location))
730725
return None
731-
if entry.pack_id == UNKNOWN_BYTES32:
732-
# chunk is buffered in PackWriter, not yet flushed to a pack. Everything must be flushed
733-
# before it can be read back, so reaching here points at a flush / index-update ordering
734-
# bug, not a genuinely missing object. this is a code bug, so we crash loudly regardless
735-
# of raise_missing instead of pretending the object is absent.
726+
if self.chunks.is_pending(id):
727+
# buffered but not flushed; a chunk must be flushed before any read, so this is a code
728+
# bug (wrong flush/index ordering), not a missing object: raise regardless of raise_missing.
736729
raise self.PackLocationUnknown(id, str(self._location))
737730
pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size
738731
id_hex = bin_to_hex(id)

src/borg/testsuite/hashindex_test.py

Lines changed: 10 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -21,16 +21,18 @@ def test_chunkindex_add():
2121
chunks = ChunkIndex()
2222
x = H2(1)
2323
chunks.add(x, 0)
24+
assert chunks.is_pending(x)
25+
pending = ChunkIndex.F_USED | ChunkIndex.F_PENDING # add() sets F_PENDING alongside F_USED
2426
assert chunks[x] == ChunkIndexEntry(
25-
flags=ChunkIndex.F_USED, size=0, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
27+
flags=pending, size=0, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
2628
)
2729
chunks.add(x, 2) # updating size (we do not have a size yet)
2830
assert chunks[x] == ChunkIndexEntry(
29-
flags=ChunkIndex.F_USED, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
31+
flags=pending, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
3032
)
3133
chunks.add(x, 2)
3234
assert chunks[x] == ChunkIndexEntry(
33-
flags=ChunkIndex.F_USED, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
35+
flags=pending, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
3436
)
3537
with pytest.raises(AssertionError):
3638
chunks.add(x, 3) # inconsistent size (we already have a different size)
@@ -43,10 +45,15 @@ def test_chunkindex_update_pack_info():
4345
chunks.add(x2, 20)
4446
assert chunks[x1].obj_offset == UNKNOWN_INT32
4547
assert chunks[x2].obj_offset == UNKNOWN_INT32
48+
assert chunks.is_pending(x1)
49+
assert chunks.is_pending(x2)
4650

4751
pack_id = H2(3)
4852
# Both chunks land in the same pack: batch update in one call.
4953
chunks.update_pack_info([(x1, pack_id, 0, 50), (x2, pack_id, 50, 60)])
54+
# resolving the location clears the pending flag
55+
assert not chunks.is_pending(x1)
56+
assert not chunks.is_pending(x2)
5057
# Location fields updated; flags and size must be unchanged.
5158
assert chunks[x1] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=10, pack_id=pack_id, obj_offset=0, obj_size=50)
5259
assert chunks[x2] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=20, pack_id=pack_id, obj_offset=50, obj_size=60)

src/borg/testsuite/repository_test.py

Lines changed: 42 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,6 @@
55
import pytest
66
from ..helpers import IntegrityError, Location, bin_to_hex
77
from ..hashindex import ChunkIndex
8-
from ..constants import UNKNOWN_BYTES32
98
from ..repository import Repository, MAX_DATA_SIZE, rest_serve_command, PackWriter, PackReader
109
from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION
1110
from .hashindex_test import H
@@ -187,11 +186,11 @@ def test_multi_object_pack_roundtrip(repo_fixtures, request):
187186
with get_repository_from_fixture(repo_fixtures, request) as repository:
188187
repository._pack_writer.max_count = 2 # this test is written for exactly two objects per pack
189188
repository.put(H(0), chunk0)
190-
assert repository.chunks[H(0)].pack_id == UNKNOWN_BYTES32 # buffered: the pack is not full yet
189+
assert repository.chunks.is_pending(H(0)) # buffered: the pack is not full yet
191190
repository.put(H(1), chunk1) # fills the pack, flushing both objects at once
192191
# both objects share one pack, written exactly once, laid out in put() order
193192
pack_id = repository.chunks[H(0)].pack_id
194-
assert pack_id != UNKNOWN_BYTES32
193+
assert not repository.chunks.is_pending(H(0))
195194
assert repository.chunks[H(1)].pack_id == pack_id
196195
assert [info.name for info in repository.store_list("packs")] == [bin_to_hex(pack_id)]
197196
assert repository.chunks[H(0)].obj_offset == 0
@@ -509,20 +508,57 @@ def test_get_uses_chunk_index_location(tmp_path):
509508

510509

511510
def test_put_marks_id_in_chunk_index(tmp_path):
512-
# put() marks the id pending (pack_id=UNKNOWN_BYTES32); flush() then fills in the
513-
# real pack location for the current session.
511+
# put() marks the id pending; flush() sets the real pack location and clears the pending flag.
514512
with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository:
515513
id1 = H(1)
516514
repository.put(id1, fchunk(b"ZEROS"))
517515
entry = repository._chunks.get(id1)
518516
assert entry is not None
519-
assert entry.pack_id == UNKNOWN_BYTES32 # buffered, not yet flushed
517+
assert repository._chunks.is_pending(id1) # buffered, not yet flushed
520518
repository.flush()
521519
entry = repository._chunks.get(id1)
520+
assert not repository._chunks.is_pending(id1)
522521
assert entry.pack_id == sha256(fchunk(b"ZEROS")).digest()
523522
assert entry.size == 0 # uncompressed size filled in by cache layer
524523

525524

525+
def test_list_skips_pending_chunk(tmp_path):
526+
# list() skips a pending chunk and yields it once flushed.
527+
with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository:
528+
repository.put(H(1), fchunk(b"BUFFERED")) # buffered: the pack is not full yet
529+
assert repository._chunks.is_pending(H(1))
530+
assert repository.list() == []
531+
repository.flush()
532+
assert [chunk_id for chunk_id, _ in repository.list()] == [H(1)]
533+
534+
535+
def test_get_pending_chunk_raises(tmp_path):
536+
# get() on a pending chunk raises PackLocationUnknown, also with raise_missing=False.
537+
with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository:
538+
repository.put(H(1), fchunk(b"BUFFERED")) # buffered: the pack is not full yet
539+
assert repository._chunks.is_pending(H(1))
540+
with pytest.raises(Repository.PackLocationUnknown):
541+
repository.get(H(1))
542+
with pytest.raises(Repository.PackLocationUnknown):
543+
repository.get(H(1), raise_missing=False)
544+
repository.flush() # close() requires the buffer to be empty
545+
546+
547+
def test_flush_store_failure_drops_pending_entries(tmp_path):
548+
# flush() removes the pending index entries when storing the pack fails.
549+
with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository:
550+
repository.put(H(1), fchunk(b"BUFFERED"))
551+
assert repository._chunks.is_pending(H(1))
552+
553+
def boom(*args, **kwargs):
554+
raise OSError("store failed")
555+
556+
repository._pack_writer.store.store = boom
557+
with pytest.raises(OSError):
558+
repository.flush()
559+
assert H(1) not in repository._chunks
560+
561+
526562
def test_check_detects_corruption_in_later_object(tmp_path):
527563
# Corruption anywhere in a multi-object pack must be caught, not just in the first object: the pack
528564
# is named by sha256(content), so flipping any byte makes its stored hash differ from its name.

0 commit comments

Comments
 (0)