Skip to content

Commit b785efb

Browse files
committed
uris, external-ids and with-bar can be set in config. flags override file, and file overrides no flag
also, not saved playlists can be exported now if id is provided
1 parent 5926d0a commit b785efb

1 file changed

Lines changed: 252 additions & 21 deletions

File tree

exportify-cli.py

Lines changed: 252 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -87,29 +87,30 @@ def load_config(config_path: Path) -> configparser.ConfigParser:
8787
return config
8888
# Prompt user to create config
8989
logger.info(f"Config not found or invalid at {config_path}, creating new.")
90-
click.echo(
91-
"""
92-
File "config.cfg" not found or invalid. Let's create it.
90+
click.echo("""File "config.cfg" not found or invalid. Let's create it.
9391
9492
1. Go to Spotify Developer Dashboard (https://developer.spotify.com/dashboard).
9593
2. Create a new app.
9694
3. Set a name and description for your app.
9795
4. Add a redirect URI (e.g. http://127.0.0.1:3000/callback).
9896
99-
Now copy the Client ID, Client Secret and Redirect URI and paste below:
100-
"""
101-
)
97+
Now after creating the app, press the Settings button on the upper right corner.
98+
Copy the Client ID, Client Secret and Redirect URI and paste them below.""")
99+
102100
spotify_cfg = {
103101
"client_id": click.prompt("Spotify Client ID", type=str),
104102
"client_secret": click.prompt(
105-
"Spotify Client Secret", hide_input=True, type=str
103+
"Spotify Client Secret",
104+
hide_input=True,
105+
type=str,
106106
),
107107
"redirect_uri": click.prompt(
108108
"Redirect URI",
109109
type=str,
110-
default="http://127.0.0.1:3000/callback",
110+
default="http://127.0.1:3000/callback",
111111
),
112112
}
113+
113114
config["spotify"] = spotify_cfg
114115
if not validate_config(config):
115116
logger.error("Invalid Spotify configuration.")
@@ -144,20 +145,23 @@ def sanitize_filename(name: str, ext: str) -> str:
144145

145146

146147
def write_file(file_path: Path, data: list[dict], file_format: str = "csv") -> None:
147-
"""Write list of dicts to file (CSV or JSON)."""
148+
"""Write list of dicts to file."""
148149
if not data:
149150
logger.warning("No data to write; skipping file.")
150151
return
152+
151153
if file_format == "csv":
152154
headers = list(data[0].keys())
153155
with file_path.open("w", newline="", encoding="utf-8") as csvfile:
154156
writer = csv.DictWriter(csvfile, fieldnames=headers)
155157
writer.writeheader()
156158
for row in data:
157159
writer.writerow(row)
160+
158161
elif file_format == "json":
159162
with file_path.open("w", encoding="utf-8") as jsonfile:
160163
json.dump(data, jsonfile, ensure_ascii=False, indent=4)
164+
161165
logger.info(f"Exported to {file_path}")
162166

163167

@@ -172,6 +176,7 @@ def __init__(
172176
external_ids: bool,
173177
with_bar_flag: bool,
174178
) -> None:
179+
"""Initialize the exporter with a Spotify client."""
175180
self.spotify = spotify_client
176181
self.file_format = file_format
177182
self.include_uris = include_uris
@@ -188,10 +193,110 @@ def _fetch_all_items(
188193
show_bar: bool = True,
189194
**kwargs: Any,
190195
) -> list[dict]:
191-
# ... existing pagination and batching logic unchanged ...
192-
pass # existing implementation here
196+
"""Fetch all paginated or batched items from a Spotify endpoint.
197+
198+
- If the first positional arg is a list, treats it as an ID list for a batch
199+
endpoint (e.g. `self.spotify.albums`), calls in chunks of 20, and returns
200+
results[key] aggregated.
201+
- Otherwise treats as a paginated endpoint, calling `fetch_func(*args, **kwargs)`
202+
then .next() until exhausted, aggregating results["items"] or results[key].
203+
"""
204+
items: list[dict] = []
205+
206+
# --- Batch mode (e.g. spotify.albums) ---
207+
if args and isinstance(args[0], list):
208+
id_list: list[str] = args[0]
209+
total = len(id_list)
210+
desc_text = desc or fetch_func.__name__
211+
# build 20-item batches
212+
batches = [id_list[i : i + 20] for i in range(0, total, 20)]
213+
214+
pbar = (
215+
tqdm(total=total, desc=desc_text, unit="album", bar_format=bar_format)
216+
if show_bar and self.with_bar_flag
217+
else None
218+
)
219+
220+
for batch in batches:
221+
results = fetch_func(batch, *args[1:], **kwargs)
222+
page_items = results.get(key, [])
223+
224+
fetched_ids = [i.get("id") for i in page_items if i and i.get("id")]
225+
page_items = [i for i in page_items if i]
226+
227+
unfetched_ids = list(set(batch) - set(fetched_ids))
228+
shows = []
229+
for id in unfetched_ids:
230+
try:
231+
if unfetched_ids:
232+
shows.append(self.spotify.show(id))
233+
except spotipy.SpotifyException as e:
234+
logger.warning(f"Failed to fetch show for ID {id}: {e}")
235+
236+
for i in shows:
237+
i["label"] = i.get("publisher")
238+
page_items.extend(shows)
239+
240+
items.extend(page_items)
241+
242+
if pbar:
243+
pbar.update(len(page_items))
244+
245+
if pbar:
246+
pbar.close()
247+
return items
248+
249+
# --- Paginated mode (e.g. playlist_tracks, saved_tracks) ---
250+
# Initial fetch
251+
results = fetch_func(*args, **kwargs)
252+
page_items = results.get(key, [])
253+
items.extend(page_items)
254+
255+
total = results.get("total")
256+
# fallback to a name field or the function name if no desc given
257+
desc_text = desc or (results.get("name") or fetch_func.__name__)
258+
259+
pbar = (
260+
tqdm(total=total, desc=desc_text, unit="track", bar_format=bar_format)
261+
if show_bar and self.with_bar_flag
262+
else None
263+
)
264+
if pbar:
265+
pbar.update(len(page_items))
266+
267+
# iterate through all pages
268+
while len(items) < total:
269+
results = self.spotify.next(results)
270+
page_items = results.get(key, [])
271+
items.extend(page_items)
272+
if pbar:
273+
pbar.update(len(page_items))
274+
275+
if pbar:
276+
pbar.close()
277+
278+
def _episode_to_track(item: dict) -> dict:
279+
"""Convert episode to track if applicable."""
280+
if item.get("track"):
281+
track = item.get("track")
282+
else:
283+
return
284+
285+
if track.get("type") == "episode":
286+
episode_id = track.get("id")
287+
episode = self.spotify.episode(episode_id)
288+
track["release_date"] = episode.get("release_date")
289+
290+
for artist in track.get("artists", []):
291+
artist["name"] = artist.get("type")
292+
293+
for i in items:
294+
_episode_to_track(i)
295+
296+
return items
193297

194298
def get_playlists(self) -> list[dict]:
299+
"""Retrieve all user playlists plus liked songs."""
195300
items = self._fetch_all_items(
196301
self.spotify.current_user_playlists,
197302
"items",
@@ -207,8 +312,109 @@ def get_playlists(self) -> list[dict]:
207312
return [liked, *items]
208313

209314
def export_playlist(self, playlist: dict, output_dir: Path) -> None:
210-
# ... existing export logic unchanged ...
211-
pass # existing implementation here
315+
"""Export a single playlist to CSV file."""
316+
name, pid = playlist["name"], playlist["id"]
317+
output_dir.mkdir(parents=True, exist_ok=True)
318+
filepath = output_dir / sanitize_filename(name, self.file_format)
319+
320+
# Format description for progress bar
321+
desc = (
322+
name[: DESC_LENGTH - 2] + "...: "
323+
if len(name) > DESC_LENGTH - 2
324+
else f"{name}: ".ljust(DESC_LENGTH + 3)
325+
)
326+
327+
# Fetch tracks
328+
if pid == "liked_songs":
329+
items = self._fetch_all_items(
330+
self.spotify.current_user_saved_tracks,
331+
"items",
332+
desc=desc,
333+
)
334+
else:
335+
items = self._fetch_all_items(
336+
self.spotify.playlist_tracks,
337+
"items",
338+
pid,
339+
desc=desc,
340+
)
341+
342+
# Found a track like this in a playlist (replacing [user_id]
343+
# with the actual user id). I could remove it from the list
344+
# since it has no info about the track, but one could hypothetically
345+
# do detective work out of 'added_at' to find the track (or thing)
346+
# it originally was (I guess), so I'll leave it there.
347+
#
348+
# No idea on how it was generated.
349+
#
350+
# {'added_at': '2022-06-30T21:08:13Z',
351+
# 'added_by': {'external_urls': {'spotify': 'https://open.spotify.com/user/[user_id]'},
352+
# 'href': 'https://api.spotify.com/v1/users/[user_id]',
353+
# 'id': '[user_id]',
354+
# 'type': 'user',
355+
# 'uri': 'spotify:user:[user_id]'},
356+
# 'is_local': False,
357+
# 'primary_color': None,
358+
# 'track': None,
359+
# 'video_thumbnail': {'url': None}}
360+
361+
# Batch fetch album details
362+
album_ids = list(
363+
{
364+
i.get("track").get("album").get("id")
365+
for i in items
366+
if i.get("track")
367+
and i.get("track").get("album")
368+
and i.get("track").get("album").get("id")
369+
},
370+
)
371+
album_items = self._fetch_all_items(
372+
self.spotify.albums,
373+
"albums",
374+
album_ids,
375+
desc="Fetching album details: ",
376+
)
377+
albums = {a.get("id"): a for a in album_items if a}
378+
379+
# Build export data
380+
export_data = []
381+
for i in items:
382+
track = i.get("track") or {}
383+
album = albums.get(track.get("album", {}).get("id"), {})
384+
artists = [a["name"] for a in track.get("artists", [])]
385+
artist_uris = [a["uri"] for a in track.get("artists", [])]
386+
387+
record = {
388+
"Track URI": track.get("uri"),
389+
"Artist URI(s)": artist_uris,
390+
"Album URI": album.get("uri"),
391+
"Track Name": track.get("name"),
392+
"Album Name": album.get("name"),
393+
"Artist Name(s)": artists,
394+
"Release Date": album.get("release_date")
395+
or (track.get("release_date")),
396+
"Duration_ms": track.get("duration_ms"),
397+
"Popularity": track.get("popularity"),
398+
"Added By": i.get("added_by", {}).get("id"),
399+
"Added At": i.get("added_at"),
400+
"Record Label": album.get("label"),
401+
"Track ISRC": track.get("external_ids", {}).get("isrc"),
402+
"Album UPC": album.get("external_ids", {}).get("upc"),
403+
}
404+
405+
if not self.include_uris:
406+
record.pop("Artist URI(s)", None)
407+
record.pop("Album URI", None)
408+
if not self.external_ids:
409+
record.pop("Track ISRC", None)
410+
record.pop("Album UPC", None)
411+
412+
export_data.append(record)
413+
414+
write_file(filepath, export_data, self.file_format)
415+
click.echo(
416+
f"Exported {len(export_data)} tracks from '{name}' to {filepath}",
417+
)
212418

213419

214420
@click.command()
@@ -245,6 +451,7 @@ def export_playlist(self, playlist: dict, output_dir: Path) -> None:
245451
@click.option(
246452
"--playlist",
247453
"-p",
454+
"playlist",
248455
multiple=True,
249456
help="Names or IDs of playlists to export",
250457
)
@@ -271,7 +478,7 @@ def export_playlist(self, playlist: dict, output_dir: Path) -> None:
271478
"--with-bar/--no-bar",
272479
"with_bar_flag",
273480
default=None,
274-
help="Show progress bar (overrides config)",
481+
help="Show or hide progress bar (overrides config)",
275482
)
276483
def main(
277484
config: str,
@@ -306,6 +513,7 @@ def main(
306513
)
307514

308515
client = init_spotify_client(cfg)
516+
309517
exporter = SpotifyExporter(
310518
spotify_client=client,
311519
file_format=file_format,
@@ -314,41 +522,64 @@ def main(
314522
with_bar_flag=bar_flag,
315523
)
316524

317-
playlists = exporter.get_playlists()
525+
fetched_playlists = exporter.get_playlists()
526+
318527
if list_only:
319-
playlist_data = [[p["name"], p["id"], p["tracks"]["total"]] for p in playlists]
528+
playlist_data = [
529+
[p["name"], p["id"], p["tracks"]["total"]] for p in fetched_playlists
530+
]
320531
terminal_width = os.get_terminal_size().columns
532+
321533
click.echo(
322534
tabulate(
323535
playlist_data,
324536
headers=["Name", "ID", "Tracks"],
325537
tablefmt="simple",
538+
# 34 is the width of ID and Tracks columns + padding
326539
maxcolwidths=[terminal_width - 34, None, None],
327-
)
540+
),
328541
)
329542
sys.exit(0)
330543

331544
# Determine targets
332545
targets = []
333546
if export_all:
334-
targets = playlists
547+
targets = fetched_playlists
335548
else:
336-
for p in playlists:
549+
# Exact matches first
550+
for p in fetched_playlists:
337551
if p["name"] in playlist or p["id"] in playlist:
338552
targets.append(p)
553+
# For unmatched inputs, try unique prefix match
339554
for term in playlist:
340555
if any(p for p in targets if p["name"] == term or p["id"] == term):
341556
continue
557+
342558
matches = [
343-
p for p in playlists if p["name"].lower().startswith(term.lower())
559+
p
560+
for p in fetched_playlists
561+
if p["name"].lower().startswith(term.lower())
344562
]
345563
if len(matches) == 1:
346564
targets.append(matches[0])
347565
elif len(matches) > 1:
348566
click.echo(
349-
f"Ambiguous prefix '{term}': matches {', '.join(p['name'] for p in matches)}. Skipping."
567+
f"Ambiguous prefix '{term}': matches "
568+
f"{', '.join(p['name'] for p in matches)}. Skipping.",
350569
)
570+
571+
# User may be trying to export a playlist they have not saved
572+
elif term.isalnum() and len(term) == 22:
573+
try:
574+
pl = client.playlist(term)
575+
if pl:
576+
targets.append(pl)
577+
except spotipy.SpotifyException as e:
578+
logger.warning(f"Failed to fetch playlist {term}: {e}")
579+
580+
# Deduplicate
351581
targets = list({p["id"]: p for p in targets}.values())
582+
352583
if not targets:
353584
click.echo("No matching playlists found.")
354585
sys.exit(1)

0 commit comments

Comments
 (0)