Skip to content

Commit ad1b713

Browse files
committed
db migration: freeze 2to3 implementation
1 parent cd83d9e commit ad1b713

7 files changed

Lines changed: 414 additions & 64 deletions

File tree

prime_backup/action/helpers/fileset_allocator.py

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from prime_backup.db import schema
99
from prime_backup.db.session import DbSession
1010
from prime_backup.db.values import FileRole
11+
from prime_backup.utils import collection_utils
1112
from prime_backup.utils.lru_dict import LruDict
1213

1314
if TYPE_CHECKING:
@@ -64,26 +65,12 @@ def __init__(self, session: DbSession, files: List[schema.File]):
6465
self.files = files
6566
self.__fileset_files_cache: Optional[FilesetAllocator.FilesetFileCache] = None
6667

67-
@dataclasses.dataclass(frozen=True)
68-
class Delta:
69-
@dataclasses.dataclass(frozen=True)
70-
class OldNewFile:
71-
old: schema.File
72-
new: schema.File
73-
74-
added: List[schema.File] = dataclasses.field(default_factory=list) # list of new
75-
removed: List[schema.File] = dataclasses.field(default_factory=list) # list of old
76-
changed: List[OldNewFile] = dataclasses.field(default_factory=list) # list of (old, new)
77-
78-
def size(self) -> int:
79-
return len(self.added) + len(self.removed) + len(self.changed)
80-
8168
@classmethod
8269
def __get_file_by_path(cls, files: List[schema.File]) -> Dict[str, schema.File]:
8370
return {f.path: f for f in files}
8471

8572
@classmethod
86-
def __are_files_content_equaled(cls, a: schema.File, b: schema.File):
73+
def __are_files_content_equaled(cls, a: schema.File, b: schema.File) -> bool:
8774
return (
8875
a.path == b.path and a.mode == b.mode and
8976
a.content == b.content and a.blob_hash == b.blob_hash and
@@ -92,19 +79,8 @@ def __are_files_content_equaled(cls, a: schema.File, b: schema.File):
9279
)
9380

9481
@classmethod
95-
def __calc_delta(cls, old: Dict[str, schema.File], new: Dict[str, schema.File]) -> Delta:
96-
delta = cls.Delta()
97-
for path, old_file in old.items():
98-
if path in new:
99-
new_file = new[path]
100-
if not cls.__are_files_content_equaled(new_file, old_file):
101-
delta.changed.append(cls.Delta.OldNewFile(old_file, new_file))
102-
else:
103-
delta.removed.append(old_file)
104-
for path in new.keys():
105-
if path not in old:
106-
delta.added.append(new[path])
107-
return delta
82+
def __calc_delta(cls, old: Dict[str, schema.File], new: Dict[str, schema.File]) -> collection_utils.DictValueDelta[schema.File]:
83+
return collection_utils.compute_dict_value_delta(old, new, cmp=cls.__are_files_content_equaled)
10884

10985
def enable_fileset_files_cache(self, cache: FilesetFileCache):
11086
self.__fileset_files_cache = cache
@@ -123,7 +99,7 @@ def allocate(self, args: FilesetAllocateArgs) -> FilesetAllocateResult:
12399
class Candidate:
124100
fileset: schema.Fileset
125101
file_by_path: Dict[str, schema.File]
126-
delta: FilesetAllocator.Delta
102+
delta: collection_utils.DictValueDelta[schema.File]
127103
delta_size: int
128104

129105
c: Optional[Candidate] = None

prime_backup/cli/cli_entrypoint.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import argparse
2+
import logging
23
from typing import List, Dict
34

45
from prime_backup.cli import cli_utils
@@ -59,6 +60,7 @@ def main(self):
5960
parser = argparse.ArgumentParser(description='Prime Backup v{} CLI tools'.format(cli_utils.get_plugin_version()), formatter_class=argparse.ArgumentDefaultsHelpFormatter)
6061
parser.add_argument('-d', '--db', default=_DEFAULT_STORAGE_ROOT, help='Path to the {db} database file, or path to the directory that contains the {db} database file, e.g. "/my/path/{db}", or "/my/path"'.format(db=db_constants.DB_FILE_NAME))
6162
parser.add_argument('--version', action='store_true', help='Show version and exit')
63+
parser.add_argument('--debug', action='store_true', help='Enable debug logging')
6264
subparsers = parser.add_subparsers(title='Command', help='Available commands', dest='command')
6365

6466
for adapter in self.adaptors.values():
@@ -69,6 +71,9 @@ def main(self):
6971
if args.version:
7072
print('Prime Backup v{}'.format(cli_utils.get_plugin_version()))
7173
return
74+
if args.debug:
75+
self.logger.setLevel(logging.DEBUG)
76+
Config.get().debug = True
7277
if args.command is None:
7378
parser.print_help()
7479
return
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from prime_backup.db.migrations.migration_2_3.migration_2_3 import MigrationImpl2To3 as MigrationImpl2To3
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
import collections
2+
import dataclasses
3+
import enum
4+
from typing import List, Optional, Dict, Tuple, Iterable
5+
6+
from prime_backup import logger
7+
from prime_backup.db.migrations.migration_2_3._v3_session import _V3DbSession
8+
from prime_backup.utils import collection_utils
9+
from prime_backup.utils.lru_dict import LruDict
10+
11+
12+
class _V3FileRole(enum.IntEnum):
    """Integer codes stored in the ``role`` column of a v3 ``file`` row."""

    unknown = 0
    standalone = 1      # file row that belongs to a base fileset
    delta_override = 2  # delta row: the path exists in the base, but its content / metadata changed
    delta_add = 3       # delta row: the path does not exist in the base
    delta_remove = 4    # delta row: the path exists in the base and is gone now
18+
19+
20+
def _sum_file_sizes(files: Iterable[dict]) -> Tuple[int, int]:
    """Add up the blob sizes of the given file rows.

    A ``None`` (or otherwise falsy) size counts as 0.

    :param files: File rows; each must have the keys ``blob_raw_size`` and ``blob_stored_size``
    :return: A ``(raw_size_sum, stored_size_sum)`` tuple
    """
    raw_total, stored_total = 0, 0
    # single pass on purpose: ``files`` may be a one-shot iterator
    for row in files:
        raw_total += row['blob_raw_size'] or 0
        stored_total += row['blob_stored_size'] or 0
    return raw_total, stored_total
29+
30+
31+
@dataclasses.dataclass(frozen=True)
class _FilesetAllocateArgs:
    """Tuning knobs of the fileset allocation, fixed to their defaults for this migration."""

    # how many of the most recent base filesets are examined as reuse candidates
    candidate_select_count: int = 2
    # a candidate is only acceptable if its delta size is below (file count * this ratio)
    candidate_max_changes_ratio: float = 0.2
    # a base is no longer reused once (sum of its deltas' file_object_count / its own file_object_count) reaches this
    max_delta_ratio: float = 1.5
    # a base is no longer reused once this many backups are associated with it
    max_base_reuse_count: int = 100
37+
38+
39+
@dataclasses.dataclass(frozen=True)
class _FilesetAllocateResult:
    """What an allocation produced: the base / delta fileset rows, and how many file rows were written."""

    # row of the base fileset, either newly inserted or an existing one being reused
    fileset_base: dict
    # row of the delta fileset inserted by this allocation
    fileset_delta: dict
    # number of ``file`` rows inserted by this allocation
    new_file_object_count: int
44+
45+
46+
class _V3FilesetAllocator:
    """Store a backup's file list as a (base fileset, delta fileset) pair.

    Works on plain ``dict`` rows and touches the database only through the
    ``v3_*`` helpers of the given session.
    """

    # fileset id -> file rows of that fileset
    FilesetFileCache = LruDict[int, List[dict]]

    def __init__(self, session: _V3DbSession, files: List[dict]):
        """
        :param session: Session wrapper used for every query and insert
        :param files: File rows to allocate. :meth:`allocate` mutates these dicts in place
        """
        self.logger = logger.get()
        self.session = session
        self.files = files
        self.__fileset_files_cache: Optional[_V3FilesetAllocator.FilesetFileCache] = None

    @classmethod
    def __get_file_by_path(cls, files: List[dict]) -> Dict[str, dict]:
        """Index the file rows by their ``path``. For duplicated paths the last row wins."""
        by_path: Dict[str, dict] = {}
        for row in files:
            by_path[row['path']] = row
        return by_path

    @classmethod
    def __are_files_content_equaled(cls, a: dict, b: dict) -> bool:
        """Tell if two file rows agree on path, mode, content, blob hash, owner and mtime."""
        compared_keys = ('path', 'mode', 'content', 'blob_hash', 'uid', 'gid', 'mtime')
        return all(a[key] == b[key] for key in compared_keys)

    @classmethod
    def __calc_delta(cls, old: Dict[str, dict], new: Dict[str, dict]) -> collection_utils.DictValueDelta[dict]:
        """Diff two path-indexed file mappings, comparing values with :meth:`__are_files_content_equaled`."""
        return collection_utils.compute_dict_value_delta(old, new, cmp=cls.__are_files_content_equaled)

    def enable_fileset_files_cache(self, cache: FilesetFileCache):
        """Make fileset file lookups go through the given cache from now on."""
        self.__fileset_files_cache = cache

    def __get_fileset_files(self, fileset_id: int) -> List[dict]:
        """Return the file rows of a fileset, served from / stored into the cache if one is enabled."""
        cache = self.__fileset_files_cache
        if cache is None:
            return self.session.v3_get_fileset_files(fileset_id)

        cached_files = cache.get(fileset_id, None)
        if cached_files is not None:
            return cached_files

        files = self.session.v3_get_fileset_files(fileset_id)
        cache.set(fileset_id, files)
        return files

    def allocate(self) -> _FilesetAllocateResult:
        """Insert the filesets and file rows for ``self.files``.

        Among the most recent base filesets, the one with the smallest delta
        against ``self.files`` is picked, provided the delta is small enough.
        The pick is dropped if the base is already associated with too many
        backups, or if its accumulated delta ratio is too high.

        - With a usable base: one delta fileset is inserted, holding only the
          added / overridden / removed file rows.
        - Without one: a new base fileset holding every file is inserted,
          together with an empty delta fileset pointing at it.

        :return: The base fileset row, the new delta fileset row, and the number of file rows inserted
        """
        @dataclasses.dataclass(frozen=True)
        class Candidate:
            fileset: dict
            file_by_path: Dict[str, dict]
            delta: collection_utils.DictValueDelta[dict]
            delta_size: int

        args = _FilesetAllocateArgs()
        best: Optional[Candidate] = None
        new_file_by_path = self.__get_file_by_path(self.files)
        # a candidate is only acceptable if its delta is strictly smaller than this
        delta_size_limit = len(new_file_by_path) * args.candidate_max_changes_ratio

        for candidate_fileset in self.session.v3_get_last_n_base_fileset(limit=args.candidate_select_count):
            candidate_id = candidate_fileset['id']
            old_file_by_path = self.__get_file_by_path(self.__get_fileset_files(candidate_id))
            delta = self.__calc_delta(old=old_file_by_path, new=new_file_by_path)
            delta_size = delta.size()
            self.logger.debug('Selecting fileset base candidate: id={} delta_size={}'.format(candidate_id, delta_size))
            if candidate_id <= 0 or candidate_fileset['base_id'] < 0:
                # should never happen, but just in case
                self.logger.error('Skipping corrupt fileset with id {}. Please validate the healthiness of the database'.format(candidate_id))
                continue
            if delta_size < delta_size_limit and (best is None or delta_size < best.delta_size):
                best = Candidate(candidate_fileset, old_file_by_path, delta, delta_size)

        # NOTE(review): the "not found" branch is paired with the outer candidate check --
        # confirm against the upstream file, the nesting is not visible in the reviewed text
        if best is None:
            self.logger.debug('FilesetAllocator base fileset not found')
        else:
            best_id = best.fileset['id']
            ref_cnt = self.session.v3_get_fileset_associated_backup_count(best_id)
            delta_file_object_count_sum = self.session.v3_get_fileset_delta_file_object_count_sum(best_id)
            base_file_object_count = best.fileset['file_object_count']
            delta_ratio = delta_file_object_count_sum / base_file_object_count if base_file_object_count > 0 else 0
            self.logger.debug('Fileset base candidate selected, id {}, ref_cnt {}, delta_file_object_count_sum {} (r={:.2f})'.format(
                best_id, ref_cnt, delta_file_object_count_sum, delta_ratio,
            ))

            if ref_cnt >= args.max_base_reuse_count:
                self.logger.debug('Fileset base candidate {} has its ref_cnt {} >= {}, create a new fileset'.format(
                    best_id, ref_cnt, args.max_base_reuse_count
                ))
                best = None
            elif delta_ratio >= args.max_delta_ratio:
                self.logger.info('Fileset base candidate {} has its delta_ratio {:.2f} >= {:.2f}, create a new fileset'.format(
                    best_id, delta_ratio, args.max_delta_ratio
                ))
                best = None

        if best is None:
            # no usable base: store everything in a new base fileset, plus an empty delta fileset
            raw_size_sum, stored_size_sum = _sum_file_sizes(self.files)
            fileset_base = self.session.v3_insert('fileset', {
                'base_id': 0,
                'file_object_count': len(self.files),
                'file_count': len(self.files),
                'file_raw_size_sum': raw_size_sum,
                'file_stored_size_sum': stored_size_sum,
            }, need_result=True)
            fileset_delta = self.session.v3_insert('fileset', {
                'base_id': fileset_base['id'],
                'file_object_count': 0,
                'file_count': 0,
                'file_raw_size_sum': 0,
                'file_stored_size_sum': 0,
            }, need_result=True)

            for file in self.files:
                file['fileset_id'] = fileset_base['id']
                file['role'] = _V3FileRole.standalone.value
                self.session.v3_insert('file', file)

            self.logger.debug('Created base fileset {}, len(files)={}'.format(fileset_base, len(self.files)))
            self.logger.debug('Created empty delta fileset {}'.format(fileset_delta))
            return _FilesetAllocateResult(fileset_base, fileset_delta, new_file_object_count=len(self.files))

        # reuse the existing base fileset
        delta_files: List[dict] = []

        # these sums are deltas relative to the base fileset
        file_count = 0
        file_raw_size_sum = 0
        file_stored_size_sum = 0

        # NOTES: We can only manipulate new files (which are references from self.files),
        #        DO NOT manipulate old files (which are files in existing fileset)
        for added_file in best.delta.added:
            added_file['role'] = _V3FileRole.delta_add.value
            file_count += 1
            file_raw_size_sum += (added_file['blob_raw_size'] or 0)
            file_stored_size_sum += (added_file['blob_stored_size'] or 0)
            delta_files.append(added_file)

        for changed in best.delta.changed:
            changed.new['role'] = _V3FileRole.delta_override.value
            # an override keeps the file count, only the size difference matters
            file_raw_size_sum += (changed.new['blob_raw_size'] or 0) - (changed.old['blob_raw_size'] or 0)
            file_stored_size_sum += (changed.new['blob_stored_size'] or 0) - (changed.old['blob_stored_size'] or 0)
            delta_files.append(changed.new)

        for removed_file in best.delta.removed:
            # a fresh marker row is built here, the old row itself stays untouched
            removal_marker = {
                'path': removed_file['path'],
                'role': _V3FileRole.delta_remove.value,
                'mode': 0,
                'content': None,
                'uid': None,
                'gid': None,
                'mtime': None,
            }
            file_count -= 1
            file_raw_size_sum -= (removed_file['blob_raw_size'] or 0)
            file_stored_size_sum -= (removed_file['blob_stored_size'] or 0)
            delta_files.append(removal_marker)

        fileset_delta = self.session.v3_insert('fileset', {
            'base_id': best.fileset['id'],
            'file_object_count': len(delta_files),
            'file_count': file_count,
            'file_raw_size_sum': file_raw_size_sum,
            'file_stored_size_sum': file_stored_size_sum,
        }, need_result=True)

        role_counter: Dict[_V3FileRole, int] = collections.Counter()
        for file in delta_files:
            file['fileset_id'] = fileset_delta['id']
            role_counter[_V3FileRole(file['role'])] += 1
            self.session.v3_insert('file', file)

        self.logger.debug('Created delta fileset {}, len(delta_files)={}, role counts={}'.format(
            fileset_delta, len(delta_files), {k.name: v for k, v in role_counter.items()},
        ))
        return _FilesetAllocateResult(best.fileset, fileset_delta, new_file_object_count=len(delta_files))
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
from pathlib import Path
2+
from typing import Optional, Sequence, Dict, Any, cast
3+
from typing import TypeVar, List
4+
5+
from sqlalchemy import text, CursorResult
6+
from sqlalchemy.orm import Session
7+
8+
from prime_backup.utils import collection_utils
9+
10+
_T = TypeVar('_T')


# make type checker happy
def _list_it(seq: Sequence[_T]) -> List[_T]:
    """Return ``seq`` itself if it is already a ``list``, otherwise a new list holding its items."""
    return seq if isinstance(seq, list) else list(seq)
18+
19+
20+
def _int_or_0(value: Optional[int]) -> int:
    """Convert ``value`` to ``int``, mapping ``None`` to 0."""
    return 0 if value is None else int(value)
24+
25+
26+
class _V3DbSession:
    """Raw-SQL helpers over a SQLAlchemy session, operating on the ``file`` / ``fileset`` / ``backup`` tables.

    Rows are exchanged as plain ``dict`` objects keyed by column name.
    """

    def __init__(self, session: Session, db_path: Optional[Path] = None):
        """
        :param session: The SQLAlchemy session every statement is executed on
        :param db_path: Optional path of the database file. It is only stored here
        """
        self.session = session
        self.db_path = db_path
        # chunk size for the "id IN (...)" queries below
        # NOTE(review): presumably SQLite's historical default limit of 999 host parameters
        # minus a safety margin -- confirm
        self.__safe_var_limit = 999 - 20

    def v3_insert(self, table: str, data: Dict[str, Any], *, need_result: bool = False, id_key: str = 'id') -> Dict[str, Any]:
        """Insert one row into ``table``.

        Column names are taken from the keys of ``data`` and are placed into the SQL text as is,
        only the values are bound as parameters. Callers must pass trusted column names only.

        :param table: Target table. One of ``file``, ``fileset``, ``backup``
        :param data: Column name -> value of the row to insert
        :param need_result: If true, read the inserted row back and return it
        :param id_key: The column matched against the insert's ``lastrowid`` when reading the row back
        :return: The row as stored in the database if ``need_result`` is true, otherwise ``data`` itself
        :raise ValueError: If ``table`` is not one of the known tables
        """
        if table not in ('file', 'fileset', 'backup'):
            raise ValueError(f'unknown table name {table!r}')

        columns = list(data.keys())
        columns_str = ', '.join(columns)
        placeholders_str = ', '.join(f':{col}' for col in columns)

        stmt = text(f'INSERT INTO {table} ({columns_str}) VALUES ({placeholders_str})').bindparams(**data)
        insert_result = cast(CursorResult, self.session.execute(stmt))

        if not need_result:
            return data

        query_stmt = text(f'SELECT * FROM {table} WHERE {id_key} = :id').bindparams(id=insert_result.lastrowid)
        result_row = self.session.execute(query_stmt).fetchone()
        # noinspection PyProtectedMember
        return dict(result_row._mapping)

    def v3_get_fileset_associated_backup_count(self, fileset_id: int) -> int:
        """Count the backups that use the given fileset as their base or as their delta."""
        stmt = text(
            'SELECT count(*) FROM backup WHERE fileset_id_base = :fileset_id OR fileset_id_delta = :fileset_id'
        ).bindparams(fileset_id=fileset_id)
        return _int_or_0(self.session.execute(stmt).scalar_one())

    def v3_get_fileset_delta_file_object_count_sum(self, base_fileset_id: int) -> int:
        """For those backups whose base fileset is the given one, sum up the file_object_count of their delta filesets"""
        delta_fileset_ids = _list_it(self.session.execute(
            text('SELECT distinct fileset_id_delta FROM backup WHERE fileset_id_base = :base_fileset_id').
            bindparams(base_fileset_id=base_fileset_id)
        ).scalars().all())

        count_sum = 0
        # query in chunks, so a single statement never carries too many ids
        for view in collection_utils.slicing_iterate(delta_fileset_ids, self.__safe_var_limit):
            sql_in_arg = '({})'.format(','.join(map(str, view)))
            # sum() yields NULL when nothing matches, which is counted as 0
            count_sum += _int_or_0(self.session.execute(
                text(f'SELECT sum(file_object_count) FROM fileset WHERE id IN {sql_in_arg}')
            ).scalar_one())
        return count_sum

    def v3_get_last_n_base_fileset(self, limit: int) -> List[dict]:
        """Return up to ``limit`` base filesets (those with ``base_id = 0``), highest id first."""
        fileset_rows = self.session.execute(text('SELECT * FROM fileset WHERE base_id = 0 ORDER BY id DESC LIMIT :limit').bindparams(limit=limit))
        # noinspection PyProtectedMember
        return [dict(fileset_row._mapping) for fileset_row in fileset_rows]

    def v3_get_fileset_files(self, fileset_id: int) -> List[dict]:
        """Return every ``file`` row that belongs to the given fileset."""
        file_rows = self.session.execute(text('SELECT * FROM file WHERE fileset_id = :fileset_id').bindparams(fileset_id=fileset_id))
        # noinspection PyProtectedMember
        return [dict(file_row._mapping) for file_row in file_rows]

0 commit comments

Comments
 (0)