88#
99
1010import re
11- from datetime import datetime
1211from pathlib import Path
1312from typing import Iterable
1413from typing import Tuple
@@ -35,7 +34,9 @@ class OSSAImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
3534 spdx_license_expression = "CC-BY-3.0"
3635 license_url = "https://github.com/openstack/ossa/blob/master/LICENSE"
3736 repo_url = "git+https://github.com/openstack/ossa"
38- cutoff_years = 10
37+
38+ # Advisories published before this year are skipped: their schema is inconsistent and they are no longer relevant.
39+ cutoff_year = 2016
3940
4041 @classmethod
4142 def steps (cls ):
@@ -50,33 +51,24 @@ def clone(self):
5051 self .log (f"Cloning `{ self .repo_url } `" )
5152 self .vcs_response = fetch_via_vcs (self .repo_url )
5253
53- def _get_cutoff_date (self ):
54- current_date = datetime .now (UTC )
55- cutoff_year = current_date .year - self .cutoff_years
56- return current_date .replace (year = cutoff_year )
57-
5854 def fetch (self ):
5955 ossa_dir = Path (self .vcs_response .dest_dir ) / "ossa"
60- cutoff = self ._get_cutoff_date ()
6156 self .processable_advisories = []
6257 skipped_old = 0
6358
6459 for file_path in sorted (ossa_dir .glob ("OSSA-*.yaml" )):
6560 data = load_yaml (str (file_path ))
66- date_str = data .get ("date" )
6761
68- if date_str :
69- date_published = dateparser .parse (str (date_str ))
70- date_published = date_published .replace (tzinfo = UTC )
71-
72- if date_published < cutoff :
73- skipped_old += 1
74- continue
62+ date_str = data .get ("date" )
63+ date_published = dateparser .parse (str (date_str )).replace (tzinfo = UTC )
64+ if date_published .year < self .cutoff_year :
65+ skipped_old += 1
66+ continue
7567
7668 self .processable_advisories .append (file_path )
7769
7870 if skipped_old > 0 :
79- self .log (f"Skipped { skipped_old } advisories older than { self .cutoff_years } years " )
71+ self .log (f"Skipped { skipped_old } advisories older than { self .cutoff_year } " )
8072 self .log (f"Fetched { len (self .processable_advisories )} processable advisories" )
8173
8274 def advisories_count (self ) -> int :
@@ -87,31 +79,26 @@ def collect_advisories(self) -> Iterable[AdvisoryData]:
8779 advisory = self .process_file (file_path )
8880 yield advisory
8981
90- def process_file (self , file_path ):
82+ def process_file (self , file_path ) -> AdvisoryData :
83+ """Parse a single OSSA YAML file and extract advisory data"""
9184 data = load_yaml (str (file_path ))
9285 ossa_id = data .get ("id" )
9386
94- date_published = None
9587 date_str = data .get ("date" )
96- date_published = dateparser .parse (str (date_str ))
97- date_published = date_published .replace (tzinfo = UTC )
88+ date_published = dateparser .parse (str (date_str )).replace (tzinfo = UTC )
9889
9990 aliases = []
10091 for vulnerability in data .get ("vulnerabilities" ):
101- cve = vulnerability .get ("cve-id" , "" )
92+ cve = vulnerability .get ("cve-id" )
10293 aliases .append (cve )
10394
10495 affected_packages = []
10596 for entry in data .get ("affected-products" ):
106- product = entry .get ("product" , "" )
107- version = entry .get ("version" , "" )
108-
109- if not product :
110- self .log (f"Missing affected-product: { ossa_id } " )
111- continue
97+ product = entry .get ("product" )
98+ version = entry .get ("version" )
11299
113100 for package_name , version_str in self .expand_products (product , version ):
114- purl = PackageURL (type = "pypi" , name = package_name . lower () )
101+ purl = PackageURL (type = "pypi" , name = package_name )
115102 version_range = self .parse_version_range (version_str )
116103 if purl and version_range :
117104 affected_packages .append (
@@ -129,8 +116,8 @@ def process_file(self, file_path):
129116 for link in links :
130117 references .append (ReferenceV2 (url = link ))
131118
132- title = data .get ("title" , "" )
133- description = data .get ("description" , "" )
119+ title = data .get ("title" )
120+ description = data .get ("description" )
134121 summary = f"{ title } \n \n { description } "
135122 url = f"https://security.openstack.org/ossa/{ ossa_id } .html"
136123 return AdvisoryData (
@@ -151,6 +138,7 @@ def expand_products(self, product_str, version_str) -> Iterable[Tuple[str, str]]
151138 Format 2:
152139 product="Cinder, Glance"
153140 version="<1.0"
141+ This function handles both formats and yields tuples of (product, version) for each affected product.
154142 """
155143 # Format 1: "Cinder <1.0; Glance <2.0"
156144 if ";" in version_str :
@@ -169,7 +157,10 @@ def expand_products(self, product_str, version_str) -> Iterable[Tuple[str, str]]
169157
170158 yield product_str , version_str
171159
172- def parse_version_range (self , version_str : str ):
160+ def parse_version_range (self , version_str : str ) -> PypiVersionRange :
161+ """
162+ Normalize the version string and extract its individual constraints to build a PypiVersionRange object.
163+ """
173164 original_version_str = version_str
174165
175166 if version_str .lower () == "all versions" :
0 commit comments