1+ #
2+ # Copyright (c) nexB Inc. and others. All rights reserved.
3+ # VulnerableCode is a trademark of nexB Inc.
4+ # SPDX-License-Identifier: Apache-2.0
5+ # See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+ # See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+ # See https://aboutcode.org for more information about nexB OSS projects.
8+ #
9+ import json
10+ import logging
11+ import re
12+ from datetime import timezone
13+ from typing import Iterable
14+ import gzip
15+ import io
16+ from xml .etree import ElementTree as ET
17+
18+ import requests
19+ from bs4 import BeautifulSoup
20+ from dateutil import parser as dateparser
21+ from packageurl import PackageURL
22+ from univers .version_range import GenericVersionRange
23+ from univers .version_range import VersionRange
24+ from univers .versions import SemverVersion
25+
26+ from vulnerabilities .importer import AdvisoryData
27+ from vulnerabilities .importer import AffectedPackage
28+ from vulnerabilities .importer import Reference
29+ from vulnerabilities .importer import VulnerabilitySeverity
30+ from vulnerabilities .pipelines import VulnerableCodeBaseImporterPipeline
31+ from vulnerabilities .severity_systems import GENERIC
32+ from vulnerabilities .severity_systems import CVSSV31
33+ from vulnerabilities .utils import fetch_response
34+ from vulnerabilities .utils import get_item
35+
# Module-level logging setup.
# NOTE(review): basicConfig() runs at import time, so importing this module
# configures the root logger at INFO level if the process has not configured
# logging yet -- confirm this is intended for a library module.
logging.basicConfig(level=logging.INFO)
# Logger named after this module; use it instead of print() for diagnostics.
logger = logging.getLogger(__name__)
38+
39+
class BottleRocketImporterPipeline(VulnerableCodeBaseImporterPipeline):
    """
    Collect advisories from the Bottlerocket ``updateinfo.xml.gz`` feed.

    The feed located at ``root_url`` is downloaded and parsed by
    ``fetch_advisory_data``; every parsed update is then converted to an
    ``AdvisoryData`` by ``to_advisory_data``.
    """

    pipeline_id = "bottlerocket_importer"
    spdx_license_expression = "Apache-2.0"
    license_url = "https://github.com/bottlerocket-os/bottlerocket/blob/develop/LICENSE-APACHE"
    root_url = "https://advisories.bottlerocket.aws/updateinfo.xml.gz"
    importer_name = "Bottle Rocket Importer"

    def __init__(self):
        super().__init__()

    @classmethod
    def steps(cls):
        """Return the pipeline steps, in execution order."""
        # Neither step is defined in this class; presumably both are
        # inherited from the base pipeline.
        return (
            cls.collect_and_store_advisories,
            cls.import_new_advisories,
        )

    def advisories_count(self) -> int:
        """Return the number of updates found in the feed (0 if it cannot be fetched)."""
        # ``or []`` guards against a fetch that yields nothing (or ``None``),
        # which would otherwise make ``len()`` raise ``TypeError``.
        return len(fetch_advisory_data(self.root_url) or [])

    def collect_advisories(self) -> Iterable[AdvisoryData]:
        """Yield one ``AdvisoryData`` per update found in the feed."""
        # Same guard as in ``advisories_count``: a failed fetch yields nothing
        # instead of crashing while iterating over ``None``.
        for data in fetch_advisory_data(self.root_url) or []:
            yield to_advisory_data(data)
69+
70+
71+
def fetch_advisory_data(url, *, arch="x86_64", timeout=30):
    """
    Download the gzipped ``updateinfo.xml`` at ``url`` and return its updates.

    Return a list holding one dict per ``<update>`` element, shaped like::

        {
            "issued_date": "2025-03-07T01:00:15Z",
            "severity": "important",
            "description": "In the Linux kernel, the following vulnerability ...",
            "references": [
                {
                    "href": "http://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-49960",
                    "id": "CVE-2024-49960",
                    "type": "cve",
                },
                {
                    "href": "https://github.com/bottlerocket-os/.../BRSA-th6e2wrokkoq.toml",
                    "id": "BRSA-th6e2wrokkoq",
                    "type": "brsa",
                },
            ],
            "packages": [
                {
                    "arch": "x86_64",
                    "name": "kernel-5.10",
                    "version": "5.10.234",
                    "release": "1.1741301886.9165eb8.br1",
                    "epoch": "0",
                },
            ],
        }

    Missing elements or attributes are reported as ``None`` (scalars) or as an
    empty list (``references``/``packages``) instead of raising.

    ``arch`` selects which ``<package>`` entries are kept; pass ``None`` to
    keep every architecture. ``timeout`` is the number of seconds handed to
    ``requests.get`` so a stalled server cannot block the import forever.

    Return an empty list when the server does not answer with HTTP 200.
    Network errors raised by ``requests`` and gzip/XML decoding errors are
    not caught here and propagate to the caller.
    """
    # No ``stream=True``: the whole body is needed anyway, and a streamed
    # response that is never read keeps its connection checked out.
    response = requests.get(url, timeout=timeout)

    if response.status_code != 200:
        logger.error("Failed to fetch %s. Code: %s", url, response.status_code)
        return []

    xml_content = gzip.decompress(response.content)
    # NOTE(review): the stdlib ElementTree parser is not hardened against
    # maliciously crafted XML; acceptable only while the feed is trusted.
    root = ET.fromstring(xml_content)

    return [_parse_update(update, arch) for update in root.findall("update")]


def _parse_update(update, arch):
    """Return the advisory dict for one ``<update>`` element, keeping ``arch`` packages."""
    issued = update.find("issued")

    return {
        "issued_date": issued.get("date") if issued is not None else None,
        # ``findtext`` returns "" for an empty element; normalise that to None.
        "severity": update.findtext("severity") or None,
        "description": update.findtext("description") or None,
        # The references carry the CVE id and the BRSA advisory id.
        "references": [
            dict(reference.attrib) for reference in update.iterfind("references/reference")
        ],
        "packages": [
            dict(package.attrib)
            for package in update.iterfind("pkglist/collection/package")
            if arch is None or package.get("arch") == arch
        ],
    }
131+
132+
133+
def to_advisory_data(raw_data) -> AdvisoryData:
    """
    Convert one update dict produced by ``fetch_advisory_data`` to ``AdvisoryData``.

    ``raw_data`` is read through the keys ``severity``, ``references``,
    ``packages``, ``description`` and ``issued_date``:

    - every reference becomes a ``Reference`` carrying the update severity,
      and each distinct, non-empty reference id becomes an alias;
    - every package becomes an ``AffectedPackage`` whose ``version`` is taken
      as the fixed version, with all lower versions considered affected;
    - ``issued_date`` is parsed and normalised to UTC; the published date is
      ``None`` when the key is missing or empty.
    """
    severity = VulnerabilitySeverity(
        system=GENERIC,
        value=get_item(raw_data, "severity"),
    )

    aliases = []
    references = []
    for reference in get_item(raw_data, "references") or []:
        reference_id = get_item(reference, "id")
        references.append(
            Reference(
                severities=[severity],
                reference_id=reference_id,
                url=get_item(reference, "href"),
            )
        )
        # Keep aliases free of missing ids and of duplicates.
        if reference_id and reference_id not in aliases:
            aliases.append(reference_id)

    affected_packages = []
    for package in get_item(raw_data, "packages") or []:
        package_name = get_item(package, "name")
        fix_version = get_item(package, "version")
        if not package_name or not fix_version:
            # Neither a purl nor a version range can be built without both.
            logger.warning("Skipping package without name or version: %r", package)
            continue

        affected_packages.append(
            AffectedPackage(
                package=PackageURL(type="bottle-rocket", name=package_name),
                # The base ``VersionRange`` does not implement ``from_native``
                # (only scheme-specific subclasses do), so the range is built
                # from an explicit ``vers`` string with the generic scheme.
                affected_version_range=VersionRange.from_string(
                    f"vers:generic/<{fix_version}"
                ),
                fixed_version=SemverVersion(fix_version),
            )
        )

    date_published = None
    issued_date = get_item(raw_data, "issued_date")
    if issued_date:
        parsed_date = dateparser.parse(issued_date, yearfirst=True)
        if parsed_date.tzinfo is None:
            # A naive timestamp is assumed to already be expressed in UTC.
            date_published = parsed_date.replace(tzinfo=timezone.utc)
        else:
            # Convert instead of relabelling so a non-UTC offset keeps its instant.
            date_published = parsed_date.astimezone(timezone.utc)

    return AdvisoryData(
        aliases=aliases,
        summary=get_item(raw_data, "description"),
        affected_packages=affected_packages,
        references=references,
        date_published=date_published,
    )
0 commit comments