Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 15 additions & 20 deletions src/elevenlabs/music_custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@
logger = logging.getLogger(__name__)


def _find_audio_start(data: bytes, start: int) -> int:
"""Find the blank line separating headers from body, returning the index of the body start."""
crlf_pos = data.find(b"\r\n\r\n", start)
if crlf_pos != -1:
return crlf_pos + 4

lf_pos = data.find(b"\n\n", start)
if lf_pos != -1:
return lf_pos + 2

raise ValueError("Could not find header/body separator in audio part")


@dataclass
class SongMetadata:
title: str
Expand Down Expand Up @@ -145,16 +158,7 @@ def _parse_multipart(self, stream: typing.Iterator[bytes]) -> MultipartResponse:
raise ValueError('Could not find audio part boundary')

# Find the start of audio data (after headers and empty line)
audio_start = second_boundary + len(boundary_bytes)

# Skip past the headers to find the empty line (\n\n)
while audio_start < len(response_bytes) - 1:
if (response_bytes[audio_start] == 0x0A and
response_bytes[audio_start + 1] == 0x0A):
# Found \n\n - audio starts after this
audio_start += 2
break
audio_start += 1
audio_start = _find_audio_start(response_bytes, second_boundary + len(boundary_bytes))

# Audio goes until the end (or until we find another boundary)
audio_buffer = response_bytes[audio_start:]
Expand Down Expand Up @@ -281,16 +285,7 @@ async def _parse_multipart_async(self, stream: typing.AsyncIterator[bytes]) -> M
raise ValueError('Could not find audio part boundary')

# Find the start of audio data (after headers and empty line)
audio_start = second_boundary + len(boundary_bytes)

# Skip past the headers to find the empty line (\n\n)
while audio_start < len(response_bytes) - 1:
if (response_bytes[audio_start] == 0x0A and
response_bytes[audio_start + 1] == 0x0A):
# Found \n\n - audio starts after this
audio_start += 2
break
audio_start += 1
audio_start = _find_audio_start(response_bytes, second_boundary + len(boundary_bytes))

# Audio goes until the end (or until we find another boundary)
audio_buffer = response_bytes[audio_start:]
Expand Down