99
1010# Author: Navonil Das (@NavonilDas)
1111
12+ import json
13+ import os
14+ import tempfile
1215from pathlib import Path
1316from typing import Iterable
1417
1518import pytz
19+ import requests
1620from dateutil .parser import parse
1721from fetchcode .vcs import fetch_via_vcs
1822from packageurl import PackageURL
1923from univers .version_range import NpmVersionRange
24+ from univers .versions import SemverVersion
2025
2126from vulnerabilities .importer import AdvisoryData
2227from vulnerabilities .importer import AffectedPackage
@@ -39,28 +44,88 @@ class NpmImporterPipeline(VulnerableCodeBaseImporterPipeline):
3944 repo_url = "git+https://github.com/nodejs/security-wg"
4045 importer_name = "Npm Importer"
4146
47+ is_batch_run = True
48+
49+ def __init__ (self , * args , purl = None , ** kwargs ):
50+ super ().__init__ (* args , ** kwargs )
51+ self .purl = purl
52+ if self .purl :
53+ NpmImporterPipeline .is_batch_run = False
54+ if self .purl .type != "npm" :
55+ print (f"Warning: This importer handles NPM packages. Current PURL: { self .purl !s} " )
56+
4257 @classmethod
4358 def steps (cls ):
44- return (
59+ if not cls .is_batch_run :
60+ return [
61+ cls .fetch_package_advisories ,
62+ cls .collect_and_store_advisories ,
63+ cls .import_new_advisories ,
64+ ]
65+
66+ return [
4567 cls .clone ,
4668 cls .collect_and_store_advisories ,
4769 cls .import_new_advisories ,
4870 cls .clean_downloads ,
49- )
71+ ]
5072
5173 def clone (self ):
5274 self .log (f"Cloning `{ self .repo_url } `" )
5375 self .vcs_response = fetch_via_vcs (self .repo_url )
5476
77+ def fetch_package_advisories (self ):
78+ if not self .purl or self .purl .type != "npm" :
79+ return
80+
81+ self .log (f"Fetching advisories for package { self .purl .name } " )
82+
83+ package_name = self .purl .name
84+
85+ self .temp_dir = tempfile .mkdtemp ()
86+ self .package_advisories = []
87+
88+ api_url = "https://api.github.com/repos/nodejs/security-wg/contents/vuln/npm"
89+ response = requests .get (api_url )
90+
91+ if response .status_code != 200 :
92+ self .log (f"Failed to fetch advisories directory: { response .status_code } " )
93+ return
94+
95+ for item in response .json ():
96+ if item ["type" ] == "file" and item ["name" ].endswith (".json" ):
97+ file_url = item ["download_url" ]
98+ try :
99+ file_content = requests .get (file_url ).json ()
100+
101+ if file_content .get ("module_name" ) == package_name :
102+ file_path = os .path .join (self .temp_dir , item ["name" ])
103+ with open (file_path , "w" ) as f :
104+ json .dump (file_content , f )
105+ self .package_advisories .append (file_path )
106+ except Exception as e :
107+ self .log (f"Error processing advisory file { item ['name' ]} : { str (e )} " )
108+
109+ self .log (f"Found { len (self .package_advisories )} advisories for package { package_name } " )
110+
55111 def advisories_count (self ):
56- vuln_directory = Path (self .vcs_response .dest_dir ) / "vuln" / "npm"
57- return sum (1 for _ in vuln_directory .glob ("*.json" ))
112+ if NpmImporterPipeline .is_batch_run :
113+ vuln_directory = Path (self .vcs_response .dest_dir ) / "vuln" / "npm"
114+ return sum (1 for _ in vuln_directory .glob ("*.json" ))
115+ else :
116+ return len (getattr (self , "package_advisories" , []))
58117
59118 def collect_advisories (self ) -> Iterable [AdvisoryData ]:
60- vuln_directory = Path (self .vcs_response .dest_dir ) / "vuln" / "npm"
119+ if NpmImporterPipeline .is_batch_run :
120+ vuln_directory = Path (self .vcs_response .dest_dir ) / "vuln" / "npm"
121+ for advisory in vuln_directory .glob ("*.json" ):
122+ yield from self .to_advisory_data (advisory )
123+ else :
124+ if not hasattr (self , "package_advisories" ):
125+ return
61126
62- for advisory in vuln_directory . glob ( "*.json" ) :
63- yield from self .to_advisory_data (advisory )
127+ for advisory_path in self . package_advisories :
128+ yield from self .to_advisory_data (Path ( advisory_path ) )
64129
65130 def to_advisory_data (self , file : Path ) -> Iterable [AdvisoryData ]:
66131 data = load_json (file )
@@ -112,6 +177,11 @@ def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]:
112177 affected_packages .append (self .get_affected_package (data , package_name ))
113178 advsisory_aliases = data .get ("cves" ) or []
114179
180+ if self .purl and self .purl .version :
181+ affected_package = affected_packages [0 ] if affected_packages else None
182+ if affected_package and not self ._version_is_affected (affected_package ):
183+ return
184+
115185 for alias in advsisory_aliases :
116186 yield AdvisoryData (
117187 summary = build_description (summary = summary , description = description ),
@@ -122,6 +192,13 @@ def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]:
122192 url = f"https://github.com/nodejs/security-wg/blob/main/vuln/npm/{ id } .json" ,
123193 )
124194
195+ def _version_is_affected (self , affected_package ):
196+ if not self .purl .version or not affected_package .affected_version_range :
197+ return True
198+
199+ purl_version = SemverVersion (self .purl .version )
200+ return purl_version in affected_package .affected_version_range
201+
125202 def get_affected_package (self , data , package_name ):
126203 affected_version_range = None
127204 unaffected_version_range = None
@@ -164,5 +241,11 @@ def clean_downloads(self):
164241 self .log (f"Removing cloned repository" )
165242 self .vcs_response .delete ()
166243
244+ if hasattr (self , "temp_dir" ) and os .path .exists (self .temp_dir ):
245+ import shutil
246+
247+ self .log (f"Removing temporary directory" )
248+ shutil .rmtree (self .temp_dir )
249+
167250 def on_failure (self ):
168251 self .clean_downloads ()
0 commit comments