Skip to content

Commit f43fda1

Browse files
committed
MigrateHashMethodAction fix chunk group not rebuilt
1 parent 155e979 commit f43fda1

2 files changed

Lines changed: 42 additions & 22 deletions

File tree

prime_backup/action/migrate_hash_method_action.py

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from typing_extensions import override
88

99
from prime_backup.action import Action
10+
from prime_backup.action.helpers.chunk_grouper import ChunkGrouper
1011
from prime_backup.compressors import Compressor
1112
from prime_backup.db import schema
1213
from prime_backup.db.access import DbAccess
@@ -177,22 +178,32 @@ def __collect_chunk_moves(self, session: DbSession) -> List[_HashMove[schema.Chu
177178
self.__ensure_paths_can_migrate(moves, chunk_utils.get_chunk_path)
178179
return _changed_moves(moves)
179180

180-
def __collect_chunk_group_moves(self, session: DbSession) -> List[_HashMove[schema.ChunkGroup]]:
181-
moves: List[_HashMove[schema.ChunkGroup]] = []
182-
chunk_groups = session.list_chunk_groups()
183-
chunks_by_group_id = session.get_chunk_group_chunks_batch([chunk_group.id for chunk_group in chunk_groups])
184-
total = len(chunk_groups)
185-
for i, chunk_group in enumerate(chunk_groups):
186-
new_hash = chunk_utils.create_chunk_group_hash(
187-
offset_chunk.chunk.hash
188-
for offset_chunk in chunks_by_group_id[chunk_group.id]
189-
)
190-
moves.append(_HashMove(object=chunk_group, old_hash=chunk_group.hash, new_hash=new_hash))
191-
if (i + 1) % 5000 == 0 or i + 1 == total:
192-
self.logger.info('Calculated chunk group hashes {} / {}'.format(i + 1, total))
193-
194-
self.__ensure_hashes_can_migrate(moves, 'chunk group')
195-
return _changed_moves(moves)
181+
def __regroup_chunked_blobs(self, session: DbSession):
    """
    Throw away every chunk group and rebuild them for all chunked blobs.

    The blob -> chunks mapping is read first, then all blob / chunk-group and
    chunk-group / chunk bindings plus the chunk groups themselves are deleted and
    flushed, and finally each chunked blob is handed to a ``ChunkGrouper`` again.

    :param session: The database session all reads, deletes and re-grouping go through
    """
    # Phase 1: snapshot each chunked blob's chunks, keyed by offset, while they can still be queried.
    # Original author's note: get_blob_chunks() yields them sorted by absolute_offset
    blobs = session.list_blobs_by_storage_method(BlobStorageMethod.chunked)
    blob_count = len(blobs)
    chunks_of_blob: Dict[int, Dict[int, schema.Chunk]] = {
        blob.id: {
            offset_chunk.offset: offset_chunk.chunk
            for offset_chunk in session.get_blob_chunks(blob.id)
        }
        for blob in blobs
    }

    group_count = session.get_chunk_group_count()
    drop_message = 'Dropping {} chunk group and all bindings, then re-grouping {} chunked blob with the new chunk hashes'
    self.logger.info(drop_message.format(group_count, blob_count))

    # Phase 2: remove all chunk group data: both binding tables first, then the groups
    session.delete_all_blob_chunk_group_bindings()
    session.delete_all_chunk_group_chunk_bindings()
    session.delete_all_chunk_groups()
    session.flush()

    # Phase 3: rebuild the groups with ChunkGrouper
    # Original author's note: ChunkGrouper applies the endswith('00') cut rule
    if blob_count == 0:
        return
    grouper = ChunkGrouper(session, None)
    for done, blob in enumerate(blobs, start=1):
        grouper.create_chunk_groups(blob, chunks_of_blob[blob.id])
        # Report progress every 200 blobs, and once more on the last one
        if done == blob_count or done % 200 == 0:
            self.logger.info('Re-grouped chunked blobs {} / {}'.format(done, blob_count))
196207

197208
# ==================== DB Updates ====================
198209

@@ -233,11 +244,6 @@ def __migrate_chunk_hashes(self, session: DbSession, moves: List[_HashMove[schem
233244
move.object.hash = move.new_hash
234245
session.flush()
235246

236-
def __migrate_chunk_group_hashes(self, session: DbSession, moves: List[_HashMove[schema.ChunkGroup]]):
237-
for move in moves:
238-
move.object.hash = move.new_hash
239-
session.flush()
240-
241247
def __rollback_files(self):
242248
self.__move_journal.rollback()
243249

@@ -260,7 +266,7 @@ def run(self) -> None:
260266

261267
self.__migrate_blob_hashes(session, self.__collect_blob_moves(session))
262268
self.__migrate_chunk_hashes(session, self.__collect_chunk_moves(session))
263-
self.__migrate_chunk_group_hashes(session, self.__collect_chunk_group_moves(session))
269+
self.__regroup_chunked_blobs(session)
264270

265271
meta = session.get_db_meta()
266272
meta.hash_method = self.new_hash_method.name

prime_backup/db/session.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,11 @@ def list_blobs(self, limit: Optional[int] = None, offset: Optional[int] = None)
255255
s = s.offset(offset)
256256
return _list_it(self.session.execute(s).scalars().all())
257257

258+
def list_blobs_by_storage_method(self, storage_method: 'BlobStorageMethod') -> List[schema.Blob]:
    """
    List every blob whose ``storage_method`` column equals ``storage_method.value``.

    :param storage_method: The storage method to filter by; its ``.value`` is compared against the column
    :return: The matching blobs
    """
    s = select(schema.Blob).where(schema.Blob.storage_method == storage_method.value)
    blobs = self.session.execute(s).scalars().all()
    return _list_it(blobs)
262+
258263
def list_blob_with_hash_prefix(self, hash_prefix: str, limit: int) -> List[schema.Blob]:
259264
s = select(schema.Blob).where(*self.__get_hash_prefix_conditions(hash_prefix, schema.Blob.hash)).limit(limit)
260265
return _list_it(self.session.execute(s).scalars().all())
@@ -605,6 +610,9 @@ def delete_chunk_groups_by_ids(self, chunk_group_ids: List[int]):
605610
for view in collection_utils.slicing_iterate(chunk_group_ids, self.__safe_var_limit):
606611
self.session.execute(delete(schema.ChunkGroup).where(schema.ChunkGroup.id.in_(view)))
607612

613+
def delete_all_chunk_groups(self):
    """
    Delete every ``ChunkGroup`` row with a single unfiltered DELETE statement.
    """
    wipe_stmt = delete(schema.ChunkGroup)
    self.session.execute(wipe_stmt)
615+
608616
def calc_chunk_group_stored_size_sum(self, chunk_group_id: int) -> int:
609617
return _int_or_0(self.session.execute(
610618
select(func.sum(schema.Chunk.stored_size)).
@@ -757,6 +765,9 @@ def delete_chunk_group_chunk_bindings_for_chunk_groups(self, chunk_group_ids: Li
757765
where(schema.ChunkGroupChunkBinding.chunk_group_id.in_(view))
758766
)
759767

768+
def delete_all_chunk_group_chunk_bindings(self):
    """
    Delete every ``ChunkGroupChunkBinding`` row with a single unfiltered DELETE statement.
    """
    wipe_stmt = delete(schema.ChunkGroupChunkBinding)
    self.session.execute(wipe_stmt)
770+
760771
def delete_chunk_group_chunk_binding(self, binding: schema.ChunkGroupChunkBinding):
761772
self.session.delete(binding)
762773

@@ -971,6 +982,9 @@ def delete_blob_chunk_group_bindings_for_blobs(self, blob_ids: List[int]):
971982
where(schema.BlobChunkGroupBinding.blob_id.in_(view))
972983
)
973984

985+
def delete_all_blob_chunk_group_bindings(self):
    """
    Delete every ``BlobChunkGroupBinding`` row with a single unfiltered DELETE statement.
    """
    wipe_stmt = delete(schema.BlobChunkGroupBinding)
    self.session.execute(wipe_stmt)
987+
974988
def delete_blob_chunk_group_binding(self, binding: schema.BlobChunkGroupBinding):
975989
self.session.delete(binding)
976990

0 commit comments

Comments
 (0)