Skip to content

Commit 8e36536

Browse files
committed
feat(example): add authlib file-based secrets example
Add an example demonstrating use of FileBasedSecretsManager with the authlib crypto backend. Shows key generation, did:jwk creation, pack/unpack round-trip, and re-loading secrets from file. Also adds pytest collector to run example scripts as tests. Signed-off-by: Daniel Bluhm <dbluhm@pm.me>
1 parent a88f016 commit 8e36536

2 files changed

Lines changed: 163 additions & 0 deletions

File tree

examples/authlib_file_secrets.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
"""Example of using authlib crypto with file-based secrets storage."""
2+
3+
import asyncio
4+
import json
5+
import tempfile
6+
from pathlib import Path
7+
8+
from authlib.jose import OKPKey
9+
10+
from didcomm_messaging.crypto.backend.authlib import (
11+
AuthlibCryptoService,
12+
AuthlibSecretKey,
13+
)
14+
from didcomm_messaging.crypto.backend.basic import FileBasedSecretsManager
15+
from didcomm_messaging.multiformats.multibase import Base64UrlEncoder
16+
from didcomm_messaging.packaging import PackagingService
17+
from didcomm_messaging.resolver import PrefixResolver
18+
from didcomm_messaging.resolver.jwk import JWKResolver
19+
20+
b64 = Base64UrlEncoder()
21+
22+
23+
def create_jwk_did(jwk: dict) -> str:
24+
"""Create a did:jwk from a JWK dict."""
25+
encoded = b64.encode(json.dumps(jwk).encode())
26+
return f"did:jwk:{encoded}"
27+
28+
29+
async def main():
30+
"""Run the example."""
31+
# Create a temporary file for secrets storage
32+
secrets_file = tempfile.NamedTemporaryFile(suffix=".jsonl", delete=False)
33+
secrets_path = secrets_file.name
34+
secrets_file.close()
35+
36+
# Serializer: Convert AuthlibSecretKey to JWK dict
37+
def serialize_secret(secret: AuthlibSecretKey) -> dict:
38+
return secret.key.as_dict(is_private=True)
39+
40+
# Deserializer: Convert JWK dict back to AuthlibSecretKey
41+
def deserialize_secret(kid: str, data: dict) -> AuthlibSecretKey:
42+
key = OKPKey.import_key(data)
43+
return AuthlibSecretKey(key, kid)
44+
45+
# Create the file-based secrets manager
46+
secrets = FileBasedSecretsManager(secrets_path, serialize_secret, deserialize_secret)
47+
48+
# Generate keys for sender and recipient
49+
# Using X25519 for both since it supports key agreement (encryption)
50+
sender_sk = OKPKey.generate_key("X25519", is_private=True)
51+
recipient_sk = OKPKey.generate_key("X25519", is_private=True)
52+
53+
# Get JWKs and create DIDs
54+
# For 1PU (authenticated encryption), we need key agreement keys
55+
sender_jwk = {**sender_sk.as_dict(), "use": "enc"}
56+
recipient_jwk = {**recipient_sk.as_dict(), "use": "enc"}
57+
58+
sender_did = create_jwk_did(sender_jwk)
59+
recipient_did = create_jwk_did(recipient_jwk)
60+
61+
# Add keys to secrets manager with proper kids
62+
sender_secret = AuthlibSecretKey(sender_sk, f"{sender_did}#0")
63+
recipient_secret = AuthlibSecretKey(recipient_sk, f"{recipient_did}#0")
64+
65+
await secrets.add_secret(sender_secret)
66+
await secrets.add_secret(recipient_secret)
67+
68+
# Set up crypto and resolver
69+
crypto = AuthlibCryptoService()
70+
resolver = PrefixResolver({"did:jwk": JWKResolver()})
71+
packer = PackagingService()
72+
73+
message = b"Hello, secure world!"
74+
75+
# Pack the message using authenticated encryption (ECDH-1PU)
76+
# Requires both sender and recipient to have key agreement keys
77+
packed = await packer.pack(
78+
crypto=crypto,
79+
resolver=resolver,
80+
secrets=secrets,
81+
message=message,
82+
to=[recipient_did],
83+
frm=sender_did,
84+
)
85+
print("Packed message:")
86+
print(json.dumps(json.loads(packed), indent=2))
87+
88+
# Flush secrets to file
89+
await secrets.flush()
90+
91+
# Show the contents of the secrets file
92+
print("\nSecrets file contents:")
93+
with open(secrets_path) as f:
94+
for line in f:
95+
print(line.strip())
96+
97+
# Create a new secrets manager that loads from the file
98+
# This exercises the deserializer
99+
print("\n--- Creating new secrets manager from file ---")
100+
secrets2 = FileBasedSecretsManager(secrets_path, serialize_secret, deserialize_secret)
101+
102+
# Unpack the message using the newly loaded secrets
103+
plaintext, metadata = await packer.unpack(
104+
crypto=crypto,
105+
resolver=resolver,
106+
secrets=secrets2,
107+
enc_message=packed,
108+
)
109+
print("\nUnpacked message:")
110+
print(plaintext)
111+
112+
# Verify the message matches
113+
assert plaintext == message
114+
print("\nSuccess! Round-trip completed with deserialized secrets.")
115+
116+
# Clean up
117+
Path(secrets_path).unlink()
118+
119+
120+
if __name__ == "__main__":
121+
asyncio.run(main())

examples/conftest.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import subprocess
2+
from pathlib import Path
3+
4+
import pytest
5+
6+
7+
def pytest_collect_file(parent, file_path: Path):
8+
"""Collect Python files in examples/ as test items."""
9+
if file_path.suffix == ".py" and file_path.name not in ("conftest.py",):
10+
return ExampleFile.from_parent(parent, path=file_path)
11+
12+
13+
class ExampleFile(pytest.File):
14+
"""pytest collector for example scripts."""
15+
16+
def collect(self):
17+
yield ExampleItem.from_parent(self, name=self.path.stem)
18+
19+
20+
class ExampleItem(pytest.Item):
21+
"""pytest item that runs an example script."""
22+
23+
def runtest(self):
24+
result = subprocess.run(
25+
["python", str(self.path)],
26+
capture_output=True,
27+
text=True,
28+
cwd=self.path.parent,
29+
)
30+
if result.returncode != 0:
31+
raise ExampleFailedError(
32+
f"Example {self.name} failed with code {result.returncode}\n"
33+
f"stdout: {result.stdout}\n"
34+
f"stderr: {result.stderr}"
35+
)
36+
37+
def reportinfo(self):
38+
return self.path, 0, f"Example: {self.path.name}"
39+
40+
41+
class ExampleFailedError(Exception):
42+
"""Raised when an example script fails to run."""

0 commit comments

Comments
 (0)