From cdab89782f2d578f2d4d7e0d78bab10ccdcb1a13 Mon Sep 17 00:00:00 2001 From: Lukas Pastuszek Date: Thu, 5 Mar 2026 15:49:32 +0100 Subject: [PATCH 1/2] Fix finding body start --- src/elevenlabs/music_custom.py | 35 +++++++++++++++------------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/src/elevenlabs/music_custom.py b/src/elevenlabs/music_custom.py index 4897aeb5..09fc9e17 100644 --- a/src/elevenlabs/music_custom.py +++ b/src/elevenlabs/music_custom.py @@ -16,6 +16,19 @@ logger = logging.getLogger(__name__) +def _find_audio_start(data: bytes, start: int) -> int: + """Find the blank line separating headers from body, returning the index of the body start.""" + crlf_pos = data.find(b"\r\n\r\n", start) + if crlf_pos != -1: + return crlf_pos + 4 + + lf_pos = data.find(b"\n\n", start) + if lf_pos != -1: + return lf_pos + 2 + + raise ValueError("Could not find header/body separator in audio part") + + @dataclass class SongMetadata: title: str @@ -145,16 +158,7 @@ def _parse_multipart(self, stream: typing.Iterator[bytes]) -> MultipartResponse: raise ValueError('Could not find audio part boundary') # Find the start of audio data (after headers and empty line) - audio_start = second_boundary + len(boundary_bytes) - - # Skip past the headers to find the empty line (\n\n) - while audio_start < len(response_bytes) - 1: - if (response_bytes[audio_start] == 0x0A and - response_bytes[audio_start + 1] == 0x0A): - # Found \n\n - audio starts after this - audio_start += 2 - break - audio_start += 1 + audio_start = _find_audio_start(response_bytes, second_boundary + len(boundary_bytes)) # Audio goes until the end (or until we find another boundary) audio_buffer = response_bytes[audio_start:] @@ -281,16 +285,7 @@ async def _parse_multipart_async(self, stream: typing.AsyncIterator[bytes]) -> M raise ValueError('Could not find audio part boundary') # Find the start of audio data (after headers and empty line) - audio_start = second_boundary + len(boundary_bytes) - - # Skip past the headers to find the empty line (\n\n) - while audio_start < len(response_bytes) - 1: - if (response_bytes[audio_start] == 0x0A and - response_bytes[audio_start + 1] == 0x0A): - # Found \n\n - audio starts after this - audio_start += 2 - break - audio_start += 1 + audio_start = _find_audio_start(response_bytes, second_boundary + len(boundary_bytes)) # Audio goes until the end (or until we find another boundary) audio_buffer = response_bytes[audio_start:] From 1bfe28a66f9d63f5e22d2f2f2e8a1059f74c3b20 Mon Sep 17 00:00:00 2001 From: Lukas Pastuszek Date: Fri, 6 Mar 2026 10:22:08 +0100 Subject: [PATCH 2/2] Add tests --- tests/test_music_multipart.py | 81 +++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 tests/test_music_multipart.py diff --git a/tests/test_music_multipart.py b/tests/test_music_multipart.py new file mode 100644 index 00000000..85a2cc64 --- /dev/null +++ b/tests/test_music_multipart.py @@ -0,0 +1,81 @@ +import json + +import pytest + +from elevenlabs.music_custom import MusicClient, _find_audio_start + +SAMPLE_JSON = {"compositionPlan": {"sections": []}, "songMetadata": {"title": "Test"}} +SAMPLE_AUDIO = bytes(range(256)) * 4 + + +def _build_multipart(line_ending: bytes) -> bytes: + """Build a realistic multipart response with the given line ending.""" + boundary = b"--boundary123" + nl = line_ending + + parts = [] + parts.append(boundary + nl) + parts.append(b"Content-Type: application/json" + nl) + parts.append(nl) + parts.append(json.dumps(SAMPLE_JSON).encode("utf-8") + nl) + parts.append(boundary + nl) + parts.append(b"Content-Type: audio/mpeg" + nl) + parts.append(b'Content-Disposition: attachment; filename="test_song.mp3"' + nl) + parts.append(nl) + parts.append(SAMPLE_AUDIO) + parts.append(nl + boundary + b"--" + nl) + + return b"".join(parts) + + +class TestFindAudioStart: + def test_crlf(self): + data = b"Header: value\r\n\r\naudio data here" + assert _find_audio_start(data, 0) == data.index(b"audio") + + def test_lf(self): + data = b"Header: value\n\naudio data here" + assert _find_audio_start(data, 0) == data.index(b"audio") + + def test_crlf_preferred_over_lf(self): + data = b"Header: value\r\n\r\naudio data here" + pos = _find_audio_start(data, 0) + assert data[pos : pos + 5] == b"audio" + + def test_raises_when_no_separator(self): + with pytest.raises(ValueError, match="header/body separator"): + _find_audio_start(b"no blank line here", 0) + + def test_start_offset_respected(self): + data = b"skip\n\nHeader: value\r\n\r\naudio" + pos = _find_audio_start(data, 6) + assert data[pos : pos + 5] == b"audio" + + +class TestParseMultipart: + @staticmethod + def _parse(data: bytes): + return MusicClient._parse_multipart(None, iter([data])) # type: ignore[arg-type] + + def test_crlf_line_endings(self): + data = _build_multipart(b"\r\n") + result = self._parse(data) + assert result.audio.startswith(SAMPLE_AUDIO) + assert result.json == SAMPLE_JSON + assert result.filename == "test_song.mp3" + + def test_lf_line_endings(self): + data = _build_multipart(b"\n") + result = self._parse(data) + assert result.audio.startswith(SAMPLE_AUDIO) + assert result.json == SAMPLE_JSON + assert result.filename == "test_song.mp3" + + def test_audio_bytes_not_corrupted(self): + for nl in [b"\r\n", b"\n"]: + data = _build_multipart(nl) + result = self._parse(data) + assert result.audio[:4] == SAMPLE_AUDIO[:4], ( + f"First 4 bytes mismatch with {nl!r} line endings — possible byte offset error" + ) + assert result.audio[: len(SAMPLE_AUDIO)] == SAMPLE_AUDIO