Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions src/elevenlabs/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .environment import ElevenLabsEnvironment
from .realtime_tts import RealtimeTextToSpeechClient
from .types import OutputFormat
from .webhooks import WebhooksClient


DEFAULT_VOICE = Voice(
Expand Down Expand Up @@ -111,17 +112,16 @@ def __init__(
httpx_client: typing.Optional[httpx.Client] = None
):
super().__init__(
environment=base_url
and ElevenLabsEnvironment(
environment=ElevenLabsEnvironment(
base=f"https://{get_base_url_host(base_url)}",
wss=f"wss://{get_base_url_host(base_url)}",
)
or environment,
) if base_url else environment,
api_key=api_key,
timeout=timeout,
httpx_client=httpx_client
)
self.text_to_speech = RealtimeTextToSpeechClient(client_wrapper=self._client_wrapper)
self.webhooks = WebhooksClient()

@deprecated
def clone(
Expand Down Expand Up @@ -303,6 +303,28 @@ class AsyncElevenLabs(AsyncBaseElevenLabs):
)
"""

def __init__(
self,
*,
base_url: typing.Optional[str] = None,
environment: ElevenLabsEnvironment = ElevenLabsEnvironment.PRODUCTION,
api_key: typing.Optional[str] = os.getenv("ELEVENLABS_API_KEY"),
timeout: typing.Optional[float] = 60,
httpx_client: typing.Optional[httpx.AsyncClient] = None
):
super().__init__(
environment=base_url
and ElevenLabsEnvironment(
base=f"https://{get_base_url_host(base_url)}",
wss=f"wss://{get_base_url_host(base_url)}",
)
or environment,
api_key=api_key,
Comment thread
PaulAsjes marked this conversation as resolved.
timeout=timeout,
httpx_client=httpx_client
)
self.webhooks = WebhooksClient()

@deprecated_async
async def clone(
self,
Expand Down
70 changes: 70 additions & 0 deletions src/elevenlabs/webhooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import json
import hmac
import hashlib
import time
from typing import Any, Dict
from .errors import BadRequestError


class WebhooksClient:
"""
A client to handle ElevenLabs webhook-related functionality
"""

def construct_event(self, rawBody: str, sig_header: str, secret: str) -> Dict:
"""
Constructs a webhook event object from a payload and signature.
Verifies the webhook signature to ensure the event came from ElevenLabs.

Args:
rawBody: The webhook request body. Must be the raw body, not a JSON object
sig_header: The signature header from the request
secret: Your webhook secret

Returns:
The verified webhook event

Raises:
BadRequestError: If the signature is invalid or missing
"""

if not sig_header:
raise BadRequestError(body="Missing signature header")

if not secret:
raise BadRequestError(body="Webhook secret not configured")

headers = sig_header.split(',')
timestamp = None
signature = None

for header in headers:
if header.startswith('t='):
timestamp = header[2:]
elif header.startswith('v0='):
signature = header

if not timestamp or not signature:
raise BadRequestError(body="No signature hash found with expected scheme v0")

# Validate timestamp
req_timestamp = int(timestamp) * 1000
tolerance = int(time.time() * 1000) - 30 * 60 * 1000
if req_timestamp < tolerance:
raise BadRequestError(body="Timestamp outside the tolerance zone")

# Validate hash
message = f"{timestamp}.{rawBody}"

digest = "v0=" + hmac.new(
secret.encode('utf-8'),
message.encode('utf-8'),
hashlib.sha256
).hexdigest()

if signature != digest:
raise BadRequestError(
body="Signature hash does not match the expected signature hash for payload"
)

return json.loads(rawBody)
140 changes: 140 additions & 0 deletions tests/test_webhooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import json
import time
import hmac
import hashlib
from unittest import mock

from elevenlabs.client import ElevenLabs
from elevenlabs.errors import BadRequestError

import pytest


def test_construct_event_valid_signature():
"""Test webhook event construction with valid signature."""
# Setup
client = ElevenLabs()
webhook_secret = "test_secret"
payload = {"event_type": "speech.completed", "id": "123456"}

# Create a valid signature
body = json.dumps(payload)
timestamp = str(int(time.time()))
message = f"{timestamp}.{body}"
signature = "v0=" + hmac.new(
webhook_secret.encode('utf-8'),
message.encode('utf-8'),
hashlib.sha256
).hexdigest()
sig_header = f"t={timestamp},{signature}"

# Verify event construction
event = client.webhooks.construct_event(body, sig_header, webhook_secret)
assert event == payload, "Event should match the original payload"


def test_construct_event_missing_signature():
"""Test webhook event construction with missing signature header."""
client = ElevenLabs()
webhook_secret = "test_secret"
payload = {"event_type": "speech.completed", "id": "123456"}

with pytest.raises(BadRequestError) as excinfo:
client.webhooks.construct_event(payload, "", webhook_secret)

assert "Missing signature header" in str(excinfo.value)


def test_construct_event_invalid_signature_format():
"""Test webhook event construction with invalid signature format."""
client = ElevenLabs()
webhook_secret = "test_secret"
payload = {"event_type": "speech.completed", "id": "123456"}
body = json.dumps(payload)
sig_header = "invalid_format"

with pytest.raises(BadRequestError) as excinfo:
client.webhooks.construct_event(body, sig_header, webhook_secret)

assert "No signature hash found with expected scheme v0" in str(excinfo.value)


def test_construct_event_expired_timestamp():
"""Test webhook event construction with expired timestamp."""
client = ElevenLabs()
webhook_secret = "test_secret"
payload = {"event_type": "speech.completed", "id": "123456"}

# Create an expired timestamp (31 minutes old)
expired_time = int(time.time()) - 31 * 60
timestamp = str(expired_time)

body = json.dumps(payload)
message = f"{timestamp}.{body}"
signature = "v0=" + hmac.new(
webhook_secret.encode('utf-8'),
message.encode('utf-8'),
hashlib.sha256
).hexdigest()
sig_header = f"t={timestamp},{signature}"

with pytest.raises(BadRequestError) as excinfo:
client.webhooks.construct_event(body, sig_header, webhook_secret)

assert "Timestamp outside the tolerance zone" in str(excinfo.value)


def test_construct_event_invalid_signature():
"""Test webhook event construction with invalid signature."""
client = ElevenLabs()
webhook_secret = "test_secret"
payload = {"event_type": "speech.completed", "id": "123456"}
body = json.dumps(payload)

timestamp = str(int(time.time()))
sig_header = f"t={timestamp},v0=invalid_signature"

with pytest.raises(BadRequestError) as excinfo:
client.webhooks.construct_event(body, sig_header, webhook_secret)

assert "Signature hash does not match" in str(excinfo.value)


def test_construct_event_missing_secret():
"""Test webhook event construction with missing secret."""
client = ElevenLabs()
payload = {"event_type": "speech.completed", "id": "123456"}
body = json.dumps(payload)

timestamp = str(int(time.time()))
sig_header = f"t={timestamp},v0=some_signature"

with pytest.raises(BadRequestError) as excinfo:
client.webhooks.construct_event(body, sig_header, "")

assert "Webhook secret not configured" in str(excinfo.value)


@mock.patch('time.time')
def test_construct_event_mocked_time(mock_time):
"""Test webhook event construction with mocked time."""
mock_time.return_value = 1600000000

client = ElevenLabs()
webhook_secret = "test_secret"
payload = {"event_type": "speech.completed", "id": "123456"}

# Create a valid signature with fixed timestamp
body = json.dumps(payload)
timestamp = "1600000000"
message = f"{timestamp}.{body}"
signature = "v0=" + hmac.new(
webhook_secret.encode('utf-8'),
message.encode('utf-8'),
hashlib.sha256
).hexdigest()
sig_header = f"t={timestamp},{signature}"

# Verify event construction
event = client.webhooks.construct_event(body, sig_header, webhook_secret)
assert event == payload, "Event should match the original payload"