Skip to content

Commit f5aba18

Browse files
Unify chunk rollback in process_file_chunks
Demonstrator (per review request): move the "changed while we read it" detection and reaction from FilesystemObjectProcessors.process_file down into ChunksProcessor.process_file_chunks, so that both the read-error and the changed-file rollback share a single rollback_uncommitted_chunks() helper (decref item.chunks[from_chunk:], i.e. the tail not yet referenced by a committed part item). process_file_chunks now takes fd/st/is_special_file/files_changed/last_try (keyword-only, default to "no change detection" for the pipe/tar/recreate callers) and returns (part_number, changed_while_backup). Tradeoff to evaluate: the rollback is now DRY and lives next to from_chunk, but ChunksProcessor (a generic chunk-stream processor) gains fs-file stat semantics (fstat2, ctime/mtime, files_changed) that arguably belong in FilesystemObjectProcessors, and the method grows a tuple return. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 7133066 commit f5aba18

1 file changed

Lines changed: 46 additions & 42 deletions

File tree

src/borg/archive.py

Lines changed: 46 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1309,7 +1309,8 @@ def maybe_checkpoint(self, item, from_chunk, part_number, forced=False):
13091309
logger.info('checkpoint requested: finished checkpoint creation!')
13101310
return from_chunk, part_number
13111311

1312-
def process_file_chunks(self, item, cache, stats, show_progress, chunk_iter, chunk_processor=None):
1312+
def process_file_chunks(self, item, cache, stats, show_progress, chunk_iter, chunk_processor=None,
1313+
*, fd=None, st=None, is_special_file=False, files_changed=None, last_try=False):
13131314
if not chunk_processor:
13141315
def chunk_processor(chunk):
13151316
chunk_id, data = cached_hash(chunk, self.key.id_hash)
@@ -1324,6 +1325,15 @@ def chunk_processor(chunk):
13241325
del item.chunks_healthy
13251326
from_chunk = 0
13261327
part_number = 1
1328+
1329+
def rollback_uncommitted_chunks():
1330+
# roll back the chunks we added since the last checkpoint: they are not referenced by any
1331+
# (committed) part item, so they would otherwise leak (bad refcount / orphan chunk). the
1332+
# chunks before from_chunk are referenced by part files we already wrote, so we keep them.
1333+
for chunk in item.chunks[from_chunk:]:
1334+
cache.chunk_decref(chunk.id, stats)
1335+
item.chunks = item.chunks[:from_chunk]
1336+
13271337
try:
13281338
for chunk in chunk_iter:
13291339
item.chunks.append(chunk_processor(chunk))
@@ -1344,16 +1354,33 @@ def chunk_processor(chunk):
13441354
cache.chunk_incref(chunk.id, stats, size=chunk.size, part=True)
13451355
stats.nfiles_parts += part_number - 1
13461356
except BackupOSError:
1347-
# a read error happened after we already read (and added to the repo/cache) some chunks.
1348-
# the chunks we added since the last checkpoint (item.chunks[from_chunk:]) are not referenced
1349-
# by any (committed) part item, so they would leak (bad refcount / orphan chunk) - roll them
1350-
# back. the chunks before from_chunk are referenced by part items we already wrote, keep them.
1351-
for chunk in item.chunks[from_chunk:]:
1352-
cache.chunk_decref(chunk.id, stats)
1353-
item.chunks = item.chunks[:from_chunk]
1357+
# a read error happened after we already read+added some chunks: roll them back, then re-raise
1358+
# so the caller (Archiver._process_any) can retry or skip the file.
1359+
rollback_uncommitted_chunks()
13541360
raise
1355-
# part_number > 1 means we wrote part files (checkpoints) for this file:
1356-
return part_number
1361+
1362+
# did the file change while we read it? this is only checked for real fs files, i.e. when the
1363+
# caller (FilesystemObjectProcessors.process_file) passed the file descriptor and stat result.
1364+
changed_while_backup = False
1365+
if fd is not None and not is_win32 and not is_special_file and files_changed != 'disabled':
1366+
# special files: fifos change naturally; blk/chr devices don't change ctime anyway.
1367+
with backup_io('fstat2'):
1368+
st2 = os.fstat(fd)
1369+
if files_changed == 'ctime':
1370+
changed_while_backup = st.st_ctime_ns != st2.st_ctime_ns
1371+
elif files_changed == 'mtime':
1372+
changed_while_backup = st.st_mtime_ns != st2.st_mtime_ns
1373+
else:
1374+
raise ValueError('invalid files_changed value: %r' % files_changed)
1375+
if changed_while_backup and not last_try and part_number == 1:
1376+
# regular file changed while we backed it up, might be inconsistent/corrupt! it is not the
1377+
# last try and we did not write part files yet, so we trigger a retry by raising (hoping to
1378+
# get a consistent copy on re-read). roll back the chunks we added, like the read-error path.
1379+
rollback_uncommitted_chunks()
1380+
raise BackupError('file changed while we read it!')
1381+
# changed_while_backup (with last_try or part files written) means: keep it, but flag as "C".
1382+
# part_number > 1 means we wrote part files (checkpoints) for this file.
1383+
return part_number, changed_while_backup
13571384

13581385

13591386
class FilesystemObjectProcessors:
@@ -1543,42 +1570,19 @@ def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal,
15431570
if chunks is not None:
15441571
item.chunks = chunks
15451572
else:
1573+
# process_file_chunks does the read+chunking, the checkpoint/part-file handling,
1574+
# the "changed while we read it" detection and all the chunk rollback on errors
1575+
# (read error or changed file). it raises to trigger a retry (see _process_any),
1576+
# or returns changed_while_backup=True for the "keep but flag as C" case.
15461577
with backup_io('read'):
1547-
part_number = self.process_file_chunks(
1578+
part_number, changed_while_backup = self.process_file_chunks(
15481579
item, cache, self.stats, self.show_progress,
1549-
backup_io_iter(self.chunker.chunkify(None, fd)))
1550-
if is_win32:
1551-
changed_while_backup = False # TODO
1552-
else:
1553-
with backup_io('fstat2'):
1554-
st2 = os.fstat(fd)
1555-
# special files:
1556-
# - fifos change naturally, because they are fed from the other side. no problem.
1557-
# - blk/chr devices don't change ctime anyway.
1558-
if self.files_changed == 'disabled' or is_special_file:
1559-
changed_while_backup = False
1560-
elif self.files_changed == 'ctime':
1561-
changed_while_backup = st.st_ctime_ns != st2.st_ctime_ns
1562-
elif self.files_changed == 'mtime':
1563-
changed_while_backup = st.st_mtime_ns != st2.st_mtime_ns
1564-
else:
1565-
raise ValueError('invalid files_changed value: %r' % self.files_changed)
1580+
backup_io_iter(self.chunker.chunkify(None, fd)),
1581+
fd=fd, st=st, is_special_file=is_special_file,
1582+
files_changed=self.files_changed, last_try=last_try)
15661583
if changed_while_backup:
15671584
# regular file changed while we backed it up, might be inconsistent/corrupt!
1568-
if last_try or part_number > 1:
1569-
# this was the last try or we already wrote part files (checkpoints) for
1570-
# this file. in both cases we must not retry (re-reading the file from the
1571-
# start would create duplicate / inconsistent part files), so we keep what
1572-
# we have and flag it as potentially inconsistent/corrupt.
1573-
status = 'C'
1574-
else:
1575-
# not the last try and no part files written yet: trigger a retry by raising,
1576-
# hoping that re-reading the file gives us a consistent copy. the retry is
1577-
# done by the caller (Archiver._process_any).
1578-
for chunk in item.chunks:
1579-
cache.chunk_decref(chunk.id, self.stats)
1580-
item.chunks = []
1581-
raise BackupError('file changed while we read it!')
1585+
status = 'C'
15821586
if not is_special_file and not changed_while_backup:
15831587
# we must not memorize special files, because the contents of e.g. a
15841588
# block or char device will change without its mtime/size/inode changing.

0 commit comments

Comments
 (0)