Skip to content

Commit 6b2c0aa

Browse files
authored
Merge pull request #1 from bylickilabs/bylickilabs-patch-1
Add files via upload
2 parents 16e8219 + 1c95eb9 commit 6b2c0aa

4 files changed

Lines changed: 376 additions & 0 deletions

File tree

__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from .engine import (
2+
encrypt_path,
3+
decrypt_container,
4+
list_container,
5+
verify_container,
6+
change_password,
7+
SecureArchiveError,
8+
InvalidContainerError,
9+
WrongPasswordError,
10+
)

crypto.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import os
2+
from dataclasses import dataclass
3+
from typing import Tuple
4+
5+
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
6+
from cryptography.hazmat.primitives import hashes
7+
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
8+
from cryptography.hazmat.backends import default_backend
9+
10+
@dataclass
11+
class KdfParams:
12+
iterations: int
13+
salt: bytes
14+
15+
def derive_key(password: str, params: KdfParams, length: int = 32) -> bytes:
16+
kdf = PBKDF2HMAC(
17+
algorithm=hashes.SHA512(),
18+
length=length,
19+
salt=params.salt,
20+
iterations=params.iterations,
21+
backend=default_backend(),
22+
)
23+
return kdf.derive(password.encode("utf-8"))
24+
25+
def generate_salt(size: int = 16) -> bytes:
26+
return os.urandom(size)
27+
28+
def encrypt_aes_gcm(key: bytes, plaintext: bytes, aad: bytes = b"") -> Tuple[bytes, bytes]:
29+
nonce = os.urandom(12)
30+
aesgcm = AESGCM(key)
31+
ciphertext = aesgcm.encrypt(nonce, plaintext, aad)
32+
return nonce, ciphertext
33+
34+
def decrypt_aes_gcm(key: bytes, nonce: bytes, ciphertext: bytes, aad: bytes = b"") -> bytes:
35+
aesgcm = AESGCM(key)
36+
return aesgcm.decrypt(nonce, ciphertext, aad)

engine.py

Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
import json
2+
from dataclasses import dataclass
3+
from pathlib import Path
4+
from typing import Dict, Any, List, Tuple
5+
6+
from cryptography.exceptions import InvalidTag
7+
8+
from .crypto import (
9+
KdfParams,
10+
derive_key,
11+
generate_salt,
12+
encrypt_aes_gcm,
13+
decrypt_aes_gcm,
14+
)
15+
from .fsutil import collect_entries, FileEntry
16+
17+
MAGIC = b"SECARC01"
18+
VERSION = 1
19+
PAYLOAD_SEPARATOR = b"\n---PAYLOAD---\n"
20+
21+
class SecureArchiveError(Exception):
22+
"""Base exception for all SecureArchive-related errors."""
23+
pass
24+
25+
class InvalidContainerError(SecureArchiveError):
26+
"""Raised when the container is corrupted, invalid or unreadable."""
27+
pass
28+
29+
class WrongPasswordError(SecureArchiveError):
30+
"""Raised when AES-GCM decryption fails due to an incorrect password."""
31+
pass
32+
33+
@dataclass
34+
class ContainerHeader:
35+
version: int
36+
salt: bytes
37+
iterations: int
38+
nonce: bytes
39+
40+
41+
def _build_header_bytes(header: ContainerHeader) -> bytes:
42+
salt_len = len(header.salt)
43+
nonce_len = len(header.nonce)
44+
data = bytearray()
45+
data.extend(MAGIC)
46+
data.append(header.version & 0xFF)
47+
data.append(salt_len & 0xFF)
48+
data.extend(header.salt)
49+
data.extend(header.iterations.to_bytes(4, "big"))
50+
data.append(nonce_len & 0xFF)
51+
data.extend(header.nonce)
52+
return bytes(data)
53+
54+
55+
def _parse_header_bytes(data: bytes) -> Tuple[ContainerHeader, bytes]:
56+
if len(data) < 8 + 1 + 1 + 4 + 1:
57+
raise InvalidContainerError("Header too short")
58+
59+
offset = 0
60+
magic = data[offset:offset + 8]
61+
offset += 8
62+
if magic != MAGIC:
63+
raise InvalidContainerError("Magic mismatch")
64+
65+
version = data[offset]
66+
offset += 1
67+
if version != VERSION:
68+
raise InvalidContainerError("Unsupported version")
69+
70+
salt_len = data[offset]
71+
offset += 1
72+
if len(data) < offset + salt_len + 4 + 1:
73+
raise InvalidContainerError("Header corrupt")
74+
salt = data[offset:offset + salt_len]
75+
offset += salt_len
76+
77+
iterations = int.from_bytes(data[offset:offset + 4], "big")
78+
offset += 4
79+
80+
nonce_len = data[offset]
81+
offset += 1
82+
83+
if len(data) < offset + nonce_len:
84+
raise InvalidContainerError("Header corrupt (nonce)")
85+
nonce = data[offset:offset + nonce_len]
86+
offset += nonce_len
87+
88+
header = ContainerHeader(
89+
version=version,
90+
salt=salt,
91+
iterations=iterations,
92+
nonce=nonce,
93+
)
94+
remaining = data[offset:]
95+
return header, remaining
96+
97+
98+
def encrypt_path(
99+
input_path: str,
100+
container_path: str,
101+
password: str,
102+
iterations: int = 300_000,
103+
overwrite: bool = False,
104+
) -> None:
105+
src = Path(input_path)
106+
if not src.exists():
107+
raise FileNotFoundError(input_path)
108+
109+
dst = Path(container_path)
110+
if dst.exists() and not overwrite:
111+
raise FileExistsError(container_path)
112+
113+
entries: List[FileEntry] = collect_entries(src)
114+
if not entries:
115+
raise SecureArchiveError("Input path contains no files.")
116+
117+
salt = generate_salt(16)
118+
kdf_params = KdfParams(iterations=iterations, salt=salt)
119+
key = derive_key(password, kdf_params)
120+
121+
manifest: Dict[str, Any] = {
122+
"version": VERSION,
123+
"cipher": "AES-256-GCM",
124+
"kdf": {
125+
"type": "PBKDF2-SHA512",
126+
"iterations": iterations,
127+
"salt_hex": salt.hex(),
128+
},
129+
"root": str(src.resolve()),
130+
"entries": [],
131+
}
132+
133+
data_chunks = bytearray()
134+
current_offset = 0
135+
136+
for e in entries:
137+
with open(e.abs_path, "rb") as f:
138+
content = f.read()
139+
start = current_offset
140+
data_chunks.extend(content)
141+
length = len(content)
142+
current_offset += length
143+
144+
manifest["entries"].append(
145+
{
146+
"path": e.rel_path,
147+
"size": e.size,
148+
"mtime": e.mtime,
149+
"offset": start,
150+
"length": length,
151+
}
152+
)
153+
154+
manifest_bytes = json.dumps(manifest, ensure_ascii=False).encode("utf-8")
155+
payload = manifest_bytes + PAYLOAD_SEPARATOR + bytes(data_chunks)
156+
157+
nonce, ciphertext = encrypt_aes_gcm(key, payload, aad=MAGIC)
158+
159+
header = ContainerHeader(
160+
version=VERSION,
161+
salt=salt,
162+
iterations=iterations,
163+
nonce=nonce,
164+
)
165+
header_bytes = _build_header_bytes(header)
166+
167+
with open(dst, "wb") as out:
168+
out.write(header_bytes)
169+
out.write(ciphertext)
170+
171+
172+
def _load_and_decrypt(container_path: str, password: str) -> Tuple[Dict[str, Any], bytes, ContainerHeader]:
173+
p = Path(container_path)
174+
if not p.exists() or not p.is_file():
175+
raise FileNotFoundError(container_path)
176+
177+
with open(p, "rb") as f:
178+
file_data = f.read()
179+
180+
header, ciphertext = _parse_header_bytes(file_data)
181+
182+
kdf_params = KdfParams(iterations=header.iterations, salt=header.salt)
183+
key = derive_key(password, kdf_params)
184+
185+
try:
186+
plaintext = decrypt_aes_gcm(key, header.nonce, ciphertext, aad=MAGIC)
187+
except InvalidTag as ex:
188+
raise WrongPasswordError("Decryption failed") from ex
189+
190+
try:
191+
manifest_part, data_part = plaintext.split(PAYLOAD_SEPARATOR, 1)
192+
except ValueError as ex:
193+
raise InvalidContainerError("Payload separator missing") from ex
194+
195+
try:
196+
manifest = json.loads(manifest_part.decode("utf-8"))
197+
except json.JSONDecodeError as ex:
198+
raise InvalidContainerError("Manifest JSON invalid") from ex
199+
200+
return manifest, data_part, header
201+
202+
203+
def decrypt_container(container_path: str, output_path: str, password: str) -> None:
204+
manifest, data_part, _header = _load_and_decrypt(container_path, password)
205+
206+
out_root = Path(output_path)
207+
out_root.mkdir(parents=True, exist_ok=True)
208+
209+
for entry in manifest.get("entries", []):
210+
rel_path = entry["path"]
211+
offset = entry["offset"]
212+
length = entry["length"]
213+
chunk = data_part[offset:offset + length]
214+
215+
target_path = out_root / rel_path
216+
target_path.parent.mkdir(parents=True, exist_ok=True)
217+
with open(target_path, "wb") as f:
218+
f.write(chunk)
219+
220+
221+
def list_container(container_path: str, password: str) -> List[Dict[str, Any]]:
222+
manifest, _data_part, _header = _load_and_decrypt(container_path, password)
223+
return manifest.get("entries", [])
224+
225+
226+
def verify_container(container_path: str, password: str) -> bool:
227+
try:
228+
manifest, data_part, _header = _load_and_decrypt(container_path, password)
229+
except WrongPasswordError:
230+
return False
231+
except SecureArchiveError:
232+
return False
233+
234+
entries = manifest.get("entries", [])
235+
max_offset = 0
236+
for e in entries:
237+
offset = e["offset"]
238+
length = e["length"]
239+
if offset < 0 or length < 0:
240+
return False
241+
end = offset + length
242+
if end > max_offset:
243+
max_offset = end
244+
245+
if max_offset > len(data_part):
246+
return False
247+
248+
return True
249+
250+
251+
def change_password(
252+
container_path: str,
253+
old_password: str,
254+
new_password: str,
255+
iterations: int | None = None,
256+
) -> None:
257+
manifest, data_part, old_header = _load_and_decrypt(container_path, old_password)
258+
259+
if iterations is None:
260+
iterations = old_header.iterations
261+
262+
new_salt = generate_salt(16)
263+
kdf_params = KdfParams(iterations=iterations, salt=new_salt)
264+
key = derive_key(new_password, kdf_params)
265+
266+
manifest["kdf"] = {
267+
"type": "PBKDF2-SHA512",
268+
"iterations": iterations,
269+
"salt_hex": new_salt.hex(),
270+
}
271+
272+
manifest_bytes = json.dumps(manifest, ensure_ascii=False).encode("utf-8")
273+
payload = manifest_bytes + PAYLOAD_SEPARATOR + data_part
274+
275+
nonce, ciphertext = encrypt_aes_gcm(key, payload, aad=MAGIC)
276+
277+
new_header = ContainerHeader(
278+
version=VERSION,
279+
salt=new_salt,
280+
iterations=iterations,
281+
nonce=nonce,
282+
)
283+
header_bytes = _build_header_bytes(new_header)
284+
285+
p = Path(container_path)
286+
with open(p, "wb") as out:
287+
out.write(header_bytes)
288+
out.write(ciphertext)

fsutil.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from pathlib import Path
2+
from dataclasses import dataclass
3+
from typing import List
4+
5+
@dataclass
6+
class FileEntry:
7+
rel_path: str
8+
abs_path: Path
9+
size: int
10+
mtime: float
11+
12+
def collect_entries(root: Path) -> List[FileEntry]:
13+
root = root.resolve()
14+
entries: List[FileEntry] = []
15+
16+
if not root.exists():
17+
return entries
18+
19+
if root.is_file():
20+
stat = root.stat()
21+
entries.append(
22+
FileEntry(
23+
rel_path=root.name,
24+
abs_path=root,
25+
size=stat.st_size,
26+
mtime=stat.st_mtime,
27+
)
28+
)
29+
return entries
30+
31+
for p in root.rglob("*"):
32+
if p.is_file():
33+
stat = p.stat()
34+
entries.append(
35+
FileEntry(
36+
rel_path=str(p.relative_to(root)),
37+
abs_path=p,
38+
size=stat.st_size,
39+
mtime=stat.st_mtime,
40+
)
41+
)
42+
return entries

0 commit comments

Comments
 (0)