Skip to content

Commit bc2a94c

Browse files
committed
duplicates: write type annotations
This adds type annotations to `beetsplug.duplicates`.
1 parent 9b1bd5d commit bc2a94c

1 file changed

Lines changed: 82 additions & 40 deletions

File tree

beetsplug/duplicates.py

Lines changed: 82 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@
1414

1515
"""List duplicate tracks or albums."""
1616

17+
from __future__ import annotations
18+
1719
import os
1820
import shlex
21+
from typing import TYPE_CHECKING, Any, TypeAlias, cast
1922

2023
from beets.library import Album, Item
2124
from beets.plugins import BeetsPlugin
@@ -28,8 +31,23 @@
2831
subprocess,
2932
)
3033

34+
if TYPE_CHECKING:
35+
import optparse
36+
from collections import defaultdict
37+
from collections.abc import Iterator
38+
39+
from beets.dbcore.db import Results
40+
from beets.library.library import Library
41+
3142
PLUGIN = "duplicates"
3243

44+
# Value of `tiebreak` config item.
45+
# Really the key is "item" or "album".
46+
Tiebreak: TypeAlias = dict[str, list[str]]
47+
# Attribute values; these are formed by taking the values of `keys` (a list of
48+
# strings) on the items. These are used as dict keys in a bunch of places.
49+
KeyValues: TypeAlias = tuple[Any]
50+
3351

3452
class DuplicatesPlugin(BeetsPlugin):
3553
"""List duplicate tracks or albums"""
@@ -57,7 +75,9 @@ def __init__(self):
5775
}
5876
)
5977

60-
self._command = Subcommand("duplicates", help=__doc__, aliases=["dup"])
78+
self._command = Subcommand(
79+
"duplicates", help=cast(str, __doc__), aliases=["dup"]
80+
)
6181
self._command.parser.add_option(
6282
"-c",
6383
"--count",
@@ -142,23 +162,23 @@ def __init__(self):
142162
self._command.parser.add_all_common_options()
143163

144164
def commands(self):
145-
def _dup(lib, opts, args):
165+
def _dup(lib: Library, opts: optparse.Values, args: list[str]):
146166
self.config.set_args(opts)
147-
album = self.config["album"].get(bool)
148-
checksum = self.config["checksum"].get(str)
149-
copy = bytestring_path(self.config["copy"].as_str())
150-
count = self.config["count"].get(bool)
151-
delete = self.config["delete"].get(bool)
152-
remove = self.config["remove"].get(bool)
153-
fmt_tmpl = self.config["format"].get(str)
154-
full = self.config["full"].get(bool)
155-
keys = self.config["keys"].as_str_seq()
156-
merge = self.config["merge"].get(bool)
157-
move = bytestring_path(self.config["move"].as_str())
158-
path = self.config["path"].get(bool)
159-
tiebreak = self.config["tiebreak"].get(dict)
160-
strict = self.config["strict"].get(bool)
161-
tag = self.config["tag"].get(str)
167+
album: bool = self.config["album"].get(bool) # type: ignore
168+
checksum: str = self.config["checksum"].get(str) # type: ignore
169+
copy: bytes = bytestring_path(self.config["copy"].as_str()) # type: ignore
170+
count: bool = self.config["count"].get(bool) # type: ignore
171+
delete: bool = self.config["delete"].get(bool) # type: ignore
172+
remove: bool = self.config["remove"].get(bool) # type: ignore
173+
fmt_tmpl: str = self.config["format"].get(str) # type: ignore
174+
full: bool = self.config["full"].get(bool) # type: ignore
175+
keys: list[str] = self.config["keys"].as_str_seq() # type: ignore
176+
merge: bool = self.config["merge"].get(bool) # type: ignore
177+
move: bytes = bytestring_path(self.config["move"].as_str()) # type: ignore
178+
path: bool = self.config["path"].get(bool) # type: ignore
179+
tiebreak: Tiebreak = self.config["tiebreak"].get(dict) # type: ignore
180+
strict: bool = self.config["strict"].get(bool) # type: ignore
181+
tag: str = self.config["tag"].get(str) # type: ignore
162182

163183
if album:
164184
if not keys:
@@ -185,9 +205,11 @@ def _dup(lib, opts, args):
185205
fmt_tmpl = "$albumartist - $album - $title"
186206

187207
if checksum:
208+
k = None
188209
for i in items:
189210
k, _ = self._checksum(i, checksum)
190-
keys = [k]
211+
if k is not None:
212+
keys = [k]
191213

192214
for obj_id, obj_count, objs in self._duplicates(
193215
items,
@@ -214,13 +236,13 @@ def _dup(lib, opts, args):
214236

215237
def _process_item(
216238
self,
217-
item,
218-
copy=False,
219-
move=False,
220-
delete=False,
221-
tag=False,
222-
fmt="",
223-
remove=False,
239+
item: Item | Album,
240+
copy: bytes | None = None,
241+
move: bytes | None = None,
242+
delete: bool = False,
243+
tag: str | None = None,
244+
fmt: str = "",
245+
remove: bool = False,
224246
):
225247
"""Process Item `item`."""
226248
print_(format(item, fmt))
@@ -242,16 +264,17 @@ def _process_item(
242264
setattr(item, k, v)
243265
item.store()
244266

245-
def _checksum(self, item, prog):
267+
def _checksum(
268+
self, item: Item | Album, prog: str
269+
) -> tuple[str, bytes | None]:
246270
"""Run external `prog` on file path associated with `item`, cache
247271
output as flexattr on a key that is the name of the program, and
248272
return the key, checksum tuple.
249273
"""
250-
args = [
251-
p.format(file=os.fsdecode(item.path)) for p in shlex.split(prog)
252-
]
274+
path = os.fsdecode(item.path)
275+
args = [p.format(file=path) for p in shlex.split(prog)]
253276
key = args[0]
254-
checksum = getattr(item, key, False)
277+
checksum = cast(bytes | None, getattr(item, key, None))
255278
if not checksum:
256279
self._log.debug(
257280
"key {} on item {.filepath} not cached:computing checksum",
@@ -275,7 +298,14 @@ def _checksum(self, item, prog):
275298
)
276299
return key, checksum
277300

278-
def _group_by(self, objs, keys, strict):
301+
def _group_by(
302+
self,
303+
objs: Results[Album] | Results[Item],
304+
keys: list[str],
305+
strict: bool,
306+
) -> (
307+
defaultdict[KeyValues, list[Album]] | defaultdict[KeyValues, list[Item]]
308+
):
279309
"""Return a dictionary with keys arbitrary concatenations of attributes
280310
and values lists of objects (Albums or Items) with those keys.
281311
@@ -305,7 +335,11 @@ def _group_by(self, objs, keys, strict):
305335

306336
return counts
307337

308-
def _order(self, objs, tiebreak=None):
338+
def _order(
339+
self,
340+
objs: list[Album] | list[Item],
341+
tiebreak: dict[str, list[str]] | None = None,
342+
) -> list[Album] | list[Item]:
309343
"""Return the objects (Items or Albums) sorted by descending
310344
order of priority.
311345
@@ -340,7 +374,7 @@ def key(x):
340374
def key(x):
341375
return len(x.items())
342376

343-
return sorted(objs, key=key, reverse=True)
377+
return sorted(objs, key=key, reverse=True) # type: ignore
344378

345379
def _merge_items(self, objs):
346380
"""Merge Item objs by copying missing fields from items in the tail to
@@ -366,7 +400,7 @@ def _merge_items(self, objs):
366400
break
367401
return objs
368402

369-
def _merge_albums(self, objs):
403+
def _merge_albums(self, objs: list[Album]) -> list[Album]:
370404
"""Merge Album objs by copying missing items from albums in the tail
371405
to the head album.
372406
@@ -400,12 +434,20 @@ def _merge(self, objs):
400434
objs = self._merge_albums(objs)
401435
return objs
402436

403-
def _duplicates(self, objs, keys, full, strict, tiebreak, merge):
437+
def _duplicates(
438+
self,
439+
objs: Results[Album] | Results[Item],
440+
keys: list[str],
441+
full: bool,
442+
strict: bool,
443+
tiebreak: dict[str, list[str]],
444+
merge: bool,
445+
) -> Iterator[tuple[KeyValues, int, list[Album] | list[Item]]]:
404446
"""Generate triples of keys, duplicate counts, and constituent objects."""
405447
offset = 0 if full else 1
406-
for k, objs in self._group_by(objs, keys, strict).items():
407-
if len(objs) > 1:
408-
objs = self._order(objs, tiebreak)
448+
for k, grouped_objs in self._group_by(objs, keys, strict).items():
449+
if len(grouped_objs) > 1:
450+
ordered_objs = self._order(grouped_objs, tiebreak)
409451
if merge:
410-
objs = self._merge(objs)
411-
yield (k, len(objs) - offset, objs[offset:])
452+
ordered_objs = self._merge(ordered_objs)
453+
yield (k, len(ordered_objs) - offset, ordered_objs[offset:])

0 commit comments

Comments
 (0)