File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change 1+ from concurrent .futures import ThreadPoolExecutor
2+ from typing import Callable
3+
4+
5+ def func_wrapper (args ):
6+ (f , offset , * args ) = args
7+ items = f (* args )
8+ return list ((i + offset , item ) for i , item in enumerate (items ))
9+
10+
11+ def get_items (
12+ func : Callable ,
13+ * args ,
14+ parse : Callable = lambda _ : _ ,
15+ chunk_size : int = 50 ,
16+ processes : int = 5 ,
17+ ):
18+ """
19+ This function performs pagination on a function that supports
20+ `limit`/`offset` parameters and it runs API requests in parallel to speed
21+ things up.
22+ """
23+ items = []
24+ offsets = [- chunk_size ]
25+ remaining = chunk_size * processes
26+
27+ with ThreadPoolExecutor (
28+ processes , thread_name_prefix = f"mopidy-tidal-{ func .__name__ } -"
29+ ) as pool :
30+ while remaining == chunk_size * processes :
31+ offsets = [offsets [- 1 ] + chunk_size * (i + 1 ) for i in range (processes )]
32+
33+ pool_results = pool .map (
34+ func_wrapper ,
35+ [
36+ (
37+ func ,
38+ offset ,
39+ * args ,
40+ chunk_size , # limit
41+ offset , # offset
42+ )
43+ for offset in offsets
44+ ],
45+ )
46+
47+ new_items = []
48+ for results in pool_results :
49+ new_items .extend (results )
50+
51+ remaining = len (new_items )
52+ items .extend (new_items )
53+
54+ items = [_ for _ in items if _ ]
55+ sorted_items = list (
56+ map (lambda item : item [1 ], sorted (items , key = lambda item : item [0 ]))
57+ )
58+
59+ return list (map (parse , sorted_items ))
You can’t perform that action at this time.
0 commit comments