Skip to content

Commit d446827

Browse files
committed
wip
1 parent ba86f48 commit d446827

2 files changed

Lines changed: 209 additions & 0 deletions

File tree

src/elevenlabs/webhooks.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import json
2+
import hmac
3+
import hashlib
4+
import time
5+
from typing import Any, Dict
6+
7+
from .errors import BadRequestError
8+
9+
10+
class WebhooksClient:
11+
"""
12+
A client to handle ElevenLabs webhook-related functionality
13+
"""
14+
15+
def construct_event(self, payload: Any, sig_header: str, secret: str) -> Dict:
16+
"""
17+
Constructs a webhook event object from a payload and signature.
18+
Verifies the webhook signature to ensure the event came from ElevenLabs.
19+
20+
Args:
21+
payload: The webhook payload (request body)
22+
sig_header: The signature header from the request
23+
secret: Your webhook secret
24+
25+
Returns:
26+
The verified webhook event
27+
28+
Raises:
29+
BadRequestError: If the signature is invalid or missing
30+
"""
31+
body = json.dumps(payload)
32+
33+
if not sig_header:
34+
raise BadRequestError(body="Missing signature header")
35+
36+
headers = sig_header.split(',')
37+
timestamp = None
38+
signature = None
39+
40+
for header in headers:
41+
if header.startswith('t='):
42+
timestamp = header[2:]
43+
elif header.startswith('v0='):
44+
signature = header
45+
46+
if not timestamp or not signature:
47+
raise BadRequestError(body="No signature hash found with expected scheme v0")
48+
49+
# Validate timestamp
50+
req_timestamp = int(timestamp) * 1000
51+
tolerance = int(time.time() * 1000) - 30 * 60 * 1000
52+
if req_timestamp < tolerance:
53+
raise BadRequestError(body="Timestamp outside the tolerance zone")
54+
55+
# Validate hash
56+
message = f"{timestamp}.{body}"
57+
58+
if not secret:
59+
raise BadRequestError(body="Webhook secret not configured")
60+
61+
digest = "v0=" + hmac.new(
62+
secret.encode('utf-8'),
63+
message.encode('utf-8'),
64+
hashlib.sha256
65+
).hexdigest()
66+
67+
if signature != digest:
68+
raise BadRequestError(
69+
body="Signature hash does not match the expected signature hash for payload"
70+
)
71+
72+
return json.loads(body)

tests/test_webhooks.py

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import json
2+
import time
3+
import hmac
4+
import hashlib
5+
from unittest import mock
6+
7+
from elevenlabs.client import ElevenLabs
8+
from elevenlabs.errors import BadRequestError
9+
10+
import pytest
11+
12+
13+
def test_construct_event_valid_signature():
14+
"""Test webhook event construction with valid signature."""
15+
# Setup
16+
client = ElevenLabs()
17+
webhook_secret = "test_secret"
18+
payload = {"event_type": "speech.completed", "id": "123456"}
19+
20+
# Create a valid signature
21+
body = json.dumps(payload)
22+
timestamp = str(int(time.time()))
23+
message = f"{timestamp}.{body}"
24+
signature = "v0=" + hmac.new(
25+
webhook_secret.encode('utf-8'),
26+
message.encode('utf-8'),
27+
hashlib.sha256
28+
).hexdigest()
29+
sig_header = f"t={timestamp},{signature}"
30+
31+
# Verify event construction
32+
event = client.webhooks.construct_event(payload, sig_header, webhook_secret)
33+
assert event == payload, "Event should match the original payload"
34+
35+
36+
def test_construct_event_missing_signature():
37+
"""Test webhook event construction with missing signature header."""
38+
client = ElevenLabs()
39+
webhook_secret = "test_secret"
40+
payload = {"event_type": "speech.completed", "id": "123456"}
41+
42+
with pytest.raises(BadRequestError) as excinfo:
43+
client.webhooks.construct_event(payload, "", webhook_secret)
44+
45+
assert "Missing signature header" in str(excinfo.value)
46+
47+
48+
def test_construct_event_invalid_signature_format():
49+
"""Test webhook event construction with invalid signature format."""
50+
client = ElevenLabs()
51+
webhook_secret = "test_secret"
52+
payload = {"event_type": "speech.completed", "id": "123456"}
53+
sig_header = "invalid_format"
54+
55+
with pytest.raises(BadRequestError) as excinfo:
56+
client.webhooks.construct_event(payload, sig_header, webhook_secret)
57+
58+
assert "No signature hash found with expected scheme v0" in str(excinfo.value)
59+
60+
61+
def test_construct_event_expired_timestamp():
62+
"""Test webhook event construction with expired timestamp."""
63+
client = ElevenLabs()
64+
webhook_secret = "test_secret"
65+
payload = {"event_type": "speech.completed", "id": "123456"}
66+
67+
# Create an expired timestamp (31 minutes old)
68+
expired_time = int(time.time()) - 31 * 60
69+
timestamp = str(expired_time)
70+
71+
body = json.dumps(payload)
72+
message = f"{timestamp}.{body}"
73+
signature = "v0=" + hmac.new(
74+
webhook_secret.encode('utf-8'),
75+
message.encode('utf-8'),
76+
hashlib.sha256
77+
).hexdigest()
78+
sig_header = f"t={timestamp},{signature}"
79+
80+
with pytest.raises(BadRequestError) as excinfo:
81+
client.webhooks.construct_event(payload, sig_header, webhook_secret)
82+
83+
assert "Timestamp outside the tolerance zone" in str(excinfo.value)
84+
85+
86+
def test_construct_event_invalid_signature():
87+
"""Test webhook event construction with invalid signature."""
88+
client = ElevenLabs()
89+
webhook_secret = "test_secret"
90+
payload = {"event_type": "speech.completed", "id": "123456"}
91+
92+
timestamp = str(int(time.time()))
93+
sig_header = f"t={timestamp},v0=invalid_signature"
94+
95+
with pytest.raises(BadRequestError) as excinfo:
96+
client.webhooks.construct_event(payload, sig_header, webhook_secret)
97+
98+
assert "Signature hash does not match" in str(excinfo.value)
99+
100+
101+
def test_construct_event_missing_secret():
102+
"""Test webhook event construction with missing secret."""
103+
client = ElevenLabs()
104+
payload = {"event_type": "speech.completed", "id": "123456"}
105+
106+
timestamp = str(int(time.time()))
107+
sig_header = f"t={timestamp},v0=some_signature"
108+
109+
with pytest.raises(BadRequestError) as excinfo:
110+
client.webhooks.construct_event(payload, sig_header, "")
111+
112+
assert "Webhook secret not configured" in str(excinfo.value)
113+
114+
115+
@mock.patch('time.time')
116+
def test_construct_event_mocked_time(mock_time):
117+
"""Test webhook event construction with mocked time."""
118+
mock_time.return_value = 1600000000
119+
120+
client = ElevenLabs()
121+
webhook_secret = "test_secret"
122+
payload = {"event_type": "speech.completed", "id": "123456"}
123+
124+
# Create a valid signature with fixed timestamp
125+
body = json.dumps(payload)
126+
timestamp = "1600000000"
127+
message = f"{timestamp}.{body}"
128+
signature = "v0=" + hmac.new(
129+
webhook_secret.encode('utf-8'),
130+
message.encode('utf-8'),
131+
hashlib.sha256
132+
).hexdigest()
133+
sig_header = f"t={timestamp},{signature}"
134+
135+
# Verify event construction
136+
event = client.webhooks.construct_event(payload, sig_header, webhook_secret)
137+
assert event == payload, "Event should match the original payload"

0 commit comments

Comments
 (0)