-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathpackaging.py
More file actions
191 lines (157 loc) · 6.63 KB
/
packaging.py
File metadata and controls
191 lines (157 loc) · 6.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
"""PackagingService interface."""
import hashlib
from dataclasses import dataclass
from typing import Generic, Literal, Optional, Sequence, Tuple, Union
from pydid import DIDUrl, VerificationMethod
from didcomm_messaging.crypto import CryptoService, P, S, SecretsManager
from didcomm_messaging.crypto.jwe import JweEnvelope, b64url, from_b64url
from didcomm_messaging.resolver import DIDResolver
@dataclass
class PackedMessageMetadata(Generic[S]):
"""Unpack result."""
wrapper: JweEnvelope
method: Literal["ECDH-ES", "ECDH-1PU"]
recip_key: S
sender_kid: Optional[str]
class PackagingServiceError(Exception):
"""Represents an error from the DIDComm Messaging interface."""
class PackagingService(Generic[P, S]):
"""DIDComm Messaging interface."""
async def extract_packed_message_metadata( # noqa: C901
self, enc_message: Union[str, bytes], secrets: SecretsManager[S]
) -> PackedMessageMetadata:
"""Extract metadata from a packed DIDComm message."""
try:
wrapper = JweEnvelope.from_json(enc_message)
except ValueError:
raise PackagingServiceError("Invalid packed message")
alg = wrapper.protected.get("alg")
if not alg:
raise PackagingServiceError("Missing alg header")
method: Literal["ECDH-1PU", "ECDH-ES"] | None = next(
(m for m in ("ECDH-1PU", "ECDH-ES") if m in alg), None
)
if not method:
raise PackagingServiceError(
f"Unsupported DIDComm encryption algorithm: {alg}"
)
sender_kid = None
recip_key = None
for kid in wrapper.recipient_key_ids:
recip_key = await secrets.get_secret_by_kid(kid)
if recip_key:
break
if not recip_key:
raise PackagingServiceError("No recognized recipient key")
expected_apv = b64url(
hashlib.sha256((".".join(wrapper.recipient_key_ids)).encode()).digest()
)
apv = wrapper.protected.get("apv")
if not apv:
raise PackagingServiceError("Missing apv header")
if apv != expected_apv:
raise PackagingServiceError("Invalid apv value")
if method == "ECDH-1PU":
sender_kid_apu = None
apu = wrapper.protected.get("apu")
if not apu:
raise PackagingServiceError("Missing apu header")
try:
sender_kid_apu = from_b64url(apu).decode("utf-8")
except (UnicodeDecodeError, ValueError):
raise PackagingServiceError("Invalid apu value")
sender_kid = wrapper.protected.get("skid") or sender_kid_apu
if sender_kid != sender_kid_apu:
raise PackagingServiceError("Mismatch between skid and apu")
if not sender_kid:
raise PackagingServiceError("Sender key ID not provided")
return PackedMessageMetadata(wrapper, method, recip_key, sender_kid)
async def unpack(
self,
crypto: CryptoService[P, S],
resolver: DIDResolver,
secrets: SecretsManager[S],
enc_message: Union[str, bytes],
) -> Tuple[bytes, PackedMessageMetadata]:
"""Unpack a DIDComm message."""
metadata = await self.extract_packed_message_metadata(enc_message, secrets)
if metadata.method == "ECDH-ES":
return (
await crypto.ecdh_es_decrypt(enc_message, metadata.recip_key),
metadata,
)
if not metadata.sender_kid:
raise PackagingServiceError("Missing sender key ID")
sender_vm = await resolver.resolve_and_dereference_verification_method(
metadata.sender_kid
)
sender_key = crypto.verification_method_to_public_key(sender_vm)
return (
await crypto.ecdh_1pu_decrypt(enc_message, metadata.recip_key, sender_key),
metadata,
)
async def recip_for_kid_or_default_for_did(
self, crypto: CryptoService[P, S], resolver: DIDResolver, kid_or_did: str
) -> P:
"""Resolve a verification method for a kid or return default recip."""
if "#" in kid_or_did:
vm = await resolver.resolve_and_dereference_verification_method(kid_or_did)
else:
doc = await resolver.resolve_and_parse(kid_or_did)
if not doc.key_agreement:
raise PackagingServiceError(
"No key agreement methods found; cannot determine recipient"
)
default = doc.key_agreement[0]
if isinstance(default, DIDUrl):
vm = doc.dereference(default)
if not isinstance(vm, VerificationMethod):
raise PackagingServiceError(
f"Expected verification method, found: {type(vm)}"
)
else:
vm = default
return crypto.verification_method_to_public_key(vm)
async def default_sender_kid_for_did(self, resolver: DIDResolver, did: str) -> str:
"""Determine the kid of the default sender key for a DID."""
if "#" in did:
return did
doc = await resolver.resolve_and_parse(did)
if not doc.key_agreement:
raise PackagingServiceError(
"No key agreement methods found; cannot determine recipient"
)
default = doc.key_agreement[0]
if isinstance(default, DIDUrl):
vm = doc.dereference(default)
if not isinstance(vm, VerificationMethod):
raise PackagingServiceError(
f"Expected verification method, found: {type(vm)}"
)
else:
vm = default
if not vm.id.did:
return vm.id.as_absolute(vm.controller)
return vm.id
async def pack(
self,
crypto: CryptoService[P, S],
resolver: DIDResolver,
secrets: SecretsManager[S],
message: bytes,
to: Sequence[str],
frm: Optional[str] = None,
):
"""Pack a DIDComm message."""
recip_keys = [
await self.recip_for_kid_or_default_for_did(crypto, resolver, kid)
for kid in to
]
sender_kid = await self.default_sender_kid_for_did(resolver, frm) if frm else None
sender_key = await secrets.get_secret_by_kid(sender_kid) if sender_kid else None
if frm and not sender_key:
raise PackagingServiceError("No sender key found")
if sender_key:
return await crypto.ecdh_1pu_encrypt(recip_keys, sender_key, message)
else:
return await crypto.ecdh_es_encrypt(recip_keys, message)