diff --git a/src/elevenlabs/client.py b/src/elevenlabs/client.py index 45552a70..d204c52a 100644 --- a/src/elevenlabs/client.py +++ b/src/elevenlabs/client.py @@ -17,6 +17,7 @@ from .environment import ElevenLabsEnvironment from .realtime_tts import RealtimeTextToSpeechClient from .types import OutputFormat +from .webhooks import WebhooksClient DEFAULT_VOICE = Voice( @@ -111,17 +112,16 @@ def __init__( httpx_client: typing.Optional[httpx.Client] = None ): super().__init__( - environment=base_url - and ElevenLabsEnvironment( + environment=ElevenLabsEnvironment( base=f"https://{get_base_url_host(base_url)}", wss=f"wss://{get_base_url_host(base_url)}", - ) - or environment, + ) if base_url else environment, api_key=api_key, timeout=timeout, httpx_client=httpx_client ) self.text_to_speech = RealtimeTextToSpeechClient(client_wrapper=self._client_wrapper) + self.webhooks = WebhooksClient() @deprecated def clone( @@ -303,6 +303,28 @@ class AsyncElevenLabs(AsyncBaseElevenLabs): ) """ + def __init__( + self, + *, + base_url: typing.Optional[str] = None, + environment: ElevenLabsEnvironment = ElevenLabsEnvironment.PRODUCTION, + api_key: typing.Optional[str] = os.getenv("ELEVENLABS_API_KEY"), + timeout: typing.Optional[float] = 60, + httpx_client: typing.Optional[httpx.AsyncClient] = None + ): + super().__init__( + environment=base_url + and ElevenLabsEnvironment( + base=f"https://{get_base_url_host(base_url)}", + wss=f"wss://{get_base_url_host(base_url)}", + ) + or environment, + api_key=api_key, + timeout=timeout, + httpx_client=httpx_client + ) + self.webhooks = WebhooksClient() + @deprecated_async async def clone( self, diff --git a/src/elevenlabs/webhooks.py b/src/elevenlabs/webhooks.py new file mode 100644 index 00000000..dcc707c0 --- /dev/null +++ b/src/elevenlabs/webhooks.py @@ -0,0 +1,70 @@ +import json +import hmac +import hashlib +import time +from typing import Any, Dict +from .errors import BadRequestError + + +class WebhooksClient: + """ + A client to handle ElevenLabs webhook-related functionality + """ + + def construct_event(self, rawBody: str, sig_header: str, secret: str) -> Dict: + """ + Constructs a webhook event object from a payload and signature. + Verifies the webhook signature to ensure the event came from ElevenLabs. + + Args: + rawBody: The webhook request body. Must be the raw body, not a JSON object + sig_header: The signature header from the request + secret: Your webhook secret + + Returns: + The verified webhook event + + Raises: + BadRequestError: If the signature is invalid or missing + """ + + if not sig_header: + raise BadRequestError(body="Missing signature header") + + if not secret: + raise BadRequestError(body="Webhook secret not configured") + + headers = sig_header.split(',') + timestamp = None + signature = None + + for header in headers: + if header.startswith('t='): + timestamp = header[2:] + elif header.startswith('v0='): + signature = header + + if not timestamp or not signature: + raise BadRequestError(body="No signature hash found with expected scheme v0") + + # Validate timestamp + req_timestamp = int(timestamp) * 1000 + tolerance = int(time.time() * 1000) - 30 * 60 * 1000 + if req_timestamp < tolerance: + raise BadRequestError(body="Timestamp outside the tolerance zone") + + # Validate hash + message = f"{timestamp}.{rawBody}" + + digest = "v0=" + hmac.new( + secret.encode('utf-8'), + message.encode('utf-8'), + hashlib.sha256 + ).hexdigest() + + if signature != digest: + raise BadRequestError( + body="Signature hash does not match the expected signature hash for payload" + ) + + return json.loads(rawBody) diff --git a/tests/test_webhooks.py b/tests/test_webhooks.py new file mode 100644 index 00000000..7282fdd8 --- /dev/null +++ b/tests/test_webhooks.py @@ -0,0 +1,140 @@ +import json +import time +import hmac +import hashlib +from unittest import mock + +from elevenlabs.client import ElevenLabs +from elevenlabs.errors import BadRequestError + +import pytest + + +def test_construct_event_valid_signature(): + """Test webhook event construction with valid signature.""" + # Setup + client = ElevenLabs() + webhook_secret = "test_secret" + payload = {"event_type": "speech.completed", "id": "123456"} + + # Create a valid signature + body = json.dumps(payload) + timestamp = str(int(time.time())) + message = f"{timestamp}.{body}" + signature = "v0=" + hmac.new( + webhook_secret.encode('utf-8'), + message.encode('utf-8'), + hashlib.sha256 + ).hexdigest() + sig_header = f"t={timestamp},{signature}" + + # Verify event construction + event = client.webhooks.construct_event(body, sig_header, webhook_secret) + assert event == payload, "Event should match the original payload" + + +def test_construct_event_missing_signature(): + """Test webhook event construction with missing signature header.""" + client = ElevenLabs() + webhook_secret = "test_secret" + payload = {"event_type": "speech.completed", "id": "123456"} + + with pytest.raises(BadRequestError) as excinfo: + client.webhooks.construct_event(payload, "", webhook_secret) + + assert "Missing signature header" in str(excinfo.value) + + +def test_construct_event_invalid_signature_format(): + """Test webhook event construction with invalid signature format.""" + client = ElevenLabs() + webhook_secret = "test_secret" + payload = {"event_type": "speech.completed", "id": "123456"} + body = json.dumps(payload) + sig_header = "invalid_format" + + with pytest.raises(BadRequestError) as excinfo: + client.webhooks.construct_event(body, sig_header, webhook_secret) + + assert "No signature hash found with expected scheme v0" in str(excinfo.value) + + +def test_construct_event_expired_timestamp(): + """Test webhook event construction with expired timestamp.""" + client = ElevenLabs() + webhook_secret = "test_secret" + payload = {"event_type": "speech.completed", "id": "123456"} + + # Create an expired timestamp (31 minutes old) + expired_time = int(time.time()) - 31 * 60 + timestamp = str(expired_time) + + body = json.dumps(payload) + message = f"{timestamp}.{body}" + signature = "v0=" + hmac.new( + webhook_secret.encode('utf-8'), + message.encode('utf-8'), + hashlib.sha256 + ).hexdigest() + sig_header = f"t={timestamp},{signature}" + + with pytest.raises(BadRequestError) as excinfo: + client.webhooks.construct_event(body, sig_header, webhook_secret) + + assert "Timestamp outside the tolerance zone" in str(excinfo.value) + + +def test_construct_event_invalid_signature(): + """Test webhook event construction with invalid signature.""" + client = ElevenLabs() + webhook_secret = "test_secret" + payload = {"event_type": "speech.completed", "id": "123456"} + body = json.dumps(payload) + + timestamp = str(int(time.time())) + sig_header = f"t={timestamp},v0=invalid_signature" + + with pytest.raises(BadRequestError) as excinfo: + client.webhooks.construct_event(body, sig_header, webhook_secret) + + assert "Signature hash does not match" in str(excinfo.value) + + +def test_construct_event_missing_secret(): + """Test webhook event construction with missing secret.""" + client = ElevenLabs() + payload = {"event_type": "speech.completed", "id": "123456"} + body = json.dumps(payload) + + timestamp = str(int(time.time())) + sig_header = f"t={timestamp},v0=some_signature" + + with pytest.raises(BadRequestError) as excinfo: + client.webhooks.construct_event(body, sig_header, "") + + assert "Webhook secret not configured" in str(excinfo.value) + + +@mock.patch('time.time') +def test_construct_event_mocked_time(mock_time): + """Test webhook event construction with mocked time.""" + mock_time.return_value = 1600000000 + + client = ElevenLabs() + webhook_secret = "test_secret" + payload = {"event_type": "speech.completed", "id": "123456"} + + # Create a valid signature with fixed timestamp + body = json.dumps(payload) + timestamp = "1600000000" + message = f"{timestamp}.{body}" + signature = "v0=" + hmac.new( + webhook_secret.encode('utf-8'), + message.encode('utf-8'), + hashlib.sha256 + ).hexdigest() + sig_header = f"t={timestamp},{signature}" + + # Verify event construction + event = client.webhooks.construct_event(body, sig_header, webhook_secret) + assert event == payload, "Event should match the original payload"