1616from typing import Optional
1717
1818import pytz
19+ import requests
1920from bs4 import BeautifulSoup
2021from packageurl import PackageURL
2122from univers .versions import RpmVersion
2526from vulnerabilities .importer import Importer
2627from vulnerabilities .importer import Reference
2728from vulnerabilities .importer import VulnerabilitySeverity
29+ from vulnerabilities .pipelines import VulnerableCodeBaseImporterPipeline
2830from vulnerabilities .rpm_utils import rpm_to_purl
2931from vulnerabilities .severity_systems import SCORING_SYSTEMS
3032from vulnerabilities .utils import fetch_response
3133from vulnerabilities .utils import is_cve
3234
# Module-level logger; used by collect_advisories() to report advisory
# pages that fail to download.
LOGGER = logging.getLogger(__name__)

3637
class AmazonLinuxImporterPipeline(VulnerableCodeBaseImporterPipeline):
    """
    Import Amazon Linux security advisories (ALAS) scraped from the
    Amazon Linux Security Center HTML pages.
    """

    pipeline_id = "amazon_linux_importer"
    # Trailing slash is part of the base URL; page paths below are appended
    # WITHOUT a leading slash to avoid producing a double slash in the URL.
    BASE_URL = "https://alas.aws.amazon.com/"
    spdx_license_expression = "CC BY 4.0"
    # NOTE(review): no dedicated license URL is published for these
    # advisories — confirm before relying on this value.
    license_url = "Unknown"
    importer_name = "Amazon Linux Importer"

    @classmethod
    def steps(cls):
        """Return the ordered pipeline steps: fetch, collect/store, import."""
        return (
            cls.fetch,
            cls.collect_and_store_advisories,
            cls.import_new_advisories,
        )

    def fetch(self):
        """
        Fetch the advisory index pages for Amazon Linux 1, 2 and 2023 and
        store a mapping of {ALAS id: advisory page URL} on
        ``self.advisory_data``.
        """
        self.log(f"Fetch `{self.BASE_URL}`")
        # One index page per Amazon Linux release line.
        amazon_linux_1_url = self.BASE_URL + "index.html"
        amazon_linux_2_url = self.BASE_URL + "alas2.html"
        amazon_linux_2023_url = self.BASE_URL + "alas2023.html"
        amazonlinux_advisories_pages = [
            amazon_linux_1_url,
            amazon_linux_2_url,
            amazon_linux_2023_url,
        ]

        alas_dict = {}
        for amazonlinux_advisories_page in amazonlinux_advisories_pages:
            alas_dict.update(fetch_alas_id_and_advisory_links(amazonlinux_advisories_page))
        self.advisory_data = alas_dict

    def advisories_count(self):
        """Return the number of advisories discovered by fetch()."""
        return len(self.advisory_data)

    def collect_advisories(self) -> Iterable[AdvisoryData]:
        """
        Yield AdvisoryData parsed from each Amazon Linux advisory HTML
        page collected by fetch(). Pages that fail to download are logged
        and skipped so one bad page does not abort the whole import.
        """
        for alas_id, alas_url in self.advisory_data.items():
            # Skip malformed index entries with a missing id or URL.
            if not (alas_id and alas_url):
                continue
            try:
                # Fetch the advisory page content.
                response = fetch_response(alas_url)
                alas_advisory_page_content = response.content
            except Exception as e:
                # Log the error and continue to the next item.
                LOGGER.error(f"Failed to fetch advisory {alas_id} from {alas_url}: {str(e)}")
                continue

            # Process and yield data if successful.
            yield process_advisory_data(alas_id, alas_advisory_page_content, alas_url)
6196
6297
6398def fetch_alas_id_and_advisory_links (page_url : str ) -> dict [str , str ]:
6499 """
65100 Return a dictionary where 'ALAS' entries are the keys and
66- their corresponding advisory page links are the values.
101+ their corresponding advisory page link strings are the values.
67102 """
68103
69104 page_response_content = fetch_response (page_url ).content
@@ -253,7 +288,6 @@ def get_date_published(release_date_string):
253288 # Parse the date and time
254289 if release_date_string :
255290 date_part = release_date_string [:16 ]
256- time_zone = release_date_string [17 :]
257291 else :
258292 return None
259293
0 commit comments