Commit 7efd230

Add Check Point security advisories importer

Signed-off-by: Anmol Vats <anmolvats2003@gmail.com>

1 parent 2dbbd38

File tree

4 files changed: +462 -0 lines changed

vulnerabilities/importers/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -47,6 +47,7 @@
 from vulnerabilities.pipelines.v2_importers import apache_kafka_importer as apache_kafka_importer_v2
 from vulnerabilities.pipelines.v2_importers import apache_tomcat_importer as apache_tomcat_v2
 from vulnerabilities.pipelines.v2_importers import archlinux_importer as archlinux_importer_v2
+from vulnerabilities.pipelines.v2_importers import checkpoint_importer as checkpoint_importer_v2
 from vulnerabilities.pipelines.v2_importers import collect_fix_commits as collect_fix_commits_v2
 from vulnerabilities.pipelines.v2_importers import curl_importer as curl_importer_v2
 from vulnerabilities.pipelines.v2_importers import debian_importer as debian_importer_v2
@@ -88,6 +89,7 @@
     [
         archlinux_importer_v2.ArchLinuxImporterPipeline,
         apache_kafka_importer_v2.ApacheKafkaImporterPipeline,
+        checkpoint_importer_v2.CheckPointImporterPipeline,
         nvd_importer_v2.NVDImporterPipeline,
         elixir_security_importer_v2.ElixirSecurityImporterPipeline,
         npm_importer_v2.NpmImporterPipeline,
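
For context, a minimal sketch of driving the newly registered pipeline directly, in the style VulnerableCode tests use for pipelines. The import path is inferred from the hunk above; instantiating with no arguments and calling execute() are assumptions about the pipeline base class, not something this diff shows:

    from vulnerabilities.pipelines.v2_importers.checkpoint_importer import (
        CheckPointImporterPipeline,
    )

    # Assumed: the base pipeline can be instantiated bare and run with execute(),
    # which invokes steps() in order (fetch, then collect_and_store_advisories).
    pipeline = CheckPointImporterPipeline()
    pipeline.execute()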
vulnerabilities/pipelines/v2_importers/checkpoint_importer.py

Lines changed: 218 additions & 0 deletions
@@ -0,0 +1,218 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import datetime
import json
import logging
from typing import Iterable

import dateparser
import requests
from bs4 import BeautifulSoup

from vulnerabilities.importer import AdvisoryDataV2
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.severity_systems import GENERIC

logger = logging.getLogger(__name__)

ADVISORY_BASE_URL = "https://advisories.checkpoint.com"
ADVISORY_LIST_URL = "https://advisories.checkpoint.com/advisories/"

class CheckPointImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
    """Collect Check Point security advisories."""

    pipeline_id = "checkpoint_importer"
    spdx_license_expression = "LicenseRef-scancode-proprietary-license"
    license_url = "https://advisories.checkpoint.com/"
    url = ADVISORY_LIST_URL
    precedence = 200

    @classmethod
    def steps(cls):
        return (
            cls.fetch,
            cls.collect_and_store_advisories,
        )

    def fetch(self):
        self.log(f"Fetch `{self.url}`")
        self.advisories_data = list(fetch_all_advisory_rows(self.log))

    def advisories_count(self):
        return len(self.advisories_data)

    def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
        for row_data in self.advisories_data:
            advisory = parse_advisory(row_data)
            if advisory:
                yield advisory

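# Reviewer note: steps() runs fetch() before collect_and_store_advisories(),
# so the scraped rows are cached on self.advisories_data and both
# advisories_count() and collect_advisories() read from that cache instead of
# re-fetching the site.
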
def get_available_years(soup: BeautifulSoup) -> list:
    """Return sorted list of years from year-navigation links, including current year."""
    years = set()
    for link in soup.find_all("a", href=True):
        href = link["href"]
        if "/defense/advisories/public/" in href:
            # The year is the last path segment of the navigation link.
            part = href.rstrip("/").split("/")[-1]
            if part.isdigit() and len(part) == 4:
                years.add(int(part))
    years.add(datetime.date.today().year)
    return sorted(years)

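# Example (illustrative): a navigation link to
# https://advisories.checkpoint.com/defense/advisories/public/2023/ yields 2023;
# the current year is always included even when no link for it exists yet.
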
def get_total_pages(soup: BeautifulSoup) -> int:
    """Return total page count from pagination links."""
    page_nums = []
    for link in soup.find_all("a", href=True):
        href = link["href"]
        if "/advisories/page/" in href:
            # Extract N from links shaped like /advisories/page/N/?year=YYYY.
            part = href.split("/page/")[-1].split("?")[0].strip("/")
            if part.isdigit():
                page_nums.append(int(part))
    return max(page_nums) if page_nums else 1

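# Example (illustrative): pagination links /advisories/page/2/?year=2024 and
# /advisories/page/5/?year=2024 give a total of 5 pages; with no matching
# links the function falls back to a single page.
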
def fetch_all_advisory_rows(log_fn) -> Iterable[dict]:
    """Yield row dicts for all advisories across all years and pages."""
    try:
        resp = requests.get(ADVISORY_LIST_URL, timeout=30)
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        log_fn(f"Failed to fetch {ADVISORY_LIST_URL}: {e}")
        return

    soup = BeautifulSoup(resp.text, features="lxml")
    years = get_available_years(soup)
    if not years:
        log_fn("No years found on advisories page")
        return

    for year in years:
        url = f"{ADVISORY_LIST_URL}?year={year}"
        try:
            resp = requests.get(url, timeout=30)
            resp.raise_for_status()
        except requests.exceptions.RequestException as e:
            log_fn(f"Failed to fetch {url}: {e}")
            continue

        year_soup = BeautifulSoup(resp.text, features="lxml")
        total_pages = get_total_pages(year_soup)
        yield from parse_table_rows(resp.text)

        for page in range(2, total_pages + 1):
            page_url = f"{ADVISORY_LIST_URL}page/{page}/?year={year}"
            try:
                resp = requests.get(page_url, timeout=30)
                resp.raise_for_status()
            except requests.exceptions.RequestException as e:
                log_fn(f"Failed to fetch {page_url}: {e}")
                break
            yield from parse_table_rows(resp.text)

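# Illustrative crawl order for years [2023, 2024] with 3 pages each:
#   .../advisories/?year=2023, .../advisories/page/2/?year=2023,
#   .../advisories/page/3/?year=2023, then the same sequence for 2024.
# A failed page fetch abandons the rest of that year's pages (break), while a
# failed year fetch skips only that year (continue).
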
def parse_table_rows(html: str) -> list:
    """Return list of row dicts from the advisories table HTML."""
    soup = BeautifulSoup(html, features="lxml")
    table = soup.find("table", {"id": "cp_advisory_table_sorter"})
    if not table:
        return []

    rows = []
    for tr in table.find_all("tr")[1:]:
        cells = tr.find_all("td")
        # 7 cols: Severity, Date Published, Date Updated, CPAI Ref, Source, Industry Ref, Description
        if len(cells) < 7:
            continue

        cpai_link = cells[3].find("a")
        if not cpai_link:
            continue

        advisory_id = cpai_link.get_text(strip=True)
        href = cpai_link.get("href", "")
        advisory_url = f"{ADVISORY_BASE_URL}{href}" if href.startswith("/") else href

        cve_link = cells[5].find("a")
        cve_text = cve_link.get_text(strip=True) if cve_link else cells[5].get_text(strip=True)
        # strip " (and N others)" if present
        cve_id = cve_text.split(" (")[0].strip()

        summary_link = cells[6].find("a")
        summary = (
            summary_link.get_text(strip=True) if summary_link else cells[6].get_text(strip=True)
        )

        rows.append(
            {
                "advisory_id": advisory_id,
                "advisory_url": advisory_url,
                "cve_id": cve_id,
                "severity": cells[0].get_text(strip=True),
                "date_published": cells[1].get_text(strip=True),
                "summary": summary,
            }
        )

    return rows

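# Example row dict produced for one table row (all values illustrative):
#   {"advisory_id": "CPAI-2024-0001",
#    "advisory_url": "https://advisories.checkpoint.com/advisories/cpai-2024-0001/",
#    "cve_id": "CVE-2024-12345",
#    "severity": "Critical",
#    "date_published": "21 Oct 2024",
#    "summary": "Example RCE advisory"}
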
def parse_advisory(row_data: dict):
    """Return AdvisoryDataV2 from a row data dict, or None when the
    advisory_id is missing or is not a CPAI id."""
    advisory_id = row_data.get("advisory_id") or ""
    if not advisory_id or not advisory_id.startswith("CPAI-"):
        return None

    date_published = None
    raw_date = row_data.get("date_published") or ""
    if raw_date:
        date_published = dateparser.parse(
            raw_date,
            settings={"TIMEZONE": "UTC", "RETURN_AS_TIMEZONE_AWARE": True, "TO_TIMEZONE": "UTC"},
        )
        if date_published is None:
            logger.warning("Could not parse date %r for %s", raw_date, advisory_id)

    cve_id = row_data.get("cve_id") or ""
    aliases = [cve_id] if cve_id.startswith("CVE-") else []

    advisory_url = row_data.get("advisory_url") or ""
    references = []
    if advisory_url:
        references.append(ReferenceV2(url=advisory_url, reference_id=advisory_id))
    if cve_id.startswith("CVE-"):
        references.append(
            ReferenceV2(
                url=f"https://nvd.nist.gov/vuln/detail/{cve_id}",
                reference_id=cve_id,
            )
        )

    severities = []
    severity = row_data.get("severity") or ""
    if severity:
        severities.append(VulnerabilitySeverity(system=GENERIC, value=severity))

    return AdvisoryDataV2(
        advisory_id=advisory_id,
        aliases=aliases,
        summary=row_data.get("summary") or "",
        affected_packages=[],
        references=references,
        date_published=date_published,
        weaknesses=[],
        severities=severities,
        url=advisory_url,
        original_advisory_text=json.dumps(row_data, indent=2, ensure_ascii=False),
    )
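
For reviewers, a minimal smoke-test sketch of the parsing helpers above. The import path assumes the module lands at vulnerabilities/pipelines/v2_importers/checkpoint_importer.py (inferred from the __init__.py hunk), it must run inside a VulnerableCode checkout, and the HTML fragment with all of its values is made up to mirror the seven-column advisories table the parser expects:

    from vulnerabilities.pipelines.v2_importers import checkpoint_importer

    # Made-up fragment shaped like the advisories table: Severity, Date Published,
    # Date Updated, CPAI Ref, Source, Industry Ref, Description.
    SAMPLE_HTML = """
    <table id="cp_advisory_table_sorter">
      <tr><th>Severity</th><th>Date Published</th><th>Date Updated</th>
          <th>CPAI Ref</th><th>Source</th><th>Industry Ref</th><th>Description</th></tr>
      <tr>
        <td>Critical</td>
        <td>21 Oct 2024</td>
        <td>22 Oct 2024</td>
        <td><a href="/advisories/cpai-2024-0001/">CPAI-2024-0001</a></td>
        <td>Check Point</td>
        <td><a href="https://example.com/cve">CVE-2024-12345 (and 2 others)</a></td>
        <td><a href="/advisories/cpai-2024-0001/">Example RCE advisory</a></td>
      </tr>
    </table>
    """

    rows = checkpoint_importer.parse_table_rows(SAMPLE_HTML)
    assert rows[0]["advisory_id"] == "CPAI-2024-0001"
    assert rows[0]["cve_id"] == "CVE-2024-12345"  # "(and 2 others)" stripped

    advisory = checkpoint_importer.parse_advisory(rows[0])
    assert advisory.advisory_id == "CPAI-2024-0001"
    assert "CVE-2024-12345" in advisory.aliases
    assert advisory.url.endswith("/advisories/cpai-2024-0001/")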
