Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 61 additions & 24 deletions src/borg/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,11 @@ def __init__(self, *, key, cache,
self.checkpoint_interval = checkpoint_interval
self.last_checkpoint = time.monotonic()
self.rechunkify = rechunkify
# number of the last checkpoint part file we wrote for the file currently being processed.
# 0 means "no part file written (yet)". used to decide whether a file may still be retried:
# once we have written part files, re-reading the file from the start would create duplicate
# part files, so such a file must not be retried (see Archiver._process_any).
self.last_part_number = 0

def write_part_file(self, item, from_chunk, number):
item = Item(internal_dict=item.as_dict())
Expand All @@ -1285,10 +1290,11 @@ def write_part_file(self, item, from_chunk, number):
item.get_size(memorize=True, from_chunks=True)
item.path += '.borg_part_%d' % number
item.part = number
number += 1
self.add_item(item, show_progress=False)
self.write_checkpoint()
return length, number
# remember that we have committed a part file for the current file, so it must not be retried:
self.last_part_number = number
return length, number + 1

def maybe_checkpoint(self, item, from_chunk, part_number, forced=False):
sig_int_triggered = sig_int and sig_int.action_triggered()
Expand Down Expand Up @@ -1318,24 +1324,36 @@ def chunk_processor(chunk):
del item.chunks_healthy
from_chunk = 0
part_number = 1
for chunk in chunk_iter:
item.chunks.append(chunk_processor(chunk))
if show_progress:
stats.show_progress(item=item, dt=0.2)
from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=False)
else:
if part_number > 1:
if item.chunks[from_chunk:]:
# if we already have created a part item inside this file, we want to put the final
# chunks (if any) into a part item also (so all parts can be concatenated to get
# the complete file):
from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=True)

# if we created part files, we have referenced all chunks from the part files,
# but we also will reference the same chunks also from the final, complete file:
for chunk in item.chunks:
cache.chunk_incref(chunk.id, stats, size=chunk.size, part=True)
stats.nfiles_parts += part_number - 1
try:
for chunk in chunk_iter:
item.chunks.append(chunk_processor(chunk))
if show_progress:
stats.show_progress(item=item, dt=0.2)
from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=False)
else:
if part_number > 1:
if item.chunks[from_chunk:]:
# if we already have created a part item inside this file, we want to put the final
# chunks (if any) into a part item also (so all parts can be concatenated to get
# the complete file):
from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=True)

# if we created part files, we have referenced all chunks from the part files,
# but we also will reference the same chunks also from the final, complete file:
for chunk in item.chunks:
cache.chunk_incref(chunk.id, stats, size=chunk.size, part=True)
stats.nfiles_parts += part_number - 1
except BackupOSError:
# a read error happened after we already read (and added to the repo/cache) some chunks.
# the chunks we added since the last checkpoint (item.chunks[from_chunk:]) are not referenced
# by any (committed) part item, so they would leak (bad refcount / orphan chunk) - roll them
# back. the chunks before from_chunk are referenced by part items we already wrote, keep them.
for chunk in item.chunks[from_chunk:]:
cache.chunk_decref(chunk.id, stats)
item.chunks = item.chunks[:from_chunk]
raise
# part_number > 1 means we wrote part files (checkpoints) for this file:
return part_number


class FilesystemObjectProcessors:
Expand All @@ -1344,7 +1362,7 @@ class FilesystemObjectProcessors:
# and process_file becomes a callback passed to __init__.

def __init__(self, *, metadata_collector, cache, key,
add_item, process_file_chunks,
add_item, process_file_chunks, chunks_processor,
chunker_params, show_progress, sparse,
log_json, iec, file_status_printer=None,
files_changed='ctime'):
Expand All @@ -1353,6 +1371,9 @@ def __init__(self, *, metadata_collector, cache, key,
self.key = key
self.add_item = add_item
self.process_file_chunks = process_file_chunks
# the ChunksProcessor (also providing process_file_chunks) - we need it to find out whether
# checkpoint part files have already been written for the file currently being processed.
self.chunks_processor = chunks_processor
self.show_progress = show_progress
self.print_file_status = file_status_printer or (lambda *args: None)
self.files_changed = files_changed
Expand Down Expand Up @@ -1478,7 +1499,7 @@ def process_pipe(self, *, path, cache, fd, mode, user, group):
self.add_item(item, stats=self.stats)
return status

def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal, strip_prefix):
def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal, strip_prefix, last_try=False):
with self.create_helper(path, st, None, strip_prefix=strip_prefix) as (item, status, hardlinked, hardlink_master): # no status yet
if item is None:
return status
Expand Down Expand Up @@ -1523,7 +1544,9 @@ def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal,
item.chunks = chunks
else:
with backup_io('read'):
self.process_file_chunks(item, cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(None, fd)))
part_number = self.process_file_chunks(
item, cache, self.stats, self.show_progress,
backup_io_iter(self.chunker.chunkify(None, fd)))
if is_win32:
changed_while_backup = False # TODO
else:
Expand All @@ -1541,7 +1564,21 @@ def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal,
else:
raise ValueError('invalid files_changed value: %r' % self.files_changed)
if changed_while_backup:
status = 'C' # regular file changed while we backed it up, might be inconsistent/corrupt!
# regular file changed while we backed it up, might be inconsistent/corrupt!
if last_try or part_number > 1:
# this was the last try or we already wrote part files (checkpoints) for
# this file. in both cases we must not retry (re-reading the file from the
# start would create duplicate / inconsistent part files), so we keep what
# we have and flag it as potentially inconsistent/corrupt.
status = 'C'
else:
# not the last try and no part files written yet: trigger a retry by raising,
# hoping that re-reading the file gives us a consistent copy. the retry is
# done by the caller (Archiver._process_any).
for chunk in item.chunks:
cache.chunk_decref(chunk.id, self.stats)
item.chunks = []
raise BackupError('file changed while we read it!')
if not is_special_file and not changed_while_backup:
# we must not memorize special files, because the contents of e.g. a
# block or char device will change without its mtime/size/inode changing.
Expand Down
131 changes: 82 additions & 49 deletions src/borg/archiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import argparse
import collections
import configparser
import errno
import faulthandler
import functools
import inspect
Expand Down Expand Up @@ -730,7 +731,7 @@ def create_inner(archive, cache, fso):
add_item=archive.add_item, write_checkpoint=archive.write_checkpoint,
checkpoint_interval=args.checkpoint_interval, rechunkify=False)
fso = FilesystemObjectProcessors(metadata_collector=metadata_collector, cache=cache, key=key,
process_file_chunks=cp.process_file_chunks, add_item=archive.add_item,
process_file_chunks=cp.process_file_chunks, chunks_processor=cp, add_item=archive.add_item,
chunker_params=args.chunker_params, show_progress=args.progress, sparse=args.sparse,
log_json=args.log_json, iec=args.iec, file_status_printer=self.print_file_status,
files_changed=args.files_changed)
Expand All @@ -745,55 +746,87 @@ def _process_any(self, *, path, parent_fd, name, st, fso, cache, read_special, d

if dry_run:
return '-'
elif stat.S_ISREG(st.st_mode):
return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st, cache=cache, strip_prefix=strip_prefix)
elif stat.S_ISDIR(st.st_mode):
return fso.process_dir(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix)
elif stat.S_ISLNK(st.st_mode):
if not read_special:
return fso.process_symlink(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix)
else:
try:
st_target = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=True)
except OSError:
special = False
else:
special = is_special(st_target.st_mode)
if special:
return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st_target,
cache=cache, flags=flags_special_follow, strip_prefix=strip_prefix)
# the ChunksProcessor remembers (in last_part_number) whether it has already written checkpoint
# part files for the file we are about to process. we must not retry a file once part files have
# been written for it, because re-reading it from the start would create duplicate / inconsistent
# part files (concatenating all part files would then not yield the complete file any more).
cp = fso.chunks_processor # the ChunksProcessor that fso uses
MAX_RETRIES = 10 # count includes the initial try (initial try == "retry 0")
for retry in range(MAX_RETRIES):
cp.last_part_number = 0 # reset part file tracking for this (re)try of the file
last_try = retry == MAX_RETRIES - 1
try:
if stat.S_ISREG(st.st_mode):
return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st, cache=cache,
strip_prefix=strip_prefix, last_try=last_try)
elif stat.S_ISDIR(st.st_mode):
return fso.process_dir(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix)
elif stat.S_ISLNK(st.st_mode):
if not read_special:
return fso.process_symlink(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix)
else:
try:
st_target = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=True)
except OSError:
special = False
else:
special = is_special(st_target.st_mode)
if special:
return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st_target,
cache=cache, flags=flags_special_follow, strip_prefix=strip_prefix,
last_try=last_try)
else:
return fso.process_symlink(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix)
elif stat.S_ISFIFO(st.st_mode):
if not read_special:
return fso.process_fifo(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix)
else:
return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st,
cache=cache, flags=flags_special, strip_prefix=strip_prefix, last_try=last_try)
elif stat.S_ISCHR(st.st_mode):
if not read_special:
return fso.process_dev(path=path, parent_fd=parent_fd, name=name, st=st, dev_type='c', strip_prefix=strip_prefix)
else:
return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st,
cache=cache, flags=flags_special, strip_prefix=strip_prefix, last_try=last_try)
elif stat.S_ISBLK(st.st_mode):
if not read_special:
return fso.process_dev(path=path, parent_fd=parent_fd, name=name, st=st, dev_type='b', strip_prefix=strip_prefix)
else:
return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st,
cache=cache, flags=flags_special, strip_prefix=strip_prefix, last_try=last_try)
elif stat.S_ISSOCK(st.st_mode):
# Ignore unix sockets
return
elif stat.S_ISDOOR(st.st_mode):
# Ignore Solaris doors
return
elif stat.S_ISPORT(st.st_mode):
# Ignore Solaris event ports
return
else:
return fso.process_symlink(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix)
elif stat.S_ISFIFO(st.st_mode):
if not read_special:
return fso.process_fifo(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix)
else:
return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st,
cache=cache, flags=flags_special, strip_prefix=strip_prefix)
elif stat.S_ISCHR(st.st_mode):
if not read_special:
return fso.process_dev(path=path, parent_fd=parent_fd, name=name, st=st, dev_type='c', strip_prefix=strip_prefix)
else:
return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st,
cache=cache, flags=flags_special, strip_prefix=strip_prefix)
elif stat.S_ISBLK(st.st_mode):
if not read_special:
return fso.process_dev(path=path, parent_fd=parent_fd, name=name, st=st, dev_type='b', strip_prefix=strip_prefix)
else:
return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st,
cache=cache, flags=flags_special, strip_prefix=strip_prefix)
elif stat.S_ISSOCK(st.st_mode):
# Ignore unix sockets
return
elif stat.S_ISDOOR(st.st_mode):
# Ignore Solaris doors
return
elif stat.S_ISPORT(st.st_mode):
# Ignore Solaris event ports
return
else:
self.print_warning('Unknown file type: %s', path)
return
self.print_warning('Unknown file type: %s', path)
return
except BackupError as err:
if getattr(err, 'errno', None) in (errno.EPERM, errno.EACCES):
# permission errors will not get better when retrying, so don't waste time on it.
raise
if last_try or cp.last_part_number > 0:
# giving up: this was the last try, or we have already written part files for this
# file and thus must not retry. the error will be dealt with (logged) by the caller.
raise
# sleep a bit, so temporary problems (e.g. a file briefly being inaccessible or
# being modified) might go away before we try to read the file again...
sleep_s = 1000.0 / 1e6 * 10 ** (retry / 2) # retry 0: 1ms, retry 6: 1s, ...
time.sleep(sleep_s)
logger.warning(
f'{path}: {err}, slept {sleep_s:.3f}s, next: retry: {retry + 1} of {MAX_RETRIES - 1}...'
)
# We retry by source path, assuming that this path or symlink target has not been
# exchanged against a different filesystem object while processing. Refresh the stat
# before dispatching again, so temporary changes are reflected in the next attempt.
with backup_io('stat'):
st = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=False)

def _rec_walk(self, *, path, parent_fd, name, fso, cache, matcher,
exclude_caches, exclude_if_present, keep_exclude_tags,
Expand Down
Loading
Loading