Skip to content

Commit ab91818

Browse files
committed
[DCO01A-121] Make snapshot zip smaller by using symlinks to identical entities archives
1 parent dd5bb40 commit ab91818

File tree

2 files changed

+111
-8
lines changed

2 files changed

+111
-8
lines changed

mgmtworker/cloudify_system_workflows/snapshots/snapshot_create.py

Lines changed: 94 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import pathlib
44
import queue
55
import shutil
6+
import hashlib
7+
import tarfile
68
import tempfile
79
import zipfile
810
from collections import defaultdict
@@ -84,6 +86,11 @@ def __init__(
8486
self._auditlog_queue = queue.Queue()
8587
self._auditlog_listener = AuditLogListener(self._client,
8688
self._auditlog_queue)
89+
self._written_archives: dict[str, dict[tuple[str, ...], str]]
90+
self._written_archives = { # track created entities archives
91+
'plugins': {},
92+
'blueprints': {}, # will do for both blueprints and blueprint_revisions
93+
}
8794

8895
def create(self, timeout: float | None = None):
8996
"""Dumps manager's data and some metadata into a single zip file"""
@@ -263,6 +270,7 @@ def _write_files(
263270
if _should_append_entity(dump_type, entity):
264271
self._auditlog_listener.append_entity(
265272
tenant_name, dump_type, entity)
273+
self._update_written_archives(entity_id, dump_type, output_dir)
266274
# Dump the data as JSON files
267275
filenum = _get_max_filenum_in_dir(output_dir) or 0
268276
for (source, source_id), items in data_buckets.items():
@@ -308,17 +316,24 @@ def _create_archive(self):
308316
) as zf:
309317
base_dir = os.path.join(root_dir, os.curdir)
310318
base_dir = os.path.normpath(base_dir)
311-
for dirpath, dirnames, filenames in os.walk(base_dir):
319+
for dirpath, dirnames, filenames in os.walk(base_dir, followlinks=False):
320+
root_path = Path(dirpath)
312321
arcdirpath = os.path.relpath(dirpath, root_dir)
313322
for name in sorted(dirnames):
314323
path = os.path.join(dirpath, name)
315324
arcname = os.path.join(arcdirpath, name)
316325
zf.write(path, arcname)
317326
for name in filenames:
318-
path = os.path.join(dirpath, name)
319-
path = os.path.normpath(path)
320-
if os.path.isfile(path):
321-
arcname = os.path.join(arcdirpath, name)
327+
path = root_path / name
328+
arcname = path.relative_to(root_dir)
329+
if path.is_symlink():
330+
zip_info = zipfile.ZipInfo(str(arcname))
331+
zip_info.create_system = 3 # Unix
332+
st = os.lstat(path)
333+
zip_info.external_attr = st.st_mode << 16
334+
link_target = os.readlink(path)
335+
zf.writestr(zip_info, link_target)
336+
elif os.path.isfile(path):
322337
zf.write(path, arcname)
323338

324339
def _upload_archive(self):
@@ -392,6 +407,29 @@ def _update_snapshot_status(self, status, error=None):
392407
error=error
393408
)
394409

410+
def _update_written_archives(self, entity_id, dump_type, output_dir):
411+
dest_dir = (output_dir / f'{dump_type}').resolve()
412+
suffix = {
413+
'plugins': '.zip',
414+
'blueprints': '.tar.gz',
415+
}.get(dump_type)
416+
if not suffix:
417+
return
418+
entity_archive = dest_dir / f'{entity_id}{suffix}'
419+
if dump_type == 'plugins':
420+
content_hashes = get_zip_content_hashes(entity_archive)
421+
else:
422+
content_hashes = get_tar_gz_content_hashes(entity_archive)
423+
if existing_path := self._written_archives[dump_type].get(content_hashes):
424+
entity_archive.unlink(missing_ok=False)
425+
os.symlink(
426+
os.path.relpath(existing_path, entity_archive).split("/", 1)[-1],
427+
entity_archive,
428+
)
429+
ctx.logger.debug("Created symlink: %s to %s", entity_archive, existing_path)
430+
return
431+
self._written_archives[dump_type][content_hashes] = entity_archive
432+
395433

396434
def _prepare_temp_dir() -> Path:
397435
"""Prepare temporary (working) directory structure"""
@@ -516,3 +554,54 @@ def get_all(method, kwargs=None):
516554
kwargs['_offset'] = len(data)
517555

518556
return data
557+
558+
559+
def get_zip_content_hashes(zip_path) -> tuple[str, ...]:
    """Return a deterministic content fingerprint of a zip archive.

    The fingerprint covers every member file's contents, the set of file
    names, and the set of empty directories, so two archives compare equal
    iff their extracted trees would be identical (metadata such as
    timestamps is deliberately ignored).

    All aggregations are sorted before hashing so the result is stable
    across processes (plain ``set`` iteration order depends on string
    hash randomization).

    :param zip_path: path to the ``.zip`` file to fingerprint
    :return: sorted tuple of hex sha256 digests
    """
    hashes: set[str] = set()
    all_dirs: set[str] = set()
    not_empty_dirs: set[str] = set()
    filenames: set[str] = set()
    with zipfile.ZipFile(zip_path, 'r') as archive:
        for info in archive.infolist():
            if info.is_dir():
                all_dirs.add(info.filename.rstrip("/"))
                continue
            filenames.add(info.filename)
            # Record every ancestor directory as non-empty.
            parts = info.filename.split('/')
            for i in range(1, len(parts)):
                not_empty_dirs.add('/'.join(parts[:i]))
            # Stream the member in chunks to avoid loading whole files
            # (e.g. large plugin wagons) into memory at once.
            sha256 = hashlib.sha256()
            with archive.open(info) as fileobj:
                while chunk := fileobj.read(65536):
                    sha256.update(chunk)
            hashes.add(sha256.hexdigest())
    if filenames:
        hashes.add(
            hashlib.sha256(":".join(sorted(filenames)).encode("utf-8")).hexdigest())
    if empty_dirs := all_dirs - not_empty_dirs:
        hashes.add(
            hashlib.sha256(":".join(sorted(empty_dirs)).encode("utf-8")).hexdigest())
    return tuple(sorted(hashes))
582+
583+
584+
def get_tar_gz_content_hashes(tar_gz_path) -> tuple[str, ...]:
    """Return a deterministic content fingerprint of a .tar.gz archive.

    Mirrors :func:`get_zip_content_hashes`: the fingerprint covers member
    file contents, the set of file names, and the set of empty
    directories, ignoring archive metadata such as timestamps.

    All aggregations are sorted before hashing so the result is stable
    across processes (plain ``set`` iteration order depends on string
    hash randomization).

    :param tar_gz_path: path to the ``.tar.gz`` file to fingerprint
    :return: sorted tuple of hex sha256 digests
    """
    hashes: set[str] = set()
    all_dirs: set[str] = set()
    not_empty_dirs: set[str] = set()
    filenames: set[str] = set()
    with tarfile.open(tar_gz_path, 'r:gz') as archive:
        for member in archive.getmembers():
            if not member.isfile():
                # Directories (and special members) only matter for the
                # empty-directory fingerprint below.
                all_dirs.add(member.name.rstrip("/"))
                continue
            filenames.add(member.name)
            # Record every ancestor directory as non-empty.
            parts = member.name.split('/')
            for i in range(1, len(parts)):
                not_empty_dirs.add('/'.join(parts[:i]))
            fileobj = archive.extractfile(member)
            if fileobj:
                # Stream in chunks to avoid loading whole files into memory.
                sha256 = hashlib.sha256()
                while chunk := fileobj.read(65536):
                    sha256.update(chunk)
                hashes.add(sha256.hexdigest())
    if filenames:
        hashes.add(
            hashlib.sha256(":".join(sorted(filenames)).encode("utf-8")).hexdigest())
    if empty_dirs := all_dirs - not_empty_dirs:
        hashes.add(
            hashlib.sha256(":".join(sorted(empty_dirs)).encode("utf-8")).hexdigest())
    return tuple(sorted(hashes))

mgmtworker/cloudify_system_workflows/snapshots/snapshot_restore.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@
66
import uuid
77
import base64
88
import shutil
9+
import stat
910
import zipfile
1011
import tempfile
1112
import threading
1213
import subprocess
1314
from contextlib import contextmanager
1415
from functools import partial
16+
from pathlib import Path
1517
from typing import Any
1618

1719
from cloudify.workflows import ctx
@@ -76,6 +78,11 @@
7678

7779
# Reproduced/modified from patch for https://bugs.python.org/issue15795
7880
class ZipFile(zipfile.ZipFile):
81+
82+
def __init__(self, *args, **kwargs):
83+
super().__init__(*args, **kwargs)
84+
self._all_entries = {info.filename.rstrip('/'): info for info in self.infolist()}
85+
7986
def _extract_member(self, member, targetpath, pwd):
8087
"""Extract the ZipInfo object 'member' to a physical
8188
file on the path targetpath.
@@ -112,11 +119,18 @@ def _extract_member(self, member, targetpath, pwd):
112119
os.mkdir(targetpath)
113120
return targetpath
114121

115-
with self.open(member, pwd=pwd) as source, \
116-
open(targetpath, "wb") as target:
122+
_mode = member.external_attr >> 16
123+
if stat.S_ISLNK(_mode):
124+
link = self.read(member.filename).decode('utf-8')
125+
source_path = Path(member.filename).parent / link
126+
member_to_extract = self._all_entries[os.path.normpath(source_path)]
127+
else:
128+
member_to_extract = member
129+
130+
with self.open(member_to_extract, pwd=pwd) as source, open(targetpath, "wb") as target:
117131
shutil.copyfileobj(source, target)
118132

119-
mode = member.external_attr >> 16 & 0xFFF
133+
mode = member_to_extract.external_attr >> 16 & 0xFFF
120134
os.chmod(targetpath, mode)
121135
return targetpath
122136

0 commit comments

Comments
 (0)