1515from collections .abc import Iterable
1616from pathlib import Path
1717
18- from aboutcode .federated import DataCluster
1918from aboutcode .federated import DataFederation
2019from aboutcode .pipeline import LoopProgress
20+ from packageurl import PackageURL
2121from scanpipe .pipelines import Pipeline
2222from scanpipe .pipes import federatedcode
2323
2424from minecode_pipelines import pipes
25+ from minecode_pipelines .pipes import write_package_data_to_file
2526from minecode_pipelines .pipes import write_packageurls_to_file
2627
2728module_logger = logging .getLogger (__name__ )
@@ -89,15 +90,20 @@ def fetch_federation_config(self):
8990 name = "aboutcode-data" ,
9091 remote_root_url = "https://github.com/aboutcode-data" ,
9192 )
92- self .data_cluster = data_federation .get_cluster ("purls" )
93+ self .data_clusters = {
94+ "purls" : data_federation .get_cluster ("purls" ),
95+ "api_package_version_responses" : data_federation .get_cluster (
96+ "api_package_version_responses"
97+ ),
98+ }
9399
94100 def mine_and_publish_packageurls (self ):
95101 """Mine and publish PackageURLs."""
96102
97103 _mine_and_publish_packageurls (
98104 packageurls = self .mine_packageurls (),
99105 total_package_count = self .packages_count (),
100- data_cluster = self .data_cluster ,
106+ data_clusters = self .data_clusters ,
101107 checked_out_repos = self .checked_out_repos ,
102108 working_path = self .working_path ,
103109 append_purls = self .append_purls ,
@@ -141,16 +147,75 @@ def log(self, message, level=logging.INFO):
141147 self .append_to_log (message )
142148
143149
150+ def commit_and_push_packageurls (
151+ current_working_repos ,
152+ commit_msg_func ,
153+ checkpoint_func ,
154+ checkpoint_on_commit ,
155+ checkpoint_interval ,
156+ last_checkpoint_call ,
157+ logger ,
158+ ):
159+ """
160+ Given a list of `current_working_repos`, commit and push changes to each repo with the commit message returned from `commit_msg_func`.
161+
162+ If `checkpoint_on_commit` is True and `checkpoint_func` exists, then we execute `checkpoint_func`.
163+
164+ If `checkpoint_on_commit` is False, then we determine if it is time to call `checkpoint_func` or not.
165+ """
166+
167+ if logger :
168+ logger ("Trying to commit PackageURLs." )
169+
170+ for repo_checkout in current_working_repos :
171+ pipes .commit_and_push_checkout (
172+ local_checkout = repo_checkout ,
173+ commit_message = commit_msg_func (repo_checkout ["commit_count" ] + 1 ),
174+ logger = logger ,
175+ )
176+
177+ if checkpoint_on_commit and checkpoint_func :
178+ checkpoint_func ()
179+
180+ if not checkpoint_on_commit :
181+ time_now = time .time ()
182+ checkpoint_due = time_now - last_checkpoint_call >= checkpoint_interval
183+ if checkpoint_func and checkpoint_due :
184+ checkpoint_func ()
185+ last_checkpoint_call = time_now
186+
187+
188+ def get_repo_checkout_from_data_cluster (
189+ data_cluster , purl , checked_out_repos , working_path , logger , datafile_name = None
190+ ):
191+ """
192+ Return a `repo_checkout` and `datafile_path` for a given `purl`, `data_cluster`, and `working_path`.
193+
194+ Add `repo_checkout` to `checked_out_repos`.
195+ """
196+ repo , datafile_path = data_cluster .get_datafile_repo_and_path (
197+ purl = purl , datafile_name = datafile_name
198+ )
199+ if repo not in checked_out_repos :
200+ checked_out_repos [repo ] = pipes .init_local_checkout (
201+ repo_name = repo ,
202+ working_path = working_path ,
203+ logger = logger ,
204+ )
205+ repo_checkout = checked_out_repos [repo ]
206+ return repo_checkout , datafile_path
207+
208+
144209def _mine_and_publish_packageurls (
145210 packageurls : Iterable ,
146211 total_package_count : int ,
147- data_cluster : DataCluster ,
212+ data_clusters ,
148213 checked_out_repos : dict ,
149214 working_path : Path ,
150215 append_purls : bool ,
151216 commit_msg_func : Callable ,
152217 logger : Callable ,
153- batch_size : int = 4000 ,
218+ batch_size : int = 100 ,
154219 checkpoint_on_commit : bool = False ,
155220 checkpoint_func : Callable = None ,
156221 checkpoint_freq : int = 30 ,
@@ -172,45 +237,94 @@ def _mine_and_publish_packageurls(
172237 iterator = progress .iter (iterator )
173238 logger (f"Mine PackageURL for { total_package_count :,d} packages." )
174239
175- for base , purls in iterator :
240+ purls_data_cluster = data_clusters ["purls" ]
241+ api_package_version_responses_data_cluster = data_clusters ["api_package_version_responses" ]
242+
243+ current_working_repos = []
244+ currently_processed_files_count = 0
245+ for base , purls , purls_and_package_data in iterator :
176246 if not purls or not base :
177247 continue
178248
179- package_repo , datafile_path = data_cluster .get_datafile_repo_and_path (purl = base )
180- if package_repo not in checked_out_repos :
181- checked_out_repos [package_repo ] = pipes .init_local_checkout (
182- repo_name = package_repo ,
183- working_path = working_path ,
184- logger = logger ,
185- )
249+ purls_package_repo_checkout , purls_datafile_path = get_repo_checkout_from_data_cluster (
250+ data_cluster = purls_data_cluster ,
251+ purl = base ,
252+ checked_out_repos = checked_out_repos ,
253+ working_path = working_path ,
254+ logger = logger ,
255+ )
256+ if purls_package_repo_checkout not in current_working_repos :
257+ current_working_repos .append (purls_package_repo_checkout )
186258
187- checkout = checked_out_repos [package_repo ]
188259 purl_file = write_packageurls_to_file (
189- repo = checkout ["repo" ],
190- relative_datafile_path = datafile_path ,
260+ repo = purls_package_repo_checkout ["repo" ],
261+ relative_datafile_path = purls_datafile_path ,
191262 packageurls = purls ,
192263 append = append_purls ,
193264 )
194- checkout ["file_to_commit" ].add (purl_file )
195- checkout ["file_processed_count" ] += 1
196-
197- if len (checkout ["file_to_commit" ]) > batch_size :
198- if logger :
199- logger ("Trying to commit PackageURLs." )
200- pipes .commit_and_push_checkout (
201- local_checkout = checkout ,
202- commit_message = commit_msg_func (checkout ["commit_count" ] + 1 ),
265+ purls_package_repo_checkout ["file_to_commit" ].add (purl_file )
266+ purls_package_repo_checkout ["file_processed_count" ] += 1
267+ currently_processed_files_count += 1
268+
269+ if currently_processed_files_count > batch_size :
270+ commit_and_push_packageurls (
271+ current_working_repos = current_working_repos ,
272+ commit_msg_func = commit_msg_func ,
273+ checkpoint_func = checkpoint_func ,
274+ checkpoint_on_commit = checkpoint_on_commit ,
275+ checkpoint_interval = checkpoint_interval ,
276+ last_checkpoint_call = last_checkpoint_call ,
203277 logger = logger ,
204278 )
205- if checkpoint_on_commit and checkpoint_func :
206- checkpoint_func ()
207-
208- if not checkpoint_on_commit :
209- time_now = time .time ()
210- checkpoint_due = time_now - last_checkpoint_call >= checkpoint_interval
211- if checkpoint_func and checkpoint_due :
212- checkpoint_func ()
213- last_checkpoint_call = time_now
279+ current_working_repos = []
280+ currently_processed_files_count = 0
281+
282+ for purl , api_package_version_response in purls_and_package_data :
283+ if not isinstance (purl , PackageURL ):
284+ package_url = PackageURL .from_string (purl )
285+ else :
286+ package_url = purl
287+ if package_url .type == "maven" :
288+ datafile_name = "pom.xml"
289+ else :
290+ datafile_name = "api_package_version_response.json"
291+ api_package_version_responses_repo_checkout , api_package_metadata_datafile_path = (
292+ get_repo_checkout_from_data_cluster (
293+ data_cluster = api_package_version_responses_data_cluster ,
294+ purl = purl ,
295+ checked_out_repos = checked_out_repos ,
296+ working_path = working_path ,
297+ logger = logger ,
298+ datafile_name = datafile_name ,
299+ )
300+ )
301+ if api_package_version_responses_repo_checkout not in current_working_repos :
302+ current_working_repos .append (api_package_version_responses_repo_checkout )
303+
304+ api_package_version_response_file = write_package_data_to_file (
305+ repo = api_package_version_responses_repo_checkout ["repo" ],
306+ relative_api_package_metadata_datafile_path = api_package_metadata_datafile_path ,
307+ package_data = api_package_version_response ,
308+ )
309+
310+ api_package_version_responses_repo_checkout ["file_to_commit" ].add (
311+ api_package_version_response_file
312+ )
313+ api_package_version_responses_repo_checkout ["file_processed_count" ] += 1
314+ currently_processed_files_count += 1
315+
316+ if currently_processed_files_count > batch_size :
317+ commit_and_push_packageurls (
318+ current_working_repos = current_working_repos ,
319+ commit_msg_func = commit_msg_func ,
320+ checkpoint_func = checkpoint_func ,
321+ checkpoint_on_commit = checkpoint_on_commit ,
322+ checkpoint_interval = checkpoint_interval ,
323+ last_checkpoint_call = last_checkpoint_call ,
324+ logger = logger ,
325+ )
326+ current_working_repos = []
327+ currently_processed_files_count = 0
214328
215329 for checkout in checked_out_repos .values ():
216330 final_commit_count = checkout ["commit_count" ] + 1
0 commit comments