Skip to content

Commit 26c826f

Browse files
committed
Refactor to checkpoint based mirror sync
Signed-off-by: Samk <sampurnapyne1710@gmail.com>
1 parent c5ac22f commit 26c826f

File tree

2 files changed

+101
-178
lines changed

2 files changed

+101
-178
lines changed

.github/workflows/sync.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ jobs:
2424
- name: Install required packages
2525
run: pip install -r requirements.txt
2626

27-
- name: Run sync (daily)
28-
run: python sync_catalog.py daily
27+
- name: Run sync
28+
run: python sync_catalog.py
2929

3030
- name: Commit and push if it changed
3131
run: |-

sync_catalog.py

Lines changed: 99 additions & 176 deletions
Original file line numberDiff line numberDiff line change
@@ -5,231 +5,154 @@
55
# See https://aboutcode.org for more information about nexB OSS projects.
66
#
77

8-
import argparse
98
import json
9+
import math
1010
import sys
11-
from datetime import date, datetime, timedelta, timezone
11+
from datetime import date, datetime, timezone
1212
from pathlib import Path
13-
from typing import Any, Dict
1413

1514
import requests
16-
1715
from aboutcode.pipeline import BasePipeline, LoopProgress
16+
from requests.adapters import HTTPAdapter
17+
from urllib3.util.retry import Retry
1818

1919
ROOT_PATH = Path(__file__).parent
20-
CATALOG_PATH = ROOT_PATH / "catalog"
21-
PAGE_DIRECTORY = CATALOG_PATH / "pages"
22-
23-
API_URL = "https://euvdservices.enisa.europa.eu/api/search"
20+
ADVISORY_PATH = ROOT_PATH / "advisory"
21+
CHECKPOINT_FILE = ROOT_PATH / "checkpoint.json"
2422

2523
HEADERS = {
2624
"User-Agent": "Vulnerablecode",
2725
"Accept": "application/json",
2826
}
2927

3028
PAGE_SIZE = 100
31-
DEFAULT_START_YEAR = 1970
3229
REQUEST_TIMEOUT = 10
3330

3431

35-
class EUVDAdvisoryMirror(BasePipeline):
    """
    Mirror EUVD (ENISA EU Vulnerability Database) advisories to local JSON
    files, using a checkpoint file to fetch only advisories updated since
    the previous run.
    """

    url = "https://euvdservices.enisa.europa.eu/api/search"

    # Whether the "unpublished" directory may hold stale entries. A class-level
    # default keeps save_advisory() safe if it is ever called before
    # collect_new_advisory() has set this attribute.
    has_unpublished = False

    @classmethod
    def steps(cls):
        return (
            cls.load_checkpoint,
            cls.create_session,
            cls.collect_new_advisory,
            cls.save_checkpoint,
        )

    def log(self, message):
        """Print ``message`` prefixed with a local-time, millisecond timestamp."""
        now_local = datetime.now(timezone.utc).astimezone()
        timestamp = now_local.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        print(f"{timestamp} {message}")

    def load_checkpoint(self):
        """
        - Load the ``last run`` date from checkpoint.json to fetch only new advisories.
        - If the checkpoint.json does not exist, fetch all advisories.
        """
        self.fetch_params = {}
        if not CHECKPOINT_FILE.exists():
            return
        with CHECKPOINT_FILE.open() as f:
            checkpoint = json.load(f)
        last_run = checkpoint.get("last_run")
        if last_run:
            self.fetch_params["fromUpdatedDate"] = last_run

    def create_session(self):
        """Create a requests session that retries transient HTTP failures."""
        retry = Retry(
            total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry)
        self.session = requests.Session()
        self.session.headers.update(HEADERS)
        self.session.mount("https://", adapter)

    def collect_new_advisory(self):
        """
        Fetch new advisories from the EUVD API with paginated requests.

        - Fetch the ``total`` advisories and determine the number of pages to iterate over.
        - Iterate through all pages, fetching up to PAGE_SIZE advisories per request.
        - Save each advisory as a JSON file at ``/advisory/{year}/{month}/{EUVD_ID}.json``.
        - Advisories with missing publication dates are stored at ``/advisory/unpublished/{EUVD_ID}.json``.
        """
        # A size=1 probe request is used only to read the total result count.
        count_page = self.fetch_page({**self.fetch_params, "size": 1, "page": 0})
        total = count_page.get("total", 0)
        if not total:
            self.log("No new advisories found")
            return

        total_pages = math.ceil(total / PAGE_SIZE)
        self.log(f"Collecting {total} advisories across {total_pages} pages")

        self.has_unpublished = (ADVISORY_PATH / "unpublished").exists()
        progress = LoopProgress(total_iterations=total_pages, logger=self.log)

        for page in progress.iter(range(total_pages)):
            data = self.fetch_page(
                {**self.fetch_params, "size": PAGE_SIZE, "page": page}
            )
            for advisory in data.get("items", []):
                self.save_advisory(advisory)

    def save_advisory(self, advisory):
        """
        Write one ``advisory`` mapping as ``{EUVD_ID}.json`` in the directory
        derived from its publication date; records with no id are skipped.
        """
        euvd_id = advisory.get("id")
        if not euvd_id:
            self.log(f"Advisory missing id, skipping: {advisory}")
            return

        date_published = advisory.get("datePublished", "")
        dir_path = self.advisory_dir(date_published)

        if dir_path is None:
            dir_path = ADVISORY_PATH / "unpublished"
            self.has_unpublished = True

        dir_path.mkdir(parents=True, exist_ok=True)

        # If an existing unpublished advisory is published now, remove the
        # stale advisory from the unpublished directory.
        if self.has_unpublished and dir_path != ADVISORY_PATH / "unpublished":
            stale_advisory = ADVISORY_PATH / "unpublished" / f"{euvd_id}.json"
            if stale_advisory.exists():
                stale_advisory.unlink()

        # If an old advisory is updated, the new data overwrites the existing file.
        with (dir_path / f"{euvd_id}.json").open("w", encoding="utf-8") as f:
            json.dump(advisory, f, indent=2)

    def advisory_dir(self, date_published):
        """
        Return the ``{year}/{month}`` directory path for an advisory based on
        its publication date, or None when the date is missing or unparsable.

        NOTE(review): the "%b" month abbreviation is locale-dependent; this
        assumes the process runs under an English/C locale — confirm.
        """
        try:
            published_at = datetime.strptime(date_published, "%b %d, %Y, %I:%M:%S %p")
            return (
                ADVISORY_PATH / f"{published_at.year:04d}" / f"{published_at.month:02d}"
            )
        except (ValueError, TypeError):
            # TypeError covers a None datePublished value; ValueError covers
            # empty or malformed date strings.
            return None

    def save_checkpoint(self):
        """Record today's date so the next run fetches only newer advisories."""
        with CHECKPOINT_FILE.open("w") as f:
            json.dump({"last_run": date.today().isoformat()}, f, indent=2)

    def fetch_page(self, params):
        """
        GET one page from the EUVD search API and return the decoded JSON
        mapping. Raises ValueError if the API returns a non-mapping payload,
        and requests.HTTPError for HTTP error statuses.
        """
        response = self.session.get(self.url, params=params, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        data = response.json()
        if not isinstance(data, dict):
            raise ValueError(
                f"Unexpected response type {type(data).__name__} for params {params}"
            )
        return data
210151

211-
def log(self, message: str) -> None:
212-
now = datetime.now(timezone.utc).astimezone()
213-
stamp = now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
214-
print(f"{stamp} {message}")
215-
216152

217153
if __name__ == "__main__":
218-
parser = argparse.ArgumentParser(description="EUVD Catalog Mirror")
219-
parser.add_argument(
220-
"mode",
221-
nargs="?",
222-
default="daily",
223-
choices=["backfill", "daily"],
224-
help="Sync mode: 'backfill' for full history, 'daily' for yesterday only",
225-
)
226-
args = parser.parse_args()
227-
228-
mirror = EUVDCatalogMirror()
229-
mirror.mode = args.mode
230-
154+
mirror = EUVDAdvisoryMirror()
231155
status_code, error_message = mirror.execute()
232156
if error_message:
233157
print(error_message)
234158
sys.exit(status_code)
235-

0 commit comments

Comments
 (0)