1+ import dataclasses
12import shutil
23import time
3- from typing import List , Dict , Set
4+ from pathlib import Path
5+ from typing import Callable , Dict , Generic , List , Protocol , TypeVar , cast , Tuple
46
57from typing_extensions import override
68
79from prime_backup .action import Action
810from prime_backup .compressors import Compressor
11+ from prime_backup .db import schema
912from prime_backup .db .access import DbAccess
1013from prime_backup .db .session import DbSession
14+ from prime_backup .db .values import BlobStorageMethod
1115from prime_backup .exceptions import PrimeBackupError
1216from prime_backup .types .hash_method import HashMethod
13- from prime_backup .utils import blob_utils , hash_utils , collection_utils
17+ from prime_backup .utils import blob_utils , chunk_utils , hash_utils
18+
19+ _READ_BUF_SIZE = 128 * 1024
20+ _FILE_BLOB_HASH_BATCH_SIZE = 200
21+ _T = TypeVar ('_T' )
1422
1523
1624class HashCollisionError (PrimeBackupError ):
@@ -20,84 +28,256 @@ class HashCollisionError(PrimeBackupError):
2028 pass
2129
2230
class _HashObject(Protocol):
    """Structural type for objects that expose an integer ``id`` and a string ``hash``."""

    # Identifier of the object; surfaced through _HashMove.id for error reporting
    id: int
    # Hash string currently stored on the object
    hash: str
34+
35+
@dataclasses.dataclass(frozen=True)
class _HashMove(Generic[_T]):
    """
    Immutable record of one object's hash transition from ``old_hash`` to ``new_hash``.

    ``has_file_to_move`` tells whether a file addressed by the hash has to be
    relocated together with the hash change; it defaults to ``False``.
    """

    object: _T
    old_hash: str
    new_hash: str
    has_file_to_move: bool = False

    @property
    def id(self) -> int:
        """The ``id`` attribute of the wrapped object."""
        hash_object = cast(_HashObject, self.object)
        return hash_object.id

    @property
    def changed(self) -> bool:
        """``True`` when the new hash differs from the old one."""
        return not (self.old_hash == self.new_hash)
50+
51+
class _MoveJournal:
    """
    Performs file moves and remembers them so they can be undone later.

    Moves are recorded in the order they were performed and undone in the
    reverse order.
    """

    def __init__(self):
        # (src, dst) pairs of every move performed and not yet undone, oldest first
        self.__moves: List[Tuple[Path, Path]] = []

    def move(self, src: Path, dst: Path):
        """
        Move ``src`` to ``dst`` and record the move.

        :raises FileExistsError: if ``dst`` already exists; nothing is moved or recorded
        """
        if dst.exists():
            raise FileExistsError(dst)
        shutil.move(src, dst)
        # Record only after the move succeeded, so rollback never touches an unmoved file
        self.__moves.append((src, dst))

    def rollback(self):
        """
        Undo the recorded moves, newest first.

        Every recorded move is attempted even if an earlier one fails, so one
        broken file does not leave all the others stranded at their new path.
        Moves that were undone are dropped from the journal, which makes a
        repeated call a no-op for them; moves that failed stay recorded.

        :raises Exception: the first failure encountered, after all moves were attempted
        """
        failures: List[Exception] = []
        still_pending: List[Tuple[Path, Path]] = []
        while self.__moves:
            src, dst = self.__moves.pop()
            try:
                shutil.move(dst, src)
            except Exception as e:
                failures.append(e)
                still_pending.append((src, dst))

        # Entries were popped newest-first; restore the oldest-first ordering
        still_pending.reverse()
        self.__moves.extend(still_pending)
        if failures:
            raise failures[0]

    def clear(self):
        """Forget all recorded moves without touching any file."""
        self.__moves.clear()
68+
69+
def _changed_moves(moves: List[_HashMove[_T]]) -> List[_HashMove[_T]]:
    """Return, in their original order, the moves whose new hash differs from the old one."""
    return list(filter(lambda candidate: candidate.changed, moves))
72+
73+
class MigrateHashMethodAction(Action[None]):
    """
    Re-hash every blob, chunk and chunk group with ``new_hash_method``.

    Files addressed by hash are moved to the path of their new hash, the
    database rows are updated inside one session, and the db meta is switched
    to the new hash method. File moves are journaled so that they can be
    undone if anything fails before the session is committed.
    """

    def __init__(self, new_hash_method: HashMethod):
        super().__init__()
        self.new_hash_method = new_hash_method
        # Every file move performed by this action, kept for rollback
        self.__move_journal = _MoveJournal()

    # ==================== Checks ====================

    @classmethod
    def __ensure_hashes_can_migrate(cls, moves: List[_HashMove], object_name: str):
        """
        Verify that the new hashes are unique and do not clash with old hashes.

        :param moves: All moves of one object kind, including unchanged ones
        :param object_name: Object kind name used in the error messages
        :raises HashCollisionError: if two objects share a new hash, or if a
            changed object's new hash equals the old hash of another object
        """
        old_hash_ids: Dict[str, int] = {}
        new_hash_ids: Dict[str, int] = {}
        for move in moves:
            old_hash_ids[move.old_hash] = move.id
            if (other_id := new_hash_ids.get(move.new_hash)) is not None:
                raise HashCollisionError('{} hash collision: {}, object ids {} and {}'.format(object_name, move.new_hash, other_id, move.id))
            new_hash_ids[move.new_hash] = move.id

        # Second pass: old_hash_ids is complete now, so a new hash that is still in use
        # as the old hash of a different object can be detected
        for move in moves:
            if not move.changed:
                continue
            if (old_owner_id := old_hash_ids.get(move.new_hash)) is not None and old_owner_id != move.id:
                raise HashCollisionError('{} hash conflicts with existing old hash: {}, object ids {} and {}'.format(object_name, move.new_hash, old_owner_id, move.id))

    @classmethod
    def __ensure_paths_can_migrate(cls, moves: List[_HashMove], get_path: Callable[[str], Path]):
        """
        Verify that no destination path of a pending file move already exists.

        :param get_path: Maps a hash to the path of its file
        :raises FileExistsError: if the path of a changed, file-backed move's new hash exists
        """
        for move in moves:
            if move.changed and move.has_file_to_move and (new_hash_path := get_path(move.new_hash)).exists():
                raise FileExistsError(new_hash_path)

    # ==================== Hash Calculation ====================

    def __calc_direct_blob_new_hash(self, blob: schema.Blob) -> str:
        """
        Hash the decompressed content of a directly-stored blob with the new hash method.

        :raises ValueError: if the number of bytes read differs from ``blob.raw_size``
        """
        with Compressor.create(blob.compress).open_decompressed(blob_utils.get_blob_path(blob.hash)) as f:
            sah = hash_utils.calc_reader_size_and_hash(f, hash_method=self.new_hash_method)
        if sah.size != blob.raw_size:
            raise ValueError('raw size mismatch for blob {}, expect {}, found {}'.format(blob.hash, blob.raw_size, sah.size))
        return sah.hash

    def __calc_chunked_blob_new_hash(self, session: DbSession, blob: schema.Blob) -> str:
        """
        Hash a chunked blob by feeding the decompressed content of its chunks,
        in the order returned by ``session.get_blob_chunks``, into one hasher.

        :raises ValueError: if a chunk's size or the summed size does not match the recorded raw size
        """
        hasher = self.new_hash_method.value.create_hasher()
        size = 0
        for offset_chunk in session.get_blob_chunks(blob.id):
            chunk = offset_chunk.chunk
            chunk_size = 0
            with Compressor.create(chunk.compress).open_decompressed(chunk_utils.get_chunk_path(chunk.hash)) as f:
                # An empty read marks the end of the stream
                while buf := f.read(_READ_BUF_SIZE):
                    hasher.update(buf)
                    chunk_size += len(buf)
            if chunk_size != chunk.raw_size:
                raise ValueError('raw size mismatch for chunk {}, expect {}, found {}'.format(chunk.hash, chunk.raw_size, chunk_size))
            size += chunk_size
        if size != blob.raw_size:
            raise ValueError('raw size mismatch for chunked blob {}, expect {}, found {}'.format(blob.hash, blob.raw_size, size))
        return hasher.hexdigest()

    def __calc_chunk_new_hash(self, chunk: schema.Chunk) -> str:
        """
        Hash the decompressed content of a chunk with the new hash method.

        :raises ValueError: if the stored (on-disk) size or the raw size does not match the chunk record
        """
        with Compressor.create(chunk.compress).open_decompressed_bypassed(chunk_utils.get_chunk_path(chunk.hash)) as (reader, f):
            sah = hash_utils.calc_reader_size_and_hash(f, hash_method=self.new_hash_method)
        # NOTE(review): assumes reader.get_read_len() is still valid after the context exits — confirm
        if reader.get_read_len() != chunk.stored_size:
            raise ValueError('stored size mismatch for chunk {}, expect {}, found {}'.format(chunk.hash, chunk.stored_size, reader.get_read_len()))
        if sah.size != chunk.raw_size:
            raise ValueError('raw size mismatch for chunk {}, expect {}, found {}'.format(chunk.hash, chunk.raw_size, sah.size))
        return sah.hash

    # ==================== Move Collection ====================

    def __collect_blob_moves(self, session: DbSession) -> List[_HashMove[schema.Blob]]:
        """
        Calculate the new hash of every blob and validate the result.

        :return: The moves of the blobs whose hash actually changes
        :raises ValueError: on an unsupported blob storage method
        """
        moves: List[_HashMove[schema.Blob]] = []
        blobs = session.list_blobs()
        total = len(blobs)
        for i, blob in enumerate(blobs):
            if blob.storage_method == BlobStorageMethod.direct.value:
                new_hash = self.__calc_direct_blob_new_hash(blob)
                has_file_to_move = True
            elif blob.storage_method == BlobStorageMethod.chunked.value:
                # No file is moved for a chunked blob itself
                new_hash = self.__calc_chunked_blob_new_hash(session, blob)
                has_file_to_move = False
            else:
                raise ValueError('unsupported blob storage method {}'.format(blob.storage_method))

            moves.append(_HashMove(object=blob, old_hash=blob.hash, new_hash=new_hash, has_file_to_move=has_file_to_move))
            if (i + 1) % 1000 == 0 or i + 1 == total:
                self.logger.info('Calculated blob hashes {} / {}'.format(i + 1, total))

        # Validate against the full list: unchanged objects still occupy their hash
        self.__ensure_hashes_can_migrate(moves, 'blob')
        self.__ensure_paths_can_migrate(moves, blob_utils.get_blob_path)
        return _changed_moves(moves)

    def __collect_chunk_moves(self, session: DbSession) -> List[_HashMove[schema.Chunk]]:
        """
        Calculate the new hash of every chunk and validate the result.

        :return: The moves of the chunks whose hash actually changes
        """
        moves: List[_HashMove[schema.Chunk]] = []
        chunks = session.list_chunks()
        total = len(chunks)
        for i, chunk in enumerate(chunks):
            new_hash = self.__calc_chunk_new_hash(chunk)
            moves.append(_HashMove(object=chunk, old_hash=chunk.hash, new_hash=new_hash, has_file_to_move=True))
            if (i + 1) % 2000 == 0 or i + 1 == total:
                self.logger.info('Calculated chunk hashes {} / {}'.format(i + 1, total))

        self.__ensure_hashes_can_migrate(moves, 'chunk')
        self.__ensure_paths_can_migrate(moves, chunk_utils.get_chunk_path)
        return _changed_moves(moves)

    def __collect_chunk_group_moves(self, session: DbSession) -> List[_HashMove[schema.ChunkGroup]]:
        """
        Calculate the new hash of every chunk group from the hashes of its chunks.

        No file is moved for a chunk group, so only the hashes are validated.

        :return: The moves of the chunk groups whose hash actually changes
        """
        moves: List[_HashMove[schema.ChunkGroup]] = []
        chunk_groups = session.list_chunk_groups()
        chunks_by_group_id = session.get_chunk_group_chunks_batch([chunk_group.id for chunk_group in chunk_groups])
        total = len(chunk_groups)
        for i, chunk_group in enumerate(chunk_groups):
            new_hash = chunk_utils.create_chunk_group_hash(
                offset_chunk.chunk.hash
                for offset_chunk in chunks_by_group_id[chunk_group.id]
            )
            moves.append(_HashMove(object=chunk_group, old_hash=chunk_group.hash, new_hash=new_hash))
            if (i + 1) % 5000 == 0 or i + 1 == total:
                self.logger.info('Calculated chunk group hashes {} / {}'.format(i + 1, total))

        self.__ensure_hashes_can_migrate(moves, 'chunk group')
        return _changed_moves(moves)

    # ==================== DB Updates ====================

    def __update_file_blob_hashes(self, session: DbSession, moves: List[_HashMove[schema.Blob]]):
        """
        Point every file that references an old blob hash in ``moves`` to the new hash.

        :raises AssertionError: if a returned file has no ``blob_hash``
        """
        hash_mapping = {move.old_hash: move.new_hash for move in moves}
        if not hash_mapping:
            return
        for file in session.get_file_by_blob_hashes(list(hash_mapping.keys())):
            if file.blob_hash is None:
                raise AssertionError('file {!r} has no blob_hash'.format(file))
            file.blob_hash = hash_mapping[file.blob_hash]

    # ==================== Migration Steps ====================

    def __move_files_to_new_hashes(self, object_name: str, moves: List[_HashMove], get_path: Callable[[str], Path]):
        """
        Move the file of every file-backed move from its old-hash path to its new-hash path.

        All moves go through the move journal, so they can be rolled back.

        :param object_name: Object kind name used in the progress log
        :param get_path: Maps a hash to the path of its file
        """
        total = sum(1 for move in moves if move.has_file_to_move)
        done = 0
        for move in moves:
            # NOTE(review): source indentation was lost; the progress log is assumed to be
            # emitted only after an actual file move — confirm
            if not move.has_file_to_move:
                continue
            self.__move_journal.move(get_path(move.old_hash), get_path(move.new_hash))
            done += 1
            if done % 2000 == 0 or done == total:
                self.logger.info('Moved {} files {} / {}'.format(object_name, done, total))

    def __migrate_blob_hashes(self, session: DbSession, moves: List[_HashMove[schema.Blob]]):
        """
        Move the blob files, then rewrite blob hashes and the referencing file rows in batches.
        """
        self.__move_files_to_new_hashes('blob', moves, blob_utils.get_blob_path)
        for offset in range(0, len(moves), _FILE_BLOB_HASH_BATCH_SIZE):
            batch = moves[offset:offset + _FILE_BLOB_HASH_BATCH_SIZE]
            # NOTE(review): source indentation was lost; both the blob updates and the file
            # lookup are assumed to run with auto flush disabled, the flush right after — confirm
            with session.no_auto_flush():
                for move in batch:
                    move.object.hash = move.new_hash
                self.__update_file_blob_hashes(session, batch)
            session.flush()

    def __migrate_chunk_hashes(self, session: DbSession, moves: List[_HashMove[schema.Chunk]]):
        """Move the chunk files, then assign the new hash to every chunk and flush."""
        self.__move_files_to_new_hashes('chunk', moves, chunk_utils.get_chunk_path)
        for move in moves:
            move.object.hash = move.new_hash
        session.flush()

    def __migrate_chunk_group_hashes(self, session: DbSession, moves: List[_HashMove[schema.ChunkGroup]]):
        """Assign the new hash to every chunk group and flush."""
        for move in moves:
            move.object.hash = move.new_hash
        session.flush()

    def __rollback_files(self):
        """Undo every journaled file move."""
        self.__move_journal.rollback()

    # ==================== Entry Point ====================

    @override
    def run(self) -> None:
        """
        Migrate the database and the stored files to ``new_hash_method``.

        Does nothing when the database already uses that hash method. If an
        error occurs before the session is committed, the journaled file moves
        are rolled back and the error is re-raised. An error raised by the
        rollback itself is logged and suppressed, so the original error is
        the one that propagates.
        """
        t = time.time()
        db_committed = False
        try:
            with DbAccess.open_session() as session:
                meta = session.get_db_meta()
                if meta.hash_method == self.new_hash_method.name:
                    self.logger.info('Hash method of the database is already {}, no need to migrate'.format(self.new_hash_method.name))
                    return

                self.logger.info('Migrating hash method from {} to {}'.format(meta.hash_method, self.new_hash_method.name))
                blob_utils.prepare_blob_directories()
                chunk_utils.prepare_chunk_directories()

                # Order matters: chunk group hashes are derived from the chunk hashes,
                # so chunks are migrated before the chunk groups are collected
                self.__migrate_blob_hashes(session, self.__collect_blob_moves(session))
                self.__migrate_chunk_hashes(session, self.__collect_chunk_moves(session))
                self.__migrate_chunk_group_hashes(session, self.__collect_chunk_group_moves(session))

                meta = session.get_db_meta()
                meta.hash_method = self.new_hash_method.name

            # NOTE(review): assumes leaving the session context commits the transaction — confirm
            db_committed = True
            # The moved files now belong to the committed state; nothing is left to undo
            self.__move_journal.clear()
            try:
                self.logger.info('Syncing config and variables')
                DbAccess.sync_hash_method()
                self.config.backup.hash_method = self.new_hash_method
            except Exception:
                self.logger.fatal('DB committed but in-memory sync failed, plugin restart required')
                raise
            self.logger.info('Hash method migration done, cost {}s'.format(round(time.time() - t, 2)))

        except Exception:
            # After a commit the files must stay where they are, so no rollback is applied
            if not db_committed:
                self.logger.warning('Error occurs during migration, applying rollback')
                try:
                    self.__rollback_files()
                except Exception as e_rb:
                    # Do not let a rollback failure hide the error that triggered it
                    self.logger.error('Rollback of moved files failed, suppressed: {}'.format(e_rb))
            raise
0 commit comments