From 7097ea914fcb020a4e8624501794ce11ee429e26 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Thu, 25 Jun 2026 22:11:31 +0200 Subject: [PATCH 1/5] create: implement retries for individual fs files (backport of #7351) Backport of #7351 to 1.4-maint. Retry reading a backup source file when it raises a BackupError / BackupOSError (e.g. transient I/O error, or the file changed while we read it): - retry the same file after some sleep time (1ms, growing exponentially up to 10s, 10 tries) - BackupOSError: if retries do not help, skip the file, log it with "E" status - BackupError (changed while reading): last try backs it up, logged with "C" status Works for borg create's builtin fs recursion, --paths-from-command and --paths-from-stdin. 1.4-specific: do NOT retry a file once checkpoint part files (.borg_part_N) have been written for it. Re-reading it from the start would create duplicate / inconsistent part files (concatenating all part files would then no longer yield the complete file). ChunksProcessor.last_part_number tracks this; such a file is kept as-is ("C") or skipped ("E") instead of being retried. Permission errors (EPERM/EACCES) are not retried (they won't get better), so we don't waste ~15s of backoff on every unreadable file. Adds a ChunkerFailing test chunker (chunker-params=fail,block_size,map) to drive the error handling, and tests: - test_create_erroneous_file: file is read on a later retry - test_create_no_permission_file: permission error is not retried, file skipped - test_create_erroneous_file_with_part_files: file with part files is not retried Co-Authored-By: Claude Opus 4.8 --- src/borg/archive.py | 36 +++++++-- src/borg/archiver.py | 131 ++++++++++++++++++++------------ src/borg/chunker.pyx | 48 ++++++++++++ src/borg/constants.py | 1 + src/borg/helpers/parseformat.py | 4 + src/borg/testsuite/archiver.py | 102 ++++++++++++++++++++++++- 6 files changed, 266 insertions(+), 56 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 20cef9f677..b8536c87db 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -1274,6 +1274,11 @@ def __init__(self, *, key, cache, self.checkpoint_interval = checkpoint_interval self.last_checkpoint = time.monotonic() self.rechunkify = rechunkify + # number of the last checkpoint part file we wrote for the file currently being processed. + # 0 means "no part file written (yet)". used to decide whether a file may still be retried: + # once we have written part files, re-reading the file from the start would create duplicate + # part files, so such a file must not be retried (see Archiver._process_any). + self.last_part_number = 0 def write_part_file(self, item, from_chunk, number): item = Item(internal_dict=item.as_dict()) @@ -1285,10 +1290,11 @@ def write_part_file(self, item, from_chunk, number): item.get_size(memorize=True, from_chunks=True) item.path += '.borg_part_%d' % number item.part = number - number += 1 self.add_item(item, show_progress=False) self.write_checkpoint() - return length, number + # remember that we have committed a part file for the current file, so it must not be retried: + self.last_part_number = number + return length, number + 1 def maybe_checkpoint(self, item, from_chunk, part_number, forced=False): sig_int_triggered = sig_int and sig_int.action_triggered() @@ -1336,6 +1342,8 @@ def chunk_processor(chunk): for chunk in item.chunks: cache.chunk_incref(chunk.id, stats, size=chunk.size, part=True) stats.nfiles_parts += part_number - 1 + # part_number > 1 means we wrote part files (checkpoints) for this file: + return part_number class FilesystemObjectProcessors: @@ -1344,7 +1352,7 @@ class FilesystemObjectProcessors: # and process_file becomes a callback passed to __init__. def __init__(self, *, metadata_collector, cache, key, - add_item, process_file_chunks, + add_item, process_file_chunks, chunks_processor, chunker_params, show_progress, sparse, log_json, iec, file_status_printer=None, files_changed='ctime'): @@ -1353,6 +1361,9 @@ def __init__(self, *, metadata_collector, cache, key, self.key = key self.add_item = add_item self.process_file_chunks = process_file_chunks + # the ChunksProcessor (also providing process_file_chunks) - we need it to find out whether + # checkpoint part files have already been written for the file currently being processed. + self.chunks_processor = chunks_processor self.show_progress = show_progress self.print_file_status = file_status_printer or (lambda *args: None) self.files_changed = files_changed @@ -1478,7 +1489,7 @@ def process_pipe(self, *, path, cache, fd, mode, user, group): self.add_item(item, stats=self.stats) return status - def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal, strip_prefix): + def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal, strip_prefix, last_try=False): with self.create_helper(path, st, None, strip_prefix=strip_prefix) as (item, status, hardlinked, hardlink_master): # no status yet if item is None: return status @@ -1523,7 +1534,9 @@ def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal, item.chunks = chunks else: with backup_io('read'): - self.process_file_chunks(item, cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(None, fd))) + part_number = self.process_file_chunks( + item, cache, self.stats, self.show_progress, + backup_io_iter(self.chunker.chunkify(None, fd))) if is_win32: changed_while_backup = False # TODO else: @@ -1541,7 +1554,18 @@ def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal, else: raise ValueError('invalid files_changed value: %r' % self.files_changed) if changed_while_backup: - status = 'C' # regular file changed while we backed it up, might be inconsistent/corrupt! + # regular file changed while we backed it up, might be inconsistent/corrupt! + if last_try or part_number > 1: + # this was the last try or we already wrote part files (checkpoints) for + # this file. in both cases we must not retry (re-reading the file from the + # start would create duplicate / inconsistent part files), so we keep what + # we have and flag it as potentially inconsistent/corrupt. + status = 'C' + else: + # not the last try and no part files written yet: trigger a retry by raising, + # hoping that re-reading the file gives us a consistent copy. the retry is + # done by the caller (Archiver._process_any). + raise BackupError('file changed while we read it!') if not is_special_file and not changed_while_backup: # we must not memorize special files, because the contents of e.g. a # block or char device will change without its mtime/size/inode changing. diff --git a/src/borg/archiver.py b/src/borg/archiver.py index 60c0d0e6d9..0779c0aa28 100644 --- a/src/borg/archiver.py +++ b/src/borg/archiver.py @@ -16,6 +16,7 @@ import argparse import collections import configparser + import errno import faulthandler import functools import inspect @@ -730,7 +731,7 @@ def create_inner(archive, cache, fso): add_item=archive.add_item, write_checkpoint=archive.write_checkpoint, checkpoint_interval=args.checkpoint_interval, rechunkify=False) fso = FilesystemObjectProcessors(metadata_collector=metadata_collector, cache=cache, key=key, - process_file_chunks=cp.process_file_chunks, add_item=archive.add_item, + process_file_chunks=cp.process_file_chunks, chunks_processor=cp, add_item=archive.add_item, chunker_params=args.chunker_params, show_progress=args.progress, sparse=args.sparse, log_json=args.log_json, iec=args.iec, file_status_printer=self.print_file_status, files_changed=args.files_changed) @@ -745,55 +746,87 @@ def _process_any(self, *, path, parent_fd, name, st, fso, cache, read_special, d if dry_run: return '-' - elif stat.S_ISREG(st.st_mode): - return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st, cache=cache, strip_prefix=strip_prefix) - elif stat.S_ISDIR(st.st_mode): - return fso.process_dir(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix) - elif stat.S_ISLNK(st.st_mode): - if not read_special: - return fso.process_symlink(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix) - else: - try: - st_target = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=True) - except OSError: - special = False - else: - special = is_special(st_target.st_mode) - if special: - return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st_target, - cache=cache, flags=flags_special_follow, strip_prefix=strip_prefix) + # the ChunksProcessor remembers (in last_part_number) whether it has already written checkpoint + # part files for the file we are about to process. we must not retry a file once part files have + # been written for it, because re-reading it from the start would create duplicate / inconsistent + # part files (concatenating all part files would then not yield the complete file any more). + cp = fso.chunks_processor # the ChunksProcessor that fso uses + MAX_RETRIES = 10 # count includes the initial try (initial try == "retry 0") + for retry in range(MAX_RETRIES): + cp.last_part_number = 0 # reset part file tracking for this (re)try of the file + last_try = retry == MAX_RETRIES - 1 + try: + if stat.S_ISREG(st.st_mode): + return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st, cache=cache, + strip_prefix=strip_prefix, last_try=last_try) + elif stat.S_ISDIR(st.st_mode): + return fso.process_dir(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix) + elif stat.S_ISLNK(st.st_mode): + if not read_special: + return fso.process_symlink(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix) + else: + try: + st_target = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=True) + except OSError: + special = False + else: + special = is_special(st_target.st_mode) + if special: + return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st_target, + cache=cache, flags=flags_special_follow, strip_prefix=strip_prefix, + last_try=last_try) + else: + return fso.process_symlink(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix) + elif stat.S_ISFIFO(st.st_mode): + if not read_special: + return fso.process_fifo(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix) + else: + return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st, + cache=cache, flags=flags_special, strip_prefix=strip_prefix, last_try=last_try) + elif stat.S_ISCHR(st.st_mode): + if not read_special: + return fso.process_dev(path=path, parent_fd=parent_fd, name=name, st=st, dev_type='c', strip_prefix=strip_prefix) + else: + return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st, + cache=cache, flags=flags_special, strip_prefix=strip_prefix, last_try=last_try) + elif stat.S_ISBLK(st.st_mode): + if not read_special: + return fso.process_dev(path=path, parent_fd=parent_fd, name=name, st=st, dev_type='b', strip_prefix=strip_prefix) + else: + return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st, + cache=cache, flags=flags_special, strip_prefix=strip_prefix, last_try=last_try) + elif stat.S_ISSOCK(st.st_mode): + # Ignore unix sockets + return + elif stat.S_ISDOOR(st.st_mode): + # Ignore Solaris doors + return + elif stat.S_ISPORT(st.st_mode): + # Ignore Solaris event ports + return else: - return fso.process_symlink(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix) - elif stat.S_ISFIFO(st.st_mode): - if not read_special: - return fso.process_fifo(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix) - else: - return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st, - cache=cache, flags=flags_special, strip_prefix=strip_prefix) - elif stat.S_ISCHR(st.st_mode): - if not read_special: - return fso.process_dev(path=path, parent_fd=parent_fd, name=name, st=st, dev_type='c', strip_prefix=strip_prefix) - else: - return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st, - cache=cache, flags=flags_special, strip_prefix=strip_prefix) - elif stat.S_ISBLK(st.st_mode): - if not read_special: - return fso.process_dev(path=path, parent_fd=parent_fd, name=name, st=st, dev_type='b', strip_prefix=strip_prefix) - else: - return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st, - cache=cache, flags=flags_special, strip_prefix=strip_prefix) - elif stat.S_ISSOCK(st.st_mode): - # Ignore unix sockets - return - elif stat.S_ISDOOR(st.st_mode): - # Ignore Solaris doors - return - elif stat.S_ISPORT(st.st_mode): - # Ignore Solaris event ports - return - else: - self.print_warning('Unknown file type: %s', path) - return + self.print_warning('Unknown file type: %s', path) + return + except BackupError as err: + if getattr(err, 'errno', None) in (errno.EPERM, errno.EACCES): + # permission errors will not get better when retrying, so don't waste time on it. + raise + if last_try or cp.last_part_number > 0: + # giving up: this was the last try, or we have already written part files for this + # file and thus must not retry. the error will be dealt with (logged) by the caller. + raise + # sleep a bit, so temporary problems (e.g. a file briefly being inaccessible or + # being modified) might go away before we try to read the file again... + sleep_s = 1000.0 / 1e6 * 10 ** (retry / 2) # retry 0: 1ms, retry 6: 1s, ... + time.sleep(sleep_s) + logger.warning( + f'{path}: {err}, slept {sleep_s:.3f}s, next: retry: {retry + 1} of {MAX_RETRIES - 1}...' + ) + # we better do a fresh stat on the file, just to make sure to get the current file + # mode right (which could have changed due to a race condition and is important for + # dispatching) and also to get current inode number of that file. + with backup_io('stat'): + st = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=False) def _rec_walk(self, *, path, parent_fd, name, fso, cache, matcher, exclude_caches, exclude_if_present, keep_exclude_tags, diff --git a/src/borg/chunker.pyx b/src/borg/chunker.pyx index 6acc325ff8..a1d88f1ebd 100644 --- a/src/borg/chunker.pyx +++ b/src/borg/chunker.pyx @@ -124,6 +124,52 @@ def sparsemap(fd=None, fh=-1): dseek(curr, os.SEEK_SET, fd, fh) +class ChunkerFailing: + """ + This is a very simple chunker for testing purposes. + + Reads block_size sized blocks. The map parameter controls the behaviour for each + block: 'R' (or 'r') == successful read, 'E' (or 'e') == I/O Error. Blocks beyond + the end of the map will behave like the last map character indicates. + """ + def __init__(self, block_size, map): + self.block_size = block_size + # one char per block: r/R = successful read, e/E = I/O Error, e.g.: "rrrrErrrEEr" + # blocks beyond the map will behave like the last map char indicates. + map = map.upper() + if not set(map).issubset({'R', 'E'}): + raise ValueError('unsupported map character') + self.map = map + self.count = 0 + self.chunking_time = 0.0 # not updated, just provided so that caller does not crash + + def chunkify(self, fd=None, fh=-1, fmap=None): + """ + Cut a file into chunks. + + :param fd: Python file object + :param fh: OS-level file handle (if available), + defaults to -1 which means not to use OS-level fd. + """ + use_fh = fh >= 0 + wanted = self.block_size + while True: + data = os.read(fh, wanted) if use_fh else fd.read(wanted) + got = len(data) + if got > 0: + idx = self.count if self.count < len(self.map) else -1 + behaviour = self.map[idx] + self.count += 1 + if behaviour == 'E': + fname = None if use_fh else getattr(fd, 'name', None) + raise OSError(errno.EIO, 'simulated I/O error', fname) + elif behaviour == 'R': + yield Chunk(data, size=got, allocation=CH_DATA) + if got < wanted: + # we did not get enough data, looks like EOF. + return + + class ChunkerFixed: """ This is a simple chunker for input data with data usually staying at same @@ -289,6 +335,8 @@ def get_chunker(algo, *params, **kw): if algo == 'fixed': sparse = kw['sparse'] return ChunkerFixed(*params, sparse=sparse) + if algo == 'fail': + return ChunkerFailing(*params) raise TypeError('unsupported chunker algo %r' % algo) diff --git a/src/borg/constants.py b/src/borg/constants.py index 9fc572fee6..d4fef17f40 100644 --- a/src/borg/constants.py +++ b/src/borg/constants.py @@ -78,6 +78,7 @@ # chunker algorithms CH_BUZHASH = 'buzhash' CH_FIXED = 'fixed' +CH_FAIL = 'fail' # only for testing the error handling of borg create # defaults, use --chunker-params to override CHUNKER_PARAMS = (CH_BUZHASH, CHUNK_MIN_EXP, CHUNK_MAX_EXP, HASH_MASK_BITS, HASH_WINDOW_SIZE) diff --git a/src/borg/helpers/parseformat.py b/src/borg/helpers/parseformat.py index cf5e117e55..25f89a688d 100644 --- a/src/borg/helpers/parseformat.py +++ b/src/borg/helpers/parseformat.py @@ -124,6 +124,10 @@ def ChunkerParams(s): if block_size > MAX_DATA_SIZE or header_size > MAX_DATA_SIZE: raise argparse.ArgumentTypeError('block_size and header_size must not exceed MAX_DATA_SIZE [%d]' % MAX_DATA_SIZE) return algo, block_size, header_size + if algo == CH_FAIL and count == 3: # fail, block_size, map (only for testing the error handling) + block_size = int(params[1]) + map = str(params[2]) + return algo, block_size, map if algo == 'default' and count == 1: # default return CHUNKER_PARAMS # this must stay last as it deals with old-style compat mode (no algorithm, 4 params, buzhash): diff --git a/src/borg/testsuite/archiver.py b/src/borg/testsuite/archiver.py index dc07367fdc..f5bbb2e2b9 100644 --- a/src/borg/testsuite/archiver.py +++ b/src/borg/testsuite/archiver.py @@ -30,7 +30,7 @@ import borg import borg.helpers.errors from .. import xattr, helpers, platform -from ..archive import Archive, ChunkBuffer +from ..archive import Archive, ChunkBuffer, ChunksProcessor from ..archiver import Archiver, parse_storage_quota, PURE_PYTHON_MSGPACK_WARNING from ..cache import Cache, LocalCache from ..chunker import has_seek_hole @@ -1237,6 +1237,106 @@ def test_create_paths_from_stdin(self): paths = [json.loads(line)['path'] for line in archive_list.split('\n') if line] assert paths == ['input/file1', 'input/dir1', 'input/file4'] + def test_create_erroneous_file(self): + chunk_size = 1000 # fixed chunker with this block size, fail chunker reads same size blocks + self.create_regular_file('file1', size=chunk_size * 2) + self.create_regular_file('file2', size=chunk_size * 2) + self.create_regular_file('file3', size=chunk_size * 2) + self.cmd('init', '--encryption=repokey', self.repository_location) + flist = ''.join(f'input/file{n}\n' for n in range(1, 4)) + out = self.cmd( + 'create', + f'--chunker-params=fail,{chunk_size},rrrEEErrrr', + '--paths-from-stdin', + '--list', + self.repository_location + '::test', + input=flist.encode(), + exit_code=0, + ) + assert 'retry: 3 of ' in out + assert 'E input/file2' not in out # we managed to read it in the 3rd retry (after 3 failed reads) + # repo looking good overall? checks for rc == 0. + self.cmd('check', '--debug', self.repository_location) + # check files in created archive + out = self.cmd('list', self.repository_location + '::test') + assert 'input/file1' in out + assert 'input/file2' in out + assert 'input/file3' in out + + def test_create_erroneous_file_with_part_files(self): + # if we have already written part files (checkpoints) for a file, a later read error must + # NOT trigger a retry: re-reading the file from the start would create duplicate / inconsistent + # part files (concatenating all part files would then not yield the complete file any more). + # so such a file must be skipped (status "E") instead of being retried. + chunk_size = 1000 + self.create_regular_file('file1', size=chunk_size * 2) + self.create_regular_file('file2', size=chunk_size * 2) + self.create_regular_file('file3', size=chunk_size * 2) + self.cmd('init', '--encryption=repokey', self.repository_location) + flist = ''.join(f'input/file{n}\n' for n in range(1, 4)) + + orig_maybe_checkpoint = ChunksProcessor.maybe_checkpoint + + def always_checkpoint(self, item, from_chunk, part_number, forced=False): + # force writing a checkpoint part file after every chunk + return orig_maybe_checkpoint(self, item, from_chunk, part_number, forced=True) + + with patch.object(ChunksProcessor, 'maybe_checkpoint', always_checkpoint): + # file2: 1st block reads ok (a part file gets written for it), 2nd block fails with an + # I/O error. as a part file was already written, file2 must not be retried, but skipped. + out = self.cmd( + 'create', + f'--chunker-params=fail,{chunk_size},rrrErr', + '--paths-from-stdin', + '--list', + self.repository_location + '::test', + input=flist.encode(), + exit_code=1, + ) + assert 'retry: ' not in out # the read error happened after a part file was written -> no retry + assert 'E input/file2' in out # file2 was skipped, not retried into success + # repo looking good overall? checks for rc == 0. + self.cmd('check', '--debug', self.repository_location) + # file1 and file3 were backed up ok; file2 only exists as an orphan part file (hidden by default): + out = self.cmd('list', self.repository_location + '::test') + assert 'input/file1' in out + assert 'input/file2' not in out + assert 'input/file3' in out + + @pytest.mark.skipif(not is_win32 and os.getuid() == 0, reason='test must not be run as (fake)root') + def test_create_no_permission_file(self): + # retries are pointless for permission errors (they will not get better), so a file we have + # no read permission for must NOT be retried, but skipped right away (status "E"). + self.create_regular_file('file1', size=1000) + self.create_regular_file('file2', size=1000) + self.create_regular_file('file3', size=1000) + file2 = os.path.join(self.input_path, 'file2') + # revoke read permissions on file2 for everybody, including us: + if is_win32: + subprocess.run(['icacls.exe', file2, '/deny', 'everyone:(R)']) + else: + # note: this will NOT take away read permissions for root + os.chmod(file2, 0o000) + self.cmd('init', '--encryption=repokey', self.repository_location) + flist = ''.join(f'input/file{n}\n' for n in range(1, 4)) + out = self.cmd( + 'create', + '--paths-from-stdin', + '--list', + self.repository_location + '::test', + input=flist.encode(), + exit_code=EXIT_WARNING, # WARNING status: could not back up file2. + ) + assert 'retry: 1 of ' not in out # retries were NOT attempted! + assert 'E input/file2' in out # no permissions! + # repo looking good overall? checks for rc == 0. + self.cmd('check', '--debug', self.repository_location) + # check files in created archive + out = self.cmd('list', self.repository_location + '::test') + assert 'input/file1' in out + assert 'input/file2' not in out # it skipped file2 + assert 'input/file3' in out + def test_create_paths_from_command(self): self.cmd('init', '--encryption=repokey', self.repository_location) self.create_regular_file("file1", size=1024 * 80) From 50783603d2c3b32ec59e2ea098f81dafef2b9294 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Thu, 25 Jun 2026 23:10:50 +0200 Subject: [PATCH 2/5] Fix retry rollback for changed files Co-authored-by: Junie --- src/borg/archive.py | 3 +++ src/borg/testsuite/archiver.py | 35 ++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/src/borg/archive.py b/src/borg/archive.py index b8536c87db..b81c8e8bef 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -1565,6 +1565,9 @@ def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal, # not the last try and no part files written yet: trigger a retry by raising, # hoping that re-reading the file gives us a consistent copy. the retry is # done by the caller (Archiver._process_any). + for chunk in item.chunks: + cache.chunk_decref(chunk.id, self.stats) + item.chunks = [] raise BackupError('file changed while we read it!') if not is_special_file and not changed_while_backup: # we must not memorize special files, because the contents of e.g. a diff --git a/src/borg/testsuite/archiver.py b/src/borg/testsuite/archiver.py index f5bbb2e2b9..c53a4c7a0c 100644 --- a/src/borg/testsuite/archiver.py +++ b/src/borg/testsuite/archiver.py @@ -1263,6 +1263,41 @@ def test_create_erroneous_file(self): assert 'input/file2' in out assert 'input/file3' in out + def test_create_changed_file_retry_rolls_back_chunks(self): + self.create_regular_file('file1', contents=b'a' * 1000) + self.cmd('init', '--encryption=repokey', self.repository_location) + + orig_fstat = os.fstat + fstat_count = 0 + + def change_file_on_post_read_fstat(fd): + nonlocal fstat_count + st = orig_fstat(fd) + if stat.S_ISREG(st.st_mode): + fstat_count += 1 + if fstat_count == 2: + with open('input/file1', 'wb') as file_fd: + file_fd.write(b'b' * 1000) + os.utime('input/file1', ns=(st.st_atime_ns, st.st_mtime_ns + 1000**3)) + return orig_fstat(fd) + return st + + with patch('borg.archive.os.fstat', change_file_on_post_read_fstat): + out = self.cmd('create', '--list', self.repository_location + '::test', 'input') + + assert 'retry: 1 of ' in out + assert 'E input/file1' not in out + + with Repository(self.repository_path, exclusive=True) as repository: + manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) + archive = Archive(repository, key, manifest, 'test') + referenced_ids = {archive.id, *archive.metadata.items} + for item in archive.iter_items(): + referenced_ids.update(chunk.id for chunk in item.get('chunks', [])) + with Cache(repository, key, manifest) as cache: + cached_ids = {id for id, entry in cache.chunks.iteritems()} + assert cached_ids == referenced_ids + def test_create_erroneous_file_with_part_files(self): # if we have already written part files (checkpoints) for a file, a later read error must # NOT trigger a retry: re-reading the file from the start would create duplicate / inconsistent From ff0eb8767032d9f3a9961892772b9951339666e8 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Thu, 25 Jun 2026 23:26:45 +0200 Subject: [PATCH 3/5] Document retry source path assumption Co-authored-by: Junie --- src/borg/archiver.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/borg/archiver.py b/src/borg/archiver.py index 0779c0aa28..caaf4a22ad 100644 --- a/src/borg/archiver.py +++ b/src/borg/archiver.py @@ -822,9 +822,9 @@ def _process_any(self, *, path, parent_fd, name, st, fso, cache, read_special, d logger.warning( f'{path}: {err}, slept {sleep_s:.3f}s, next: retry: {retry + 1} of {MAX_RETRIES - 1}...' ) - # we better do a fresh stat on the file, just to make sure to get the current file - # mode right (which could have changed due to a race condition and is important for - # dispatching) and also to get current inode number of that file. + # We retry by source path, assuming that this path or symlink target has not been + # exchanged against a different filesystem object while processing. Refresh the stat + # before dispatching again, so temporary changes are reflected in the next attempt. with backup_io('stat'): st = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=False) From 7133066c9e3918d5d3f9116b613d0df8e6bfa0ba Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 26 Jun 2026 00:05:06 +0200 Subject: [PATCH 4/5] Roll back chunks also on read errors (avoid bad refcounts) The changed-file retry path already rolls back the chunks it added before re-reading. Do the same when a read error (BackupOSError) interrupts reading a file: the chunks added since the last checkpoint (item.chunks[from_chunk:]) are not referenced by any committed part item, so without rolling them back they leak as an inflated chunk refcount (or, if content also shifted, an orphan chunk). The rollback lives in process_file_chunks, where from_chunk is known: chunks before from_chunk are referenced by part files we already wrote and must be kept; only the uncommitted tail is decref'd. Adds test_create_erroneous_file_read_retry_rolls_back_chunks: the re-read chunks dedup to the same ids, so the leak shows up as a too-high refcount (not an orphan id) - the test checks refcounts directly. Co-Authored-By: Claude Opus 4.8 --- src/borg/archive.py | 46 +++++++++++++++++++++------------- src/borg/testsuite/archiver.py | 27 ++++++++++++++++++++ 2 files changed, 55 insertions(+), 18 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index b81c8e8bef..4a24e78c06 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -1324,24 +1324,34 @@ def chunk_processor(chunk): del item.chunks_healthy from_chunk = 0 part_number = 1 - for chunk in chunk_iter: - item.chunks.append(chunk_processor(chunk)) - if show_progress: - stats.show_progress(item=item, dt=0.2) - from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=False) - else: - if part_number > 1: - if item.chunks[from_chunk:]: - # if we already have created a part item inside this file, we want to put the final - # chunks (if any) into a part item also (so all parts can be concatenated to get - # the complete file): - from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=True) - - # if we created part files, we have referenced all chunks from the part files, - # but we also will reference the same chunks also from the final, complete file: - for chunk in item.chunks: - cache.chunk_incref(chunk.id, stats, size=chunk.size, part=True) - stats.nfiles_parts += part_number - 1 + try: + for chunk in chunk_iter: + item.chunks.append(chunk_processor(chunk)) + if show_progress: + stats.show_progress(item=item, dt=0.2) + from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=False) + else: + if part_number > 1: + if item.chunks[from_chunk:]: + # if we already have created a part item inside this file, we want to put the final + # chunks (if any) into a part item also (so all parts can be concatenated to get + # the complete file): + from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=True) + + # if we created part files, we have referenced all chunks from the part files, + # but we also will reference the same chunks also from the final, complete file: + for chunk in item.chunks: + cache.chunk_incref(chunk.id, stats, size=chunk.size, part=True) + stats.nfiles_parts += part_number - 1 + except BackupOSError: + # a read error happened after we already read (and added to the repo/cache) some chunks. + # the chunks we added since the last checkpoint (item.chunks[from_chunk:]) are not referenced + # by any (committed) part item, so they would leak (bad refcount / orphan chunk) - roll them + # back. the chunks before from_chunk are referenced by part items we already wrote, keep them. + for chunk in item.chunks[from_chunk:]: + cache.chunk_decref(chunk.id, stats) + item.chunks = item.chunks[:from_chunk] + raise # part_number > 1 means we wrote part files (checkpoints) for this file: return part_number diff --git a/src/borg/testsuite/archiver.py b/src/borg/testsuite/archiver.py index c53a4c7a0c..18624977d8 100644 --- a/src/borg/testsuite/archiver.py +++ b/src/borg/testsuite/archiver.py @@ -1298,6 +1298,33 @@ def change_file_on_post_read_fstat(fd): cached_ids = {id for id, entry in cache.chunks.iteritems()} assert cached_ids == referenced_ids + def test_create_erroneous_file_read_retry_rolls_back_chunks(self): + # a read error after we already read+added some chunks must roll those chunks back, so we do + # not end up with bad (too high) chunk refcounts. note: the re-read chunks dedup to the same + # ids, so a leak shows up as an inflated refcount (not as an orphan id), hence we check refcounts. + from collections import Counter + chunk_size = 1000 + # distinct content per block, so each block maps to its own (unique) chunk id: + self.create_regular_file('file', contents=b'A' * chunk_size + b'B' * chunk_size) + self.cmd('init', '--encryption=repokey', self.repository_location) + # rErr: read 1st block ok, 2nd block fails -> retry; on the retry both blocks read ok. + out = self.cmd('create', f'--chunker-params=fail,{chunk_size},rErr', '--list', + self.repository_location + '::test', 'input') + assert 'retry: 1 of ' in out + assert 'E input/file' not in out # it was backed up ok on the retry + + with Repository(self.repository_path, exclusive=True) as repository: + manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) + archive = Archive(repository, key, manifest, 'test') + item = [item for item in archive.iter_items() if item.path == 'input/file'][0] + item_chunk_refs = Counter(chunk.id for chunk in item.chunks) + with Cache(repository, key, manifest) as cache: + refcounts = {id: entry.refcount for id, entry in cache.chunks.iteritems()} + # each data chunk of the file must be referenced exactly as often as it occurs in the item - + # a leaked chunk from the failed read attempt would show up here as a too high refcount: + for chunk_id, refs in item_chunk_refs.items(): + assert refcounts[chunk_id] == refs + def test_create_erroneous_file_with_part_files(self): # if we have already written part files (checkpoints) for a file, a later read error must # NOT trigger a retry: re-reading the file from the start would create duplicate / inconsistent From ae3a2e067d2ba2ff1cf8309137d9eb3df77dcf9f Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 26 Jun 2026 18:44:54 +0200 Subject: [PATCH 5/5] Skip refcount-inspecting retry tests in remote variant test_create_changed_file_retry_rolls_back_chunks and test_create_erroneous_file_read_retry_rolls_back_chunks open a local Cache/Repository to inspect chunk refcounts. In RemoteArchiverTestCase the repo was created via a remote (__testsuite__:) location, so opening it locally triggers the "repository was previously located at ssh://..." relocation prompt, which reads stdin and fails under pytest. These checks are inherently local (they poke at cache internals), so skip them in the remote variant, like test_debug_put_get_delete_obj already does. Co-Authored-By: Claude Opus 4.8 --- src/borg/testsuite/archiver.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/borg/testsuite/archiver.py b/src/borg/testsuite/archiver.py index 18624977d8..911fdfb03f 100644 --- a/src/borg/testsuite/archiver.py +++ b/src/borg/testsuite/archiver.py @@ -4992,6 +4992,14 @@ def test_remote_repo_restrict_to_repository(self): def test_debug_put_get_delete_obj(self): pass + @unittest.skip('only works locally') # inspects cache chunk refcounts via a local Cache/Repository + def test_create_changed_file_retry_rolls_back_chunks(self): + pass + + @unittest.skip('only works locally') # inspects cache chunk refcounts via a local Cache/Repository + def test_create_erroneous_file_read_retry_rolls_back_chunks(self): + pass + @unittest.skip('only works locally') def test_config(self): pass