22"""CoLRev pdf_get operation: Get PDF documents."""
33from __future__ import annotations
44
5+ import logging
56import shutil
67import typing
78from glob import glob
2324from colrev .writer .write_utils import write_file
2425
2526
def relink_pdfs_in_source(
    *,
    source: colrev.search_file.ExtendedSearchFile,
    records: dict,
    pdf_dir: Path,
    logger: logging.Logger,
) -> None:
    """Repair broken PDF file links for records linked to ``source``.

    The search results of ``source`` are loaded, and for every entry of
    ``records`` that has a file field the following is attempted:

    * No matching source record and a broken path: the PDF directory is
      scanned (once, lazily) and the record is re-linked to the file whose
      colrev-pdf-id equals the record's ``colrev_pdf_id``.
    * Matching source record, broken primary path, and an existing
      ``data/pdfs/<ID>.pdf``: both records are pointed to that file.
    * Valid primary path but broken source path: the source record takes
      over the primary path.
    * Both paths valid but different: a warning is logged, nothing changes.

    Args:
        source: Search source whose results file is read and rewritten.
        records: Primary records keyed by ID; modified in place and NOT
            saved by this function.
        pdf_dir: Directory searched recursively for ``*.pdf`` files. File
            fields are resolved relative to ``pdf_dir.parent.parent``.
        logger: Logger receiving progress and warning messages.
    """

    # pylint: disable=too-many-locals
    # pylint: disable=too-many-branches
    # pylint: disable=too-many-statements

    # File fields are stored relative to this directory.
    home_path = pdf_dir.parent.parent

    def file_exists(rec: dict) -> bool:
        """Return True if rec has a file field that points to an existing file."""
        file_value = rec.get(Fields.FILE)
        if not file_value:
            return False
        return (home_path / Path(file_value)).is_file()

    def get_pdf_candidates() -> dict:
        logger.info("Calculate CPIDs to link PDF file(s)")
        candidates: dict[str, Path] = {}
        for pdf_candidate in pdf_dir.glob("**/*.pdf"):
            colrev_pdf_id = colrev.record.record_pdf.PDFRecord.get_colrev_pdf_id(
                pdf_candidate
            )
            candidates[colrev_pdf_id] = pdf_candidate.relative_to(home_path)
        return candidates

    def get_source_record(record: dict, origin_prefix: str, candidates: list) -> dict:
        """Return the unique source record behind record, or {} if there is none."""
        if origin_prefix == "":
            return {}
        matching_origins = [o for o in record[Fields.ORIGIN] if origin_prefix in o]
        if len(matching_origins) != 1:
            return {}
        source_id = matching_origins[0].replace(f"{origin_prefix}/", "")
        matching_records = [s for s in candidates if s[Fields.ID] == source_id]
        if len(matching_records) != 1:
            return {}
        return matching_records[0]

    logger.info(
        "Checking PDFs in same directory to reassign when "
        f"the cpid is identical ({source.search_results_path})"
    )

    source_records_dict = colrev.loader.load_utils.load(
        filename=source.search_results_path,
        logger=logger,
    )
    source_records = list(source_records_dict.values())
    if not source_records:
        logger.info("No records to relink")
        return

    logger.info("Start relinking procedure")

    # None means "not computed yet". An empty dict is a valid (cached) result,
    # so an empty PDF directory is scanned only once.
    pdf_candidates: typing.Optional[typing.Dict[str, Path]] = None

    corresponding_origin = str(source.get_origin_prefix())
    for record in records.values():

        if Fields.FILE not in record:
            continue

        # Note: we check the source_records based on the cpids
        # in the record because cpids are not stored in the source_record
        # (pdf hashes may change after import/preparation)
        source_rec = get_source_record(record, corresponding_origin, source_records)

        record_file_ok = file_exists(record)

        if not source_rec:
            if record_file_ok:
                continue

            logger.info(
                f"Primary record ({record[Fields.ID]}): "
                f"Broken file path {Colors.RED}{record[Fields.FILE]}{Colors.END}"
            )

            # Without a source record, the CPID is the only way to find the PDF.
            # This branch must be handled before any access to source_rec fields:
            # source_rec is an empty dict here.
            if pdf_candidates is None:
                pdf_candidates = get_pdf_candidates()

            record_cpid = record.get("colrev_pdf_id")
            if record_cpid and record_cpid in pdf_candidates:
                pdf_candidate = pdf_candidates[record_cpid]
                logger.info(
                    "- Primary record: Updated path to file with matching CPID: "
                    f"{Colors.GREEN}{pdf_candidate}{Colors.END}"
                )
                record[Fields.FILE] = str(pdf_candidate)
            else:
                logger.warning(
                    f"{Colors.RED}- Primary record: "
                    f"Did not find the PDF file based on CPID.{Colors.END}"
                )
            continue

        # From here on a source record exists (its file field may be missing).
        source_file = source_rec.get(Fields.FILE, "")
        source_file_ok = file_exists(source_rec)

        if not record_file_ok:
            logger.info(
                f"Primary record ({record[Fields.ID]}): "
                f"Broken file path {Colors.RED}{record[Fields.FILE]}{Colors.END}"
            )
        if not source_file_ok:
            logger.info(
                f"Source record ({source.search_results_path} {source_rec[Fields.ID]}) "
                f"of {record[Fields.ID]}: "
                f"Broken file path {Colors.RED}{source_file}{Colors.END}"
            )

        if record_file_ok and source_file_ok:
            if record[Fields.FILE] != source_file:
                logger.warning(
                    f"{Colors.ORANGE}Source record has {source_file} "
                    f"but primary record has {record[Fields.FILE]} (both exist) "
                    f"- Resolve manually.{Colors.END}"
                )
            continue

        # NOTE(review): no CPID-based lookup is attempted when a source record
        # exists; only the ID-named file is considered. Confirm this is intended.
        # NOTE(review): the "data/pdfs/" prefix is hard-coded and presumably
        # equals pdf_dir relative to home_path - verify against the caller.
        id_named_pdf = "data/pdfs/" + record[Fields.ID] + ".pdf"
        if not record_file_ok and (home_path / Path(id_named_pdf)).is_file():
            logger.info(
                "- Primary record: Updated path to match existing file: "
                f"{Colors.GREEN}{record[Fields.ID]}.pdf{Colors.END}"
            )
            # Store a str (not a Path) to stay consistent with the CPID branch.
            record[Fields.FILE] = id_named_pdf
            record_file_ok = True
            if str(source_file) != id_named_pdf:
                logger.info(
                    "- Source record: Updated path to match existing file: "
                    f"{Colors.GREEN}{record[Fields.ID]}.pdf{Colors.END}"
                )
                source_rec[Fields.FILE] = id_named_pdf

        if record_file_ok and not file_exists(source_rec):
            logger.info(
                "- Source record: Updated to path from primary record: "
                f"{Colors.GREEN}{record[Fields.FILE]}{Colors.END}"
            )
            source_rec[Fields.FILE] = record[Fields.FILE]

    logger.info("Relinking completed. Save results.")
    source_records_dict = {r[Fields.ID]: r for r in source_records}
    write_file(records_dict=source_records_dict, filename=source.search_results_path)
165+
166+
26167class PDFGet (colrev .process .operation .Operation ):
27168 """Get the PDFs"""
28169
@@ -247,88 +388,6 @@ def _fix_broken_symlinks(self) -> None:
247388 broken_symlink .unlink ()
248389 broken_symlink .symlink_to (new_file )
249390
def _relink_pdfs_in_source(
    self, source: colrev.search_file.ExtendedSearchFile
) -> None:
    """Re-link records of ``source`` to PDFs with an identical colrev-pdf-id.

    All PDFs below the project's PDF directory are indexed by their
    colrev-pdf-id. Records (and their matching source records) whose file
    path no longer points to an existing file are re-linked to the first
    PDF whose id equals the record's ``colrev_pdf_id``. The source file and
    the primary records are saved afterwards, and the source file is staged.
    """

    # pylint: disable=too-many-locals

    review_manager = self.review_manager
    log = review_manager.logger

    log.info(
        "Checking PDFs in same directory to reassign when "
        f"the cpid is identical {source.search_results_path}"
    )

    # Map: path (relative to the project directory) -> colrev-pdf-id
    pdf_dir = review_manager.paths.pdf
    cpid_by_path = {
        pdf_path.relative_to(
            review_manager.path
        ): colrev.record.record_pdf.PDFRecord.get_colrev_pdf_id(pdf_path)
        for pdf_path in list(pdf_dir.glob("**/*.pdf"))
    }

    source_records_dict = colrev.loader.load_utils.load(
        filename=source.search_results_path,
        logger=log,
    )
    source_records = list(source_records_dict.values())
    origin_prefix = str(source.get_origin_prefix())
    records = review_manager.dataset.load_records_dict()

    def is_existing_file(rec: dict) -> bool:
        return (review_manager.path / Path(rec[Fields.FILE])).is_file()

    for record in records.values():
        if Fields.FILE not in record:
            continue

        # Note: we check the source_records based on the cpids
        # in the record because cpids are not stored in the source_record
        # (pdf hashes may change after import/preparation)
        source_rec = {}
        if origin_prefix != "":
            origin_matches = [o for o in record[Fields.ORIGIN] if origin_prefix in o]
            if len(origin_matches) == 1:
                source_id = origin_matches[0].replace(f"{origin_prefix}/", "")
                id_matches = [s for s in source_records if s[Fields.ID] == source_id]
                if len(id_matches) == 1:
                    source_rec = id_matches[0]

        # Nothing to do when every available file link is intact.
        if is_existing_file(record) and (
            not source_rec or is_existing_file(source_rec)
        ):
            continue

        log.info(record[Fields.ID])

        wanted_cpid = record.get("colrev_pdf_id", "")
        matching_path = next(
            (path for path, cpid in cpid_by_path.items() if cpid == wanted_cpid),
            None,
        )
        if matching_path is not None:
            record[Fields.FILE] = str(matching_path)
            source_rec[Fields.FILE] = str(matching_path)
            log.info(f"Found and linked PDF: {matching_path}")

    if source_records:
        source_records_dict = {r[Fields.ID]: r for r in source_records}
        write_file(
            records_dict=source_records_dict, filename=source.search_results_path
        )

    review_manager.dataset.save_records_dict(records)
    review_manager.dataset.git_repo.add_changes(source.search_results_path)
331-
332391 def relink_pdfs (self ) -> None :
333392 """Relink record files to the corresponding PDFs (if available)"""
334393
@@ -339,8 +398,18 @@ def relink_pdfs(self) -> None:
339398 for s in self .review_manager .settings .sources
340399 if s .platform == "colrev.files_dir" and s .search_results_path .is_file ()
341400 ]
401+ records = self .review_manager .dataset .load_records_dict ()
402+
342403 for source in sources :
343- self ._relink_pdfs_in_source (source )
404+ relink_pdfs_in_source (
405+ source = source ,
406+ records = records ,
407+ pdf_dir = self .review_manager .paths .pdf ,
408+ logger = self .review_manager .logger ,
409+ )
410+ self .review_manager .dataset .git_repo .add_changes (source .search_results_path )
411+
412+ self .review_manager .dataset .save_records_dict (records )
344413
345414 self .review_manager .create_commit (msg = "Relink PDFs" )
346415
0 commit comments