diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index d4a37f3e63..634bc3cfd9 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -3,6 +3,7 @@ # Specific ownerships: /beets/metadata_plugins.py @semohr +/beetsplug/tidal/* @semohr /beetsplug/titlecase.py @henry-oberholtzer diff --git a/beets/util/id_extractors.py b/beets/util/id_extractors.py index f66f1690f6..6f758a5ada 100644 --- a/beets/util/id_extractors.py +++ b/beets/util/id_extractors.py @@ -43,7 +43,7 @@ # - https://github.com/snejus/beetcamp. Bandcamp album URLs usually look # like: https://nameofartist.bandcamp.com/album/nameofalbum "bandcamp": re.compile(r"(.+)"), - "tidal": re.compile(r"([^/]+)$"), + "tidal": re.compile(r"(?:^|tidal\.com/(?:browse/)?(?:album|track)/)(\d+)"), } diff --git a/beetsplug/_utils/requests.py b/beetsplug/_utils/requests.py index 61a9992be5..21fa375d36 100644 --- a/beetsplug/_utils/requests.py +++ b/beetsplug/_utils/requests.py @@ -2,6 +2,7 @@ import atexit import threading +import time from contextlib import contextmanager from functools import cached_property from http import HTTPStatus @@ -103,6 +104,38 @@ def request(self, *args, **kwargs): return r +class RateLimitAdapter(HTTPAdapter): + """HTTPAdapter that enforces minimum interval between requests. + + Prevents server overload and 429 errors by sleeping when requests + come too fast. Thread-safe via lock. + + Attributes: + rate_limit: Minimum seconds between requests. Default 0.25 (4/sec). + + Override `_wait_time()` for custom strategies (token bucket, burst, etc.). + """ + + def __init__(self, rate_limit: float = 0.25, **kwargs): + super().__init__(**kwargs) + self.rate_limit = rate_limit + self._last_request_time = 0.0 + self._lock = threading.Lock() + + def _wait_time(self, elapsed: float) -> float: + """Return seconds to wait. Override for custom rate limiting.""" + return max(0, self.rate_limit - elapsed) + + def send(self, request: requests.PreparedRequest, *args, **kwargs): + with self._lock: + elapsed = time.monotonic() - self._last_request_time + wait = self._wait_time(elapsed) + if wait > 0: + time.sleep(wait) + self._last_request_time = time.monotonic() + return super().send(request, *args, **kwargs) + + class RequestHandler: """Manages HTTP requests with custom error handling and session management. diff --git a/beetsplug/tidal/__init__.py b/beetsplug/tidal/__init__.py new file mode 100644 index 0000000000..1ce754a622 --- /dev/null +++ b/beetsplug/tidal/__init__.py @@ -0,0 +1,500 @@ +from __future__ import annotations + +import itertools +import os +import re +from functools import cached_property +from typing import TYPE_CHECKING, overload + +import confuse + +from beets import ui +from beets.autotag.hooks import AlbumInfo, TrackInfo +from beets.logging import getLogger +from beets.metadata_plugins import MetadataSourcePlugin + +from .api import TidalAPI + +if TYPE_CHECKING: + import optparse + from collections.abc import Iterable, Sequence + + from beets.library.models import Item, Library + + from .api_types import ( + AlbumAttributes, + ResourceIdentifier, + TidalAlbum, + TidalArtist, + TidalTrack, + TrackAttributes, + ) + + +log = getLogger("beets.tidal") + + +class TidalPlugin(MetadataSourcePlugin): + def __init__(self) -> None: + super().__init__() + + self.config.add( + { + "client_id": "mcjmpl1bPATJXcBT", + "tokenfile": "tidal_token.json", + } + ) + self.config["client_id"].redact = True + + # We need to be authenticated if plugin is used to fetch metadata + # otherwise the import cannot run. + self.register_listener("import_begin", self.require_authentication) + + @cached_property + def api(self) -> TidalAPI: + return TidalAPI( + client_id=self.config["client_id"].as_str(), + token_path=self._tokenfile(), + ) + + def _tokenfile(self) -> str: + """Return the configured path to the token file in the app directory.""" + return self.config["tokenfile"].get(confuse.Filename(in_app_dir=True)) + + def require_authentication(self): + if not os.path.isfile(self._tokenfile()): + raise ui.UserError( + "Please login to TIDAL" + " using `beet tidal --auth` or disable tidal plugin" + ) + + def commands(self) -> list[ui.Subcommand]: + tidal_cmd = ui.Subcommand( + "tidal", help="Tidal metadata plugin commands" + ) + tidal_cmd.parser.add_option( + "-a", + "--auth", + action="store_true", + help="Authenticate and login to Tidal", + default=False, + ) + + def func(lib: Library, opts: optparse.Values, args: list[str]): + if opts.auth: + self.api.ui_authenticate_flow() + else: + tidal_cmd.print_help() + + tidal_cmd.func = func + + return [tidal_cmd] + + def album_for_id(self, album_id: str) -> AlbumInfo | None: + if not (tidal_id := self._extract_id(album_id)): + return None + + if album := list(self.search_albums_by_ids(tidal_ids=[tidal_id])): + return album[0] + + log.warning("Could not find album:{0}", tidal_id) + return None + + def albums_for_ids(self, ids: Iterable[str]) -> Iterable[AlbumInfo | None]: + yield from self.search_albums_by_ids(ids=ids) + + def track_for_id(self, track_id: str) -> TrackInfo | None: + if not (tidal_id := self._extract_id(track_id)): + return None + + if track := list(self.search_tracks_by_ids(tidal_ids=[tidal_id])): + return track[0] + + log.warning("Could not find track:{0}", tidal_id) + return None + + def tracks_for_ids(self, ids: Iterable[str]) -> Iterable[TrackInfo | None]: + yield from self.search_tracks_by_ids(ids=ids) + + def candidates( + self, items: Sequence[Item], artist: str, album: str, va_likely: bool + ) -> Iterable[AlbumInfo]: + candidates: list[AlbumInfo] = [] + # Tidal allows to lookup via isrc and barcode (nice!) + # We just return early here as a lookup via isrc should + # return a 100% match + barcodes: list[str] = list( + filter(None, set(i.get("barcode") for i in items)) + ) + if barcodes and ( + candidates := list( + filter(None, self.search_albums_by_ids(barcode_ids=barcodes)), + ) + ): + return candidates + + for query in self._album_queries(items): + candidates += self.search_albums_by_query(query) + + log.debug("Found {0} candidates", len(candidates)) + return candidates + + def item_candidates( + self, item: Item, artist: str, title: str + ) -> Iterable[TrackInfo]: + candidates: list[TrackInfo] = [] + # Tidal allows to lookup via isrc and barcode (nice!) + # We just return early here as a lookup via isrc should + # return a 100% match + if isrc := item.get("isrc"): + if candidates := list( + filter(None, self.search_tracks_by_ids(isrcs=[isrc])) + ): + return candidates + + for query in self._item_queries(item): + candidates += self.search_tracks_by_query(query) + + log.debug("Found {0} candidates", len(candidates)) + return candidates + + @staticmethod + def _item_queries(item: Item) -> Iterable[str]: + """Search queries for items.""" + yield item.title + + if item.artist: + yield f"{item.artist} {item.title}" + + @staticmethod + def _album_queries(items: Sequence[Item]) -> Iterable[str]: + """Search queries for albums.""" + + album_names = set(i.album for i in items) + artist_names = set(i.artist for i in items) + + for album, artist in itertools.product(album_names, artist_names): + yield f"{artist} {album}" + + def search_tracks_by_query(self, query: str) -> Iterable[TrackInfo]: + """Search for tracks given a string query.""" + search_doc = self.api.search_results( + query, + include=["tracks.artists"], + ) + track_by_id: dict[str, TidalTrack] = { + item["id"]: item + for item in search_doc.get("included", []) + if item["type"] == "tracks" + } + artist_by_id: dict[str, TidalArtist] = { + item["id"]: item + for item in search_doc.get("included", []) + if item["type"] == "artists" + } + for track_rel in search_doc["data"]["relationships"]["tracks"]["data"]: + if track := track_by_id.get(track_rel["id"]): + yield self._get_track_info(track, artist_by_id=artist_by_id) + else: + log.warning( + "Track with id {0} not found in lookup", + track_rel["id"], + ) + + def search_albums_by_query(self, query: str) -> Iterable[AlbumInfo]: + """Search for album given a string query.""" + search_doc = self.api.search_results( + query, + include=["albums"], + # include="albums.items.artists" <- not supported + # This is a bit inconvenient, but we fetch the items and artists + # for all albums separately. + ) + album_ids = [ + album_rel["id"] + for album_rel in search_doc["data"]["relationships"]["albums"][ + "data" + ] + ] + yield from filter(None, self.search_albums_by_ids(tidal_ids=album_ids)) + + @overload + def search_tracks_by_ids( + self, *, ids: Iterable[str] + ) -> Iterable[TrackInfo | None]: ... + + @overload + def search_tracks_by_ids( + self, *, tidal_ids: Iterable[str] + ) -> Iterable[TrackInfo | None]: ... + + @overload + def search_tracks_by_ids( + self, *, isrcs: Iterable[str] + ) -> Iterable[TrackInfo | None]: ... + + def search_tracks_by_ids( + self, + ids: Iterable[str] | None = None, + tidal_ids: Iterable[str] | None = None, + isrcs: Iterable[str] | None = None, + ) -> Iterable[TrackInfo | None]: + _ids: list[str | None] = list(tidal_ids or []) + isrcs = list(isrcs or []) + if ids: + _ids = list(map(self._extract_id, ids)) + + tracks_doc = self.api.get_tracks( + ids=list(filter(None, _ids)), + isrcs=isrcs, + include=["artists"], + ) + track_by_id: dict[str, TidalTrack] = { + item["id"]: item + for item in tracks_doc.get("data", []) + if item["type"] == "tracks" + } + artist_by_id: dict[str, TidalArtist] = { + item["id"]: item + for item in tracks_doc.get("included", []) + if item["type"] == "artists" + } + + for _id in _ids: + if _id is not None and (track := track_by_id.get(_id)): + yield self._get_track_info(track, artist_by_id=artist_by_id) + else: + yield None + + if isrcs: + isrc_to_track: dict[str, TidalTrack] = { + t["attributes"]["isrc"]: t for t in track_by_id.values() + } + + for isrc in isrcs: + if track := isrc_to_track.get(isrc): + yield self._get_track_info(track, artist_by_id=artist_by_id) + else: + yield None + + @overload + def search_albums_by_ids( + self, *, ids: Iterable[str] + ) -> Iterable[AlbumInfo | None]: ... + + @overload + def search_albums_by_ids( + self, *, tidal_ids: Iterable[str] + ) -> Iterable[AlbumInfo | None]: ... + + @overload + def search_albums_by_ids( + self, *, barcode_ids: Iterable[str] + ) -> Iterable[AlbumInfo | None]: ... + + def search_albums_by_ids( + self, + ids: Iterable[str] | None = None, + tidal_ids: Iterable[str] | None = None, + barcode_ids: Iterable[str] | None = None, + ) -> Iterable[AlbumInfo | None]: + _ids: list[str | None] = list(tidal_ids or []) + barcode_ids = list(barcode_ids or []) + if ids: + _ids = list(map(self._extract_id, ids)) + + albums_doc = self.api.get_albums( + ids=list(filter(None, _ids)), + barcode_ids=barcode_ids, + include=["items.artists", "artists"], + ) + album_by_id: dict[str, TidalAlbum] = { + item["id"]: item + for item in albums_doc.get("data", []) + if item["type"] == "albums" + } + track_by_id: dict[str, TidalTrack] = { + item["id"]: item + for item in albums_doc.get("included", []) + if item["type"] == "tracks" + } + artist_by_id: dict[str, TidalArtist] = { + item["id"]: item + for item in albums_doc.get("included", []) + if item["type"] == "artists" + } + + for _id in _ids: + if _id is not None and (album := album_by_id.get(_id)): + yield self._get_album_info( + album, + track_by_id=track_by_id, + artist_by_id=artist_by_id, + ) + else: + yield None + + if barcode_ids: + barcode_to_album: dict[str, TidalAlbum] = { + a["attributes"]["barcodeId"]: a for a in album_by_id.values() + } + + for barcode in barcode_ids: + if album := barcode_to_album.get(barcode): + yield self._get_album_info( + album, + track_by_id=track_by_id, + artist_by_id=artist_by_id, + ) + else: + yield None + + def _get_album_info( + self, + album: TidalAlbum, + track_by_id: dict[str, TidalTrack], + artist_by_id: dict[str, TidalArtist], + ) -> AlbumInfo: + + track_infos: list[TrackInfo] = [] + for i, track_rel in enumerate( + album["relationships"]["items"]["data"], start=1 + ): + if track := track_by_id.get(track_rel["id"]): + track_info = self._get_track_info(track, artist_by_id) + track_info.index = i + track_infos.append(track_info) + + artist_names, artist_ids = self._parse_artists( + album["relationships"]["artists"]["data"], + artist_by_id, + ) + date_parts = self._parse_release_date(album["attributes"]) + return AlbumInfo( + # Identifier + data_source=self.data_source, + album_id=album["id"], + artists_ids=artist_ids, + data_url=self._parse_data_url(album["attributes"]), + barcode=album["attributes"]["barcodeId"], + # Meta + album=self._parse_title(album["attributes"]), + tracks=track_infos, + artist=", ".join(artist_names), + artists=artist_names, + duration=self._duration_to_seconds(album["attributes"]["duration"]), + albumtype=album["attributes"]["albumType"], + label=self._parse_label(album["attributes"]), + year=date_parts[0] if date_parts else None, + month=date_parts[1] if date_parts else None, + day=date_parts[2] if date_parts else None, + ) + + def _get_track_info( + self, + track: TidalTrack, + artist_by_id: dict[str, TidalArtist], + ) -> TrackInfo: + artist_names, artist_ids = self._parse_artists( + track["relationships"]["artists"]["data"], + artist_by_id, + ) + + return TrackInfo( + # Identifier + data_source=self.data_source, + track_id=track["id"], + artists_ids=artist_ids, + data_url=self._parse_data_url(track["attributes"]), + # Meta + title=self._parse_title(track["attributes"]), + isrc=track["attributes"]["isrc"], + artist=", ".join(artist_names), + artists=artist_names, + duration=self._duration_to_seconds(track["attributes"]["duration"]), + label=self._parse_label(track["attributes"]), + ) + + @staticmethod + def _parse_artists( + artist_relationships: list[ResourceIdentifier], + artist_by_id: dict[str, TidalArtist], + ) -> tuple[list[str], list[str]]: + """Extract artists from a relationship. + + Artists are sorted in the track/album response relationship but not in the + track/album responses included items. + """ + artist_names = [] + artist_ids = [] + for artist_rel in artist_relationships: + if artist := artist_by_id.get(artist_rel["id"]): + artist_ids.append(artist["id"]) + artist_names.append(artist["attributes"]["name"]) + else: + log.warning( + "Artist with id {0} not found in lookup", + artist_rel["id"], + ) + + return artist_names, artist_ids + + @staticmethod + def _parse_title(attributes: AlbumAttributes | TrackAttributes): + """ + Tidal UIs append the version string at the end of the title. We do the same here + by formatting it as ``"{title} ({version})"`` to stay consistent. + """ + if version := attributes.get("version"): + return f"{attributes['title']} ({version})" + else: + return attributes["title"] + + @staticmethod + def _parse_data_url( + attributes: AlbumAttributes | TrackAttributes, + ) -> str | None: + if external_links := attributes.get("externalLinks"): + return external_links[0].get("href") + return None + + @staticmethod + def _duration_to_seconds(duration: str) -> int | None: + """Convert ISO 8601 duration to seconds. E.g. 'PT15M2S' -> 902.""" + match = ISO_8601_RE.match(duration) + if not match: + log.warning("Invalid ISO 8601 duration: {0}", duration) + return None + parts = {k: int(v) if v else 0 for k, v in match.groupdict().items()} + return parts["seconds"] + parts["minutes"] * 60 + parts["hours"] * 3600 + + @staticmethod + def _parse_label( + attributes: AlbumAttributes | TrackAttributes, + ) -> str | None: + if copyright := attributes.get("copyright"): + return copyright["text"] + return None + + @staticmethod + def _parse_release_date( + attributes: AlbumAttributes, + ) -> tuple[int, int, int] | None: + """Returns year, month, day from iso YYYY-MM-DD""" + + if ( + (release_date := attributes.get("releaseDate")) + and (parts := release_date.split("-")) + and len(parts) == 3 + ): + return int(parts[0]), int(parts[1]), int(parts[2]) + return None + + +ISO_8601_RE = re.compile( + r"^P" + r"T" + r"(?:(?P\d+)H)?" + r"(?:(?P\d+)M)?" + r"(?:(?P\d+)S)?$" +) diff --git a/beetsplug/tidal/api.py b/beetsplug/tidal/api.py new file mode 100644 index 0000000000..e5f02eb89a --- /dev/null +++ b/beetsplug/tidal/api.py @@ -0,0 +1,239 @@ +from __future__ import annotations + +import urllib.parse +import webbrowser +from functools import cached_property +from itertools import islice, zip_longest +from typing import TYPE_CHECKING, Any, TypeVar + +from beets import ui +from beets.logging import getLogger +from beetsplug._utils.requests import RequestHandler +from beetsplug.tidal.session import TidalSession + +if TYPE_CHECKING: + from collections.abc import Iterable + + from .api_types import ( + AlbumDocument, + Document, + SearchDocument, + TrackDocument, + ) + + T = TypeVar("T") + +log = getLogger("beets.tidal") + + +API_BASE = "https://openapi.tidal.com/v2" +MAX_FILTER_SIZE = 20 + + +def _batched(iterable: Iterable[T], n: int) -> Iterable[list[T]]: + # FIXME: Replace with itertools.batched once + # we upgrade to python > 3.12 + it = iter(iterable) + while batch := list(islice(it, n)): + yield batch + + +class TidalAPI(RequestHandler): + def __init__(self, client_id: str, token_path: str) -> None: + self.client_id = client_id + self.token_path = token_path + + @cached_property + def session(self) -> TidalSession: + return TidalSession(self.client_id, self.token_path) + + def search_results( + self, + query: str, + *, + explicit_filter: str = "INCLUDE", + include: list[str] | None = None, + country_code: str = "US", + ) -> SearchDocument: + """Search results for a query. + + https://tidal-music.github.io/tidal-api-reference/#/searchResults + """ + params = { + "explicitFilter": explicit_filter, + "countryCode": country_code, + "include": include or [], + } + + return self.get_json( + f"{API_BASE}/searchResults/{urllib.parse.quote(query)}", + params=params, + ) + + def get_tracks( + self, + ids: list[str] | None = None, + isrcs: list[str] | None = None, + include: list[str] | None = None, + country_code: str = "US", + ) -> TrackDocument: + """Fetch tracks resolving pagination and included items. + + https://tidal-music.github.io/tidal-api-reference/#/tracks/get_tracks + """ + ids = ids or [] + isrcs = isrcs or [] + + # Tidal allows at max 20 filters per request. This needs a bit of extra + # logic sadly. + doc: TrackDocument = { + "data": [], + "included": [], + } + for id_batch, isrc_batch in zip_longest( + _batched(ids, MAX_FILTER_SIZE), + _batched(isrcs, MAX_FILTER_SIZE), + fillvalue=(), + ): + params: dict[str, Any] = {"countryCode": country_code} + if id_batch: + params["filter[id]"] = id_batch + if isrc_batch: + params["filter[isrc]"] = isrc_batch + + doc = self.merge_multiresource_pagination( + doc, + self.get_paginated( + f"{API_BASE}/tracks", include, params=params + ), + ) + + return doc + + def get_albums( + self, + ids: list[str] | None = None, + barcode_ids: list[str] | None = None, + include: list[str] | None = None, + country_code: str = "US", + ) -> AlbumDocument: + """Fetch Albums resolving pagination and included items. + + https://tidal-music.github.io/tidal-api-reference/#/albums/get_albums + """ + ids = ids or [] + barcode_ids = barcode_ids or [] + + # Tidal allows at max 20 filters per request. This needs a bit of extra + # logic sadly. + doc: AlbumDocument = { + "data": [], + "included": [], + } + for id_batch, barcode_batch in zip_longest( + _batched(ids, MAX_FILTER_SIZE), + _batched(barcode_ids, MAX_FILTER_SIZE), + fillvalue=(), + ): + params: dict[str, Any] = {"countryCode": country_code} + if id_batch: + params["filter[id]"] = id_batch + if barcode_batch: + params["filter[barcodeId]"] = barcode_batch + + doc = self.merge_multiresource_pagination( + doc, + self.get_paginated( + f"{API_BASE}/albums", include, params=params + ), + ) + + return doc + + def ui_authenticate_flow(self) -> None: + """Interactive first-time authentication (PKCE flow). + + 1. Visit generated URL + 2. Paste full redirect URL (with ?code=...) + 3. Token auto-saved for future use + """ + auth_url, _ = self.session.authorization_url( + "https://login.tidal.com/authorize" + ) + try: + webbrowser.open(auth_url) + except webbrowser.Error: + ui.print_(f"Visit: {auth_url}") + redirect_url = ui.input_("Paste redirected URL: ") + self.session.fetch_token( + "https://auth.tidal.com/v1/oauth2/token", + authorization_response=redirect_url, + include_client_id=True, + ) + self.session.save_token(self.session.token) + ui.print_(f"Saved tidal token in {self.session.token_path}") + + @staticmethod + def merge_multiresource_pagination( + a: Document[list[T]], + b: Document[list[T]], + ) -> Document[list[T]]: + """ + Merge of b into a, following JSON:API spec rules. + + - Appends data arrays + - Deduplicates included by (type, id) + - Updates links (b overrides a) + """ + a["included"] = a.get("included", []) + a["links"] = a.get("links", {}) + + a["data"].extend(b["data"]) + + # Merge included with deduplication + seen = {(item["type"], item["id"]) for item in a["included"]} + for item in b.get("included", []): + key = (item["type"], item["id"]) + if key not in seen: + seen.add(key) + a["included"].append(item) + + # Update pagination links (final state wins) + a["links"] = b.get("links", {}) + return a + + def get_paginated( + self, + url: str, + include: list[str] | str | None = None, + params: dict[str, Any] | None = None, + **kwargs, + ) -> Document[list[Any]]: + """ + Perform a GET request to the Tidal API with pagination resolution. + """ + include = include or [] + params = params or {} + + doc: Document[list[Any]] = { + "data": [], + "included": [], + "links": {"next": url}, + } + + while next := doc.get("links", {}).get("next"): + page_doc = self.get_json( + url=next, + params={**params, "include": include}, + **kwargs, + ) + doc = self.merge_multiresource_pagination(doc, page_doc) + + # Dedupe include + doc["included"] = list( + { + (item["type"], item["id"]): item + for item in doc.get("included", []) + }.values() + ) + return doc diff --git a/beetsplug/tidal/api_types.py b/beetsplug/tidal/api_types.py new file mode 100644 index 0000000000..7e61e6b149 --- /dev/null +++ b/beetsplug/tidal/api_types.py @@ -0,0 +1,166 @@ +from __future__ import annotations + +from typing import Generic, Literal, TypedDict, TypeVar + +from typing_extensions import NotRequired + + +class ResourceIdentifier(TypedDict): + id: str + type: str + + +class RelationshipLinks(TypedDict): + self: NotRequired[str] + next: NotRequired[str] + + +class RelationshipData(TypedDict): + data: list[ResourceIdentifier] + links: RelationshipLinks + + +class SingleRelationshipData(TypedDict): + data: ResourceIdentifier + links: NotRequired[RelationshipLinks] + + +class ExternalLink(TypedDict): + href: str + meta: str + + +class Copyright(TypedDict): + text: str + + +class ArtistAttributes(TypedDict): + name: str + popularity: float # 0.0 - 1.0, required + handle: NotRequired[str] + ownerType: NotRequired[Literal["LABEL", "USER", "MIXED"]] + spotlighted: NotRequired[bool] + + +class AlbumAttributes(TypedDict): + # see "Albums_Attributes" + # in https://tidal-music.github.io/tidal-api-reference/tidal-api-oas.json + + # Required + albumType: Literal["ALBUM", "EP", "SINGLE"] + barcodeId: str + duration: str # ISO 8601 + explicit: bool + mediaTags: list[str] + numberOfItems: int + numberOfVolumes: int + popularity: float + title: str + + # Optional + accessType: NotRequired[Literal["PUBLIC", "UNLISTED", "PRIVATE"]] + copyright: NotRequired[Copyright] + createdAt: NotRequired[str] # ISO 8601 datetime + externalLinks: NotRequired[list[ExternalLink]] + releaseDate: NotRequired[str] # ISO date YYYY-MM-DD + version: NotRequired[str] + + +class TrackAttributes(TypedDict): + # see "Tracks_Attributes" + # in https://tidal-music.github.io/tidal-api-reference/tidal-api-oas.json + + # Required + title: str + duration: str # ISO 8601 + explicit: bool + isrc: str + key: Literal[ + "UNKNOWN", + "C", + "CSharp", + "D", + "Eb", + "E", + "F", + "FSharp", + "G", + "Ab", + "A", + "Bb", + "B", + ] + keyScale: Literal[ + "UNKNOWN", + "MAJOR", + "MINOR", + "AEOLIAN", + "BLUES", + "DORIAN", + "HARMONIC_MINOR", + "LOCRIAN", + "LYDIAN", + "MIXOLYDIAN", + "PENTATONIC_MAJOR", + "PHRYGIAN", + "MELODIC_MINOR", + "PENTATONIC_MINOR", + ] + mediaTags: list[str] + popularity: float + + # Optional + accessType: NotRequired[Literal["PUBLIC", "UNLISTED", "PRIVATE"]] + bpm: NotRequired[float] + copyright: NotRequired[Copyright] + createdAt: NotRequired[str] # ISO 8601 datetime + externalLinks: NotRequired[list[ExternalLink]] + spotlighted: NotRequired[bool] + toneTags: NotRequired[list[str]] + version: NotRequired[str] + + +class SearchAttributes(TypedDict): + didYouMean: NotRequired[str] + trackingId: str + + +class TidalArtist(TypedDict): + id: str + type: Literal["artists"] + attributes: ArtistAttributes + + +class TidalAlbum(TypedDict): + id: str + type: Literal["albums"] + attributes: AlbumAttributes + relationships: dict[str, RelationshipData] + + +class TidalTrack(TypedDict): + id: str + type: Literal["tracks"] + attributes: TrackAttributes + relationships: dict[str, RelationshipData] + + +class TidalSearch(TypedDict): + id: str + type: Literal["searchResults"] + attributes: SearchAttributes + relationships: dict[str, RelationshipData] + + +T = TypeVar("T") + + +class Document(TypedDict, Generic[T]): + data: T + included: NotRequired[list[TidalArtist | TidalAlbum | TidalTrack]] + links: NotRequired[dict[str, str]] + + +AlbumDocument = Document[list[TidalAlbum]] +TrackDocument = Document[list[TidalTrack]] +SearchDocument = Document[TidalSearch] diff --git a/beetsplug/tidal/session.py b/beetsplug/tidal/session.py new file mode 100644 index 0000000000..3ca9b99908 --- /dev/null +++ b/beetsplug/tidal/session.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +import json +from http import HTTPStatus +from pathlib import Path +from time import sleep +from typing import TYPE_CHECKING + +import requests +from requests_oauthlib import OAuth2Session +from urllib3.util.retry import Retry + +from beets.logging import getLogger +from beetsplug._utils.requests import RateLimitAdapter, TimeoutAndRetrySession + +if TYPE_CHECKING: + from beetsplug._typing import JSONDict + +API_BASE = "https://openapi.tidal.com/v2" + +log = getLogger("beets.tidal") + + +class TidalSession(OAuth2Session, TimeoutAndRetrySession): + """Tidal API session with automatic OAuth2 PKCE authentication. + + Handles: + - Initial interactive PKCE flow (Tidal required) + - Automatic token refresh + - Rate limiting (~4 req/s) + - Token persistence + - API base URL prefixing + """ + + token_path: Path + + def __init__(self, client_id: str, token_path: str | Path) -> None: + self.token_path = Path(token_path) + + # Load token & init parent + token = self.load_token() + super().__init__( + client_id, + token=token, + scope="search.read", + auto_refresh_url="https://auth.tidal.com/v1/oauth2/token", + redirect_uri="https://localhost", + auto_refresh_kwargs={"client_id": client_id}, + token_updater=self.save_token, + pkce="S256", + ) + + # Retry on server errors + retry = Retry( + total=6, + backoff_factor=0.5, + status_forcelist=[ + HTTPStatus.INTERNAL_SERVER_ERROR, + HTTPStatus.BAD_GATEWAY, + HTTPStatus.SERVICE_UNAVAILABLE, + HTTPStatus.GATEWAY_TIMEOUT, + ], + ) + # Rate limit to ~4/s as tidal will penalize heavily if not respected + adapter = RateLimitAdapter(rate_limit=0.25, max_retries=retry) + self.mount("https://auth.tidal.com/", adapter) + self.mount(API_BASE, adapter) + + def load_token(self) -> JSONDict | None: + """Load token from JSON file.""" + if self.token_path.exists(): + with open(self.token_path) as f: + return json.load(f) + return None + + def save_token(self, token: JSONDict) -> None: + """Save token to JSON file.""" + with open(self.token_path, "w") as f: + json.dump(token, f, indent=2) + + def request( + self, method: str | bytes, url: str | bytes, *args, **kwargs + ) -> requests.Response: + """Override for Tidal-specific base URL and rate limits.""" + if isinstance(url, str) and not url.startswith("http"): + url = API_BASE + url + + try: + res = super().request(method, url, *args, **kwargs) + except requests.exceptions.HTTPError as e: + res = e.response + if res.status_code == 429: + self._handle_rate_limit(res) + return self.request(method, url, *args, **kwargs) + raise + return res + + def _handle_rate_limit(self, response: requests.Response) -> None: + remaining = int(response.headers.get("Retry-After", 0)) + if remaining > 0: + log.debug( + "Rate limit exceeded. Retrying after {0} seconds.", remaining + ) + sleep(remaining) + else: + raise Exception( + "Rate limit handling failed: Retry-After header is missing or invalid" + ) + return diff --git a/docs/changelog.rst b/docs/changelog.rst index 90db3d7c2a..94b09a5103 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -26,6 +26,17 @@ New features modes. - :doc:`plugins/fromfilename`: Support ``track`` prefix when parsing the track number from the filename (e.g., ``track01.m4a``). +- **Tidal plugin**: Introduces a new plugin for fetching metadata from Tidal. It + supports album and track lookups by ID, including batch operations via + ``albums_for_ids`` and ``tracks_for_ids``. It also enables search by query as + well as identifier-based retrieval, with support for ISRC codes (tracks) and + barcode/EANs (albums). + + This is an initial, relatively minimal implementation, but already fully + usable for common metadata workflows. We welcome feedback, improvement ideas, + and community contributions to further extend its capabilities. + + See :doc:`plugins/tidal` for more information. .. Bug fixes diff --git a/docs/plugins/index.rst b/docs/plugins/index.rst index c164e5ec71..c858c741da 100644 --- a/docs/plugins/index.rst +++ b/docs/plugins/index.rst @@ -127,6 +127,7 @@ databases. They share the following configuration options: substitute the thumbnails + tidal titlecase types unimported @@ -160,6 +161,9 @@ Autotagger Extensions :doc:`spotify ` Search for releases in the Spotify_ database. +:doc:`tidal ` + Search for releases in the Tidal_ catalog. + .. _deezer: https://www.deezer.com/en/ .. _discogs: https://www.discogs.com @@ -168,6 +172,8 @@ Autotagger Extensions .. _spotify: https://open.spotify.com/ +.. _tidal: https://tidal.com/ + Metadata -------- diff --git a/docs/plugins/tidal.rst b/docs/plugins/tidal.rst new file mode 100644 index 0000000000..9e1f0e3e1a --- /dev/null +++ b/docs/plugins/tidal.rst @@ -0,0 +1,95 @@ +Tidal Plugin +============ + +The ``tidal`` plugin provides metadata matches for the autotagger using the +Tidal_ Web APIs. + +.. _tidal: https://tidal.com + +Why Use the Tidal Plugin? +------------------------- + +The Tidal plugin allows you to: + +- Fetch metadata for albums and tracks from Tidal's catalog +- Look up tracks by ISRC code (useful if your music files already contain ISRC + tags) +- Look up albums by barcode ID (useful if your CDs have UPC barcodes) +- Get matches during import without needing to manually search + +This is especially useful if your music already has ISRC or barcode metadata +embedded, as it allows for quick and accurate matching to Tidal's catalog. + +Requirements +------------ + +- A Tidal account (free or premium) +- Python environment with network access to Tidal's API + +Before using the plugin, you need to authorize your beets installation to access +your Tidal account. + +Authentication +-------------- + +To authenticate with Tidal, run: + +.. code-block:: bash + + beet tidal --auth + +This will open a browser window where you can log in to your Tidal account. +After successful authentication, your token will be saved to the configured +token file and you won't need to re-authenticate on subsequent runs. + +Basic Usage +----------- + +Enable the ``tidal`` plugin (see :ref:`using-plugins`). Once enabled, you will +receive Tidal matches when importing new items. + +During import, you can also manually enter a Tidal URL at the ``enter Id`` +prompt: + +.. code-block:: bash + + Enter search, enter Id, aBort, eDit, edit Candidates, plaY? i + Enter release ID: https://tidal.com/album/226495055 + +You can enter both album and track URLs. For example: + +.. code-block:: bash + + https://tidal.com/track/490839595 + https://tidal.com/album/234493117 + +Configuration +------------- + +This plugin can be configured like other metadata source plugins as described in +:ref:`metadata-source-plugin-configuration`. + +Default +~~~~~~~ + +.. code-block:: yaml + + tidal: + client_id: mcjmpl1bPATJXcBT + tokenfile: tidal_token.json + data_source_mismatch_penalty: 0.5 + search_limit: 5 + +.. conf:: client_id + :default: mcjmpl1bPATJXcBT + + The Tidal API client ID. The default value is the public demo client ID. + You can register your own application at Tidal's developer portal for + production use. + +.. conf:: tokenfile + :default: tidal_token.json + + The path to the file where the Tidal authentication token is stored. + +.. include:: ./shared_metadata_source_config.rst diff --git a/setup.cfg b/setup.cfg index a6209a6421..88b4e73653 100644 --- a/setup.cfg +++ b/setup.cfg @@ -19,6 +19,7 @@ omit = test/* beets/test/* beetsplug/_typing.py + */*_types.py [coverage:report] precision = 2 @@ -60,3 +61,6 @@ strict = true [[mypy-beetsplug._utils]] strict = true + +[[mypy-beetsplug.tidal.*]] +strict = true diff --git a/test/plugins/test_tidal.py b/test/plugins/test_tidal.py new file mode 100644 index 0000000000..0f9ae45f96 --- /dev/null +++ b/test/plugins/test_tidal.py @@ -0,0 +1,579 @@ +"""Tests for the 'tidal' plugin.""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING +from unittest.mock import Mock + +import pytest + +from beets.library.models import Item +from beets.test.helper import PluginTestCase +from beetsplug.tidal import TidalPlugin + +if TYPE_CHECKING: + from beetsplug.tidal.api_types import ( + AlbumAttributes, + TidalAlbum, + TidalArtist, + TidalTrack, + TrackAttributes, + ) + + +def _make_artist(id: str, name: str) -> TidalArtist: + return { + "id": id, + "type": "artists", + "attributes": {"name": name, "popularity": 0.5}, + } + + +def _make_album( + id: str, + title: str, + tracks: list[TidalTrack], + artist_ids: list[str], + release_date: str = "2024-01-15", + version: str | None = None, +) -> tuple[TidalAlbum, dict[str, TidalTrack], dict[str, TidalArtist]]: + artist_lookup = { + aid: _make_artist(aid, f"Artist {aid}") for aid in artist_ids + } + track_lookup = {t["id"]: t for t in tracks} + + attrs: AlbumAttributes = { + "albumType": "ALBUM", + "barcodeId": "123456", + "duration": "PT45M", + "explicit": False, + "mediaTags": [], + "numberOfItems": len(tracks), + "numberOfVolumes": 1, + "popularity": 0.5, + "title": title, + "releaseDate": release_date, + } + if version: + attrs["version"] = version + + album: TidalAlbum = { + "id": id, + "type": "albums", + "attributes": attrs, + "relationships": { + "artists": { + "data": [{"id": aid, "type": "artists"} for aid in artist_ids], + "links": {}, + }, + "items": { + "data": [{"id": t["id"], "type": "tracks"} for t in tracks], + "links": {}, + }, + }, + } + return album, track_lookup, artist_lookup + + +def _make_track( + id: str, + title: str, + duration: str = "PT3M30S", + isrc: str = "ISRC123", + artist_ids: list[str] | None = None, + version: str | None = None, +) -> TidalTrack: + attrs: TrackAttributes = { + "title": title, + "duration": duration, + "explicit": False, + "isrc": isrc, + "key": "C", + "keyScale": "MAJOR", + "mediaTags": [], + "popularity": 0.5, + } + if version: + attrs["version"] = version + return { + "id": id, + "type": "tracks", + "attributes": attrs, + "relationships": { + "artists": { + "data": [ + {"id": aid, "type": "artists"} for aid in (artist_ids or []) + ], + "links": {}, + }, + }, + } + + +class TidalPluginTest(PluginTestCase): + plugin = "tidal" + + def setUp(self): + super().setUp() + self.tidal = TidalPlugin() + + +class TestAlbumParsing(TidalPluginTest): + """High-level tests for album parsing.""" + + def test_parse_album(self): + track = _make_track("t1", "My Song", "PT3M30S", "ISRC001", ["a1"]) + album, track_lookup, artist_lookup = _make_album( + "al1", "My Album", [track], ["a1"] + ) + + info = self.tidal._get_album_info(album, track_lookup, artist_lookup) + + assert info.album == "My Album" + assert info.album_id == "al1" + assert len(info.tracks) == 1 + assert info.tracks[0].title == "My Song" + + def test_parse_album_with_multiple_tracks(self): + tracks = [ + _make_track("t1", "Track One", "PT3M", "ISRC1", ["a1"]), + _make_track("t2", "Track Two", "PT4M", "ISRC2", ["a1"]), + ] + album, track_lookup, artist_lookup = _make_album( + "al2", "Album Two", tracks, ["a1"] + ) + + info = self.tidal._get_album_info( + album, + track_lookup, + artist_lookup, + ) + + assert len(info.tracks) == 2 + assert info.tracks[0].index == 1 + assert info.tracks[1].index == 2 + + def test_parse_album_with_version(self): + """Album title should have version appended.""" + track = _make_track("t1", "My Song", "PT3M", "ISRC001", ["a1"]) + album, track_lookup, artist_lookup = _make_album( + "al3", "My Album", [track], ["a1"], version="Deluxe Edition" + ) + + info = self.tidal._get_album_info(album, track_lookup, artist_lookup) + + assert info.album == "My Album (Deluxe Edition)" + + +class TestTrackParsing(TidalPluginTest): + """High-level tests for track parsing.""" + + def test_parse_track(self): + track = _make_track("t1", "My Track", "PT4M", "ISRC456", ["a1"]) + artist_lookup = {"a1": _make_artist("a1", "My Artist")} + + info = self.tidal._get_track_info(track, artist_lookup) + + assert info.title == "My Track" + assert info.track_id == "t1" + assert info.duration == 240 # PT4M = 240 seconds + assert info.isrc == "ISRC456" + assert info.artist == "My Artist" + + def test_parse_track_with_version(self): + """Track title should have version appended.""" + track = _make_track( + "t2", "My Song", "PT3M", "ISRC002", ["a1"], version="Remastered" + ) + artist_lookup = {"a1": _make_artist("a1", "My Artist")} + + info = self.tidal._get_track_info(track, artist_lookup) + + assert info.title == "My Song (Remastered)" + + +class TestTrackForID(TidalPluginTest): + """Tests for track_for_id with mocked API.""" + + def test_track_for_id(self): + """Test fetching track by ID via API.""" + track = _make_track("490839595", "API Track", "PT3M", "ISRC001", ["a1"]) + artist = _make_artist("a1", "API Artist") + + self.tidal.api.get_tracks = Mock( + return_value={ + "data": [track], + "included": [artist], + } + ) + + info = self.tidal.track_for_id("https://tidal.com/track/490839595") + self.tidal.api.get_tracks.assert_called_once() + + assert info is not None + assert info.title == "API Track" + assert info.track_id == "490839595" + assert info.artist == "API Artist" + + def test_track_for_id_not_found(self): + """Test track_for_id returns None when not found.""" + self.tidal.api.get_tracks = Mock(return_value={"data": []}) + info = self.tidal.track_for_id("https://tidal.com/track/490839595") + assert info is None + + info = self.tidal.track_for_id("does_not_exist") + assert info is None + + +class TestTracksForIDs(TidalPluginTest): + """Tests for tracks_for_ids with mocked API.""" + + def test_tracks_for_ids(self): + """Test fetching multiple tracks by IDs via API.""" + track1 = _make_track( + "490839595", "API Track 1", "PT3M", "ISRC001", ["a1"] + ) + track2 = _make_track( + "490839596", "API Track 2", "PT4M", "ISRC002", ["a1"] + ) + artist = _make_artist("a1", "API Artist") + + self.tidal.api.get_tracks = Mock( + return_value={ + "data": [track1, track2], + "included": [artist], + } + ) + + results = list( + self.tidal.tracks_for_ids( + [ + "https://tidal.com/track/490839595", + "https://tidal.com/track/490839596", + ] + ) + ) + self.tidal.api.get_tracks.assert_called_once() + assert len(results) == 2 + assert results[0] is not None + assert results[0].title == "API Track 1" + assert results[1] is not None + assert results[1].title == "API Track 2" + + def test_tracks_for_ids_with_missing(self): + """Test tracks_for_ids yields None for IDs not found.""" + track = _make_track("490839595", "API Track", "PT3M", "ISRC001", ["a1"]) + artist = _make_artist("a1", "API Artist") + + self.tidal.api.get_tracks = Mock( + return_value={ + "data": [track], + "included": [artist], + } + ) + + results = list( + self.tidal.tracks_for_ids( + [ + "https://tidal.com/track/490839595", + "does_not_exist", + ] + ) + ) + + assert len(results) == 2 + assert results[0] is not None + assert results[0].title == "API Track" + assert results[1] is None + + +class TestAlbumForID(TidalPluginTest): + """Tests for album_for_id with mocked API.""" + + def test_album_for_id(self): + """Test fetching album by ID via API.""" + track = _make_track("t1", "Album Track", "PT3M30S", "ISRC001", ["a1"]) + album, track_lookup, artist_lookup = _make_album( + "226495055", "API Album", [track], ["a1"] + ) + self.tidal.api.get_albums = Mock( + return_value={ + "data": [album], + "included": [*artist_lookup.values(), *track_lookup.values()], + } + ) + + info = self.tidal.album_for_id("https://tidal.com/album/226495055") + + assert info is not None + assert info.album == "API Album" + assert info.album_id == "226495055" + assert len(info.tracks) == 1 + assert info.tracks[0].title == "Album Track" + + def test_album_for_id_not_found(self): + """Test album_for_id returns None when not found.""" + self.tidal.api.get_albums = Mock(return_value={"data": []}) + info = self.tidal.album_for_id("https://tidal.com/album/226495055") + assert info is None + + info = self.tidal.album_for_id("does_not_exist") + assert info is None + + +class TestAlbumsForIDs(TidalPluginTest): + """Tests for albums_for_ids with mocked API.""" + + def test_albums_for_ids(self): + """Test fetching multiple albums by IDs via API.""" + track1 = _make_track("t1", "Album Track 1", "PT3M", "ISRC001", ["a1"]) + track2 = _make_track("t2", "Album Track 2", "PT4M", "ISRC002", ["a1"]) + album1, track_lookup1, artist_lookup1 = _make_album( + "226495055", "API Album 1", [track1], ["a1"] + ) + album2, track_lookup2, artist_lookup2 = _make_album( + "226495056", "API Album 2", [track2], ["a1"] + ) + + # Combine lookups to simulate API response + all_included = [ + *artist_lookup1.values(), + *artist_lookup2.values(), + *track_lookup1.values(), + *track_lookup2.values(), + ] + + self.tidal.api.get_albums = Mock( + return_value={ + "data": [album1, album2], + "included": all_included, + } + ) + + results = list( + self.tidal.albums_for_ids( + [ + "https://tidal.com/album/226495055", + "https://tidal.com/album/226495056", + ] + ) + ) + + self.tidal.api.get_albums.assert_called_once() + # Note: yields album then None for each ID + assert len(results) == 2 + assert results[0] is not None + assert results[0].album == "API Album 1" + assert results[1] is not None + assert results[1].album == "API Album 2" + + def test_albums_for_ids_with_missing(self): + """Test albums_for_ids yields None for IDs not found.""" + track = _make_track("t1", "Album Track", "PT3M", "ISRC001", ["a1"]) + album, track_lookup, artist_lookup = _make_album( + "226495055", "API Album", [track], ["a1"] + ) + + self.tidal.api.get_albums = Mock( + return_value={ + "data": [album], + "included": [*artist_lookup.values(), *track_lookup.values()], + } + ) + + results = list( + self.tidal.albums_for_ids( + [ + "https://tidal.com/album/226495055", + "does_not_exist", + ] + ) + ) + + # yields (album, None) for (found, not_found) + assert len(results) == 2 + assert results[0] is not None + assert results[0].album == "API Album" + assert results[1] is None + + +class TestCandidates(TidalPluginTest): + """Tests for candidates method.""" + + def test_candidates_with_barcode(self): + """Test that candidates uses barcode lookup first.""" + track = _make_track("t1", "Album Track", "PT3M", "ISRC001", ["a1"]) + album, track_lookup, artist_lookup = _make_album( + "al1", "Barcode Album", [track], ["a1"] + ) + + self.tidal.api.get_albums = Mock( + return_value={ + "data": [album], + "included": [*artist_lookup.values(), *track_lookup.values()], + } + ) + + items = [Item(barcode="123456")] + + candidates = list( + self.tidal.candidates(items, "Artist", "Album", False) + ) + + self.tidal.api.get_albums.assert_called_once() + assert len(candidates) == 1 + assert candidates[0].album == "Barcode Album" + + def test_candidates_with_query_fallback(self): + """Test that candidates falls back to query search when no barcode.""" + items = [Item(title="My Song", artist="My Artist", album="My Album")] + + # Mock search returning album IDs + self.tidal.api.search_results = Mock( + return_value={ + "data": { + "relationships": { + "albums": { + "data": [{"id": "al1", "type": "albums"}], + }, + }, + }, + } + ) + + # Mock album lookup by ID + track = _make_track("t1", "Album Track", "PT3M", "ISRC001", ["a1"]) + album, track_lookup, artist_lookup = _make_album( + "al1", "Query Album", [track], ["a1"] + ) + self.tidal.api.get_albums = Mock( + return_value={ + "data": [album], + "included": [*artist_lookup.values(), *track_lookup.values()], + } + ) + + candidates = list( + self.tidal.candidates(items, "My Artist", "My Album", False) + ) + + # Should have called search_results + assert self.tidal.api.search_results.called + assert len(candidates) == 1 + assert candidates[0].album == "Query Album" + + +class TestItemCandidates(TidalPluginTest): + """Tests for item_candidates method.""" + + def test_item_candidates_with_isrc(self): + """Test that item_candidates uses ISRC lookup first.""" + track = _make_track( + "490839595", "ISRC Track", "PT3M", "ISRC001", ["a1"] + ) + artist = _make_artist("a1", "ISRC Artist") + + self.tidal.api.get_tracks = Mock( + return_value={ + "data": [track], + "included": [artist], + } + ) + + item = Item(isrc="ISRC001") + + results = list(self.tidal.item_candidates(item, "Artist", "Title")) + + self.tidal.api.get_tracks.assert_called_once() + assert len(results) == 1 + assert results[0].title == "ISRC Track" + + def test_item_candidates_with_query_fallback(self): + """Test that item_candidates falls back to query search when no ISRC.""" + item = Item(title="Query Song", artist="Query Artist") + + self.tidal.api.search_results = Mock( + return_value={ + "data": { + "relationships": { + "tracks": { + "data": [{"id": "490839595", "type": "tracks"}], + }, + }, + }, + "included": [ + _make_track( + "490839595", "Query Track", "PT3M", "ISRC002", ["a1"] + ), + _make_artist("a1", "Query Artist"), + ], + } + ) + + results = list( + self.tidal.item_candidates(item, "Query Artist", "Query Song") + ) + + assert self.tidal.api.search_results.called + assert results[0].title == "Query Track" + + +class TestStaticHelpers: + """Tests for static helper methods.""" + + @pytest.mark.parametrize( + "attrs, expected", + [ + ( + { + "externalLinks": [ + {"href": "https://tidal.com/b/123", "meta": ""} + ] + }, + "https://tidal.com/b/123", + ), + ({}, None), + ], + ) + def test_parse_data_url(self, attrs, expected): + assert TidalPlugin._parse_data_url(attrs) == expected + + @pytest.mark.parametrize( + "attrs, expected", + [ + ({"copyright": {"text": "(P) 2024 Tidal"}}, "(P) 2024 Tidal"), + ({}, None), + ], + ) + def test_parse_label(self, attrs, expected): + assert TidalPlugin._parse_label(attrs) == expected + + @pytest.mark.parametrize( + "attrs, expected", + [ + ({"releaseDate": "2024-01-15"}, (2024, 1, 15)), + ({}, None), + ({"releaseDate": "2024"}, None), + ], + ) + def test_parse_release_date(self, attrs, expected): + assert TidalPlugin._parse_release_date(attrs) == expected + + @pytest.mark.parametrize( + "duration,expected", + [ + ("PT30S", 30), + ("PT3M30S", 210), + ("PT4M", 240), + ("PT1H", 3600), + ("PT1H30M", 5400), + ], + ) + def test_duration_conversions(self, duration, expected): + assert TidalPlugin._duration_to_seconds(duration) == expected + + def test_duration_invalid_raises(self, caplog): + with caplog.at_level(logging.WARNING): + TidalPlugin._duration_to_seconds("invalid") + assert "Invalid ISO 8601 duration: invalid" in caplog.text diff --git a/test/plugins/utils/test_requests.py b/test/plugins/utils/test_requests.py new file mode 100644 index 0000000000..feadb3cce7 --- /dev/null +++ b/test/plugins/utils/test_requests.py @@ -0,0 +1,47 @@ +from unittest.mock import MagicMock + +import pytest +import requests + +from beetsplug._utils.requests import RateLimitAdapter + + +def _prepared_request( + url: str = "https://example.com", +) -> requests.PreparedRequest: + req = requests.Request("GET", url) + return req.prepare() + + +class TestRateLimitAdapter: + @pytest.mark.parametrize( + "last_request_time, now, expected_sleep", + [ + (100.0, 100.0, 0.25), + (100.0, 100.1, 0.15), + ], + ) + def test_send_sleeps_for_remaining_time( + self, monkeypatch, last_request_time, now, expected_sleep + ): + adapter = RateLimitAdapter(rate_limit=0.25) + request = _prepared_request() + + send_mock = MagicMock(return_value="ok") + monkeypatch.setattr( + "beetsplug._utils.requests.HTTPAdapter.send", send_mock + ) + + monkeypatch.setattr( + "beetsplug._utils.requests.time.monotonic", + lambda: now, + ) + + sleep_mock = MagicMock() + monkeypatch.setattr("beetsplug._utils.requests.time.sleep", sleep_mock) + + adapter._last_request_time = last_request_time + adapter.send(request) + + assert sleep_mock.call_count == 1 + assert sleep_mock.call_args.args[0] == pytest.approx(expected_sleep) diff --git a/test/util/test_id_extractors.py b/test/util/test_id_extractors.py index e510dd5d83..07571c1ef7 100644 --- a/test/util/test_id_extractors.py +++ b/test/util/test_id_extractors.py @@ -30,6 +30,9 @@ ("musicbrainz", "blah blah", None), ("musicbrainz", "https://musicbrainz.org/entity/28e32c71-1450-463e-92bf-e0a46446fc11", "28e32c71-1450-463e-92bf-e0a46446fc11"), ("bandcamp", "https://nameofartist.bandcamp.com/album/nameofalbum", "https://nameofartist.bandcamp.com/album/nameofalbum"), + ("tidal", "https://tidal.com/browse/album/76699758", "76699758"), + ("tidal", "https://tidal.com/track/463928643/u", "463928643"), + ("tidal", "blah blah", None), ], ) # fmt: skip def test_extract_release_id(source, id_string, expected): @@ -47,6 +50,7 @@ class SourceWithURL(NamedTuple): SourceWithURL("beatport", "https://www.beatport.com/release/album-name/3089651"), SourceWithURL("discogs", "http://www.discogs.com/G%C3%BCnther-Lause-Meru-Ep/release/4354798"), SourceWithURL("musicbrainz", "https://musicbrainz.org/entity/28e32c71-1450-463e-92bf-e0a46446fc11"), + SourceWithURL("tidal", "https://tidal.com/track/463928643"), ] # fmt: skip