1111from datetime import date , datetime , timezone
1212from pathlib import Path
1313
14+ from dateutil .parser import parse
1415import requests
1516from aboutcode .pipeline import BasePipeline , LoopProgress
1617from requests .adapters import HTTPAdapter
1718from urllib3 .util .retry import Retry
1819
# Directory that contains this module; all mirrored data lives next to it.
ROOT_PATH = Path(__file__).parent
# Root of the advisory tree written by ``save_advisory``.
ADVISORIES_PATH = ROOT_PATH / "advisories"
# JSON file holding the ``last_run`` date read by ``load_checkpoint`` and
# written by ``save_checkpoint``.
CHECKPOINT_FILE = ROOT_PATH / "checkpoint.json"

# Default HTTP headers; presumably attached to the session in
# ``create_session`` -- TODO confirm, the usage is outside this view.
HEADERS = {
    "Accept": "application/json",
}
2727
@@ -41,11 +41,6 @@ def steps(cls):
4141 cls .save_checkpoint ,
4242 )
4343
44- def log (self , message ):
45- now_local = datetime .now (timezone .utc ).astimezone ()
46- timestamp = now_local .strftime ("%Y-%m-%d %H:%M:%S.%f" )[:- 3 ]
47- print (f"{ timestamp } { message } " )
48-
4944 def load_checkpoint (self ):
5045 """
5146 - Load the ``last run`` date from checkpoint.json to fetch only new advisories.
@@ -56,8 +51,7 @@ def load_checkpoint(self):
5651 return
5752 with CHECKPOINT_FILE .open () as f :
5853 checkpoint = json .load (f )
59- last_run = checkpoint .get ("last_run" )
60- if last_run :
54+ if last_run := checkpoint .get ("last_run" ):
6155 self .fetch_params ["fromUpdatedDate" ] = last_run
6256
6357 def create_session (self ):
@@ -75,19 +69,15 @@ def collect_new_advisory(self):
7569
7670 - Fetch the ``total`` advisories and determine the number of pages to iterate over.
7771 - Iterate through all pages, fetching up to PAGE_SIZE advisories per request.
78- - Save each advisory as a JSON file at ``/advisory /{year}/{month}/{EUVD_ID}.json``.
79- - Advisories with missing publication dates are stored as at ``/advisory /unpublished/{EUVD_ID}.json``.
72+ - Save each advisory as a JSON file at ``/advisories /{year}/{month}/{EUVD_ID}.json``.
73+ - Advisories with missing publication dates are stored as at ``/advisories /unpublished/{EUVD_ID}.json``.
8074 """
8175 count_page = self .fetch_page ({** self .fetch_params , "size" : 1 , "page" : 0 })
8276 total = count_page .get ("total" , 0 )
83- if not total :
84- self .log ("No new advisories found" )
85- return
8677
8778 total_pages = math .ceil (total / PAGE_SIZE )
8879 self .log (f"Collecting { total } advisories across { total_pages } pages" )
8980
90- self .has_unpublished = (ADVISORY_PATH / "unpublished" ).exists ()
9181 progress = LoopProgress (total_iterations = total_pages , logger = self .log )
9282
9383 for page in progress .iter (range (total_pages )):
@@ -98,56 +88,31 @@ def collect_new_advisory(self):
9888 self .save_advisory (advisory )
9989
def save_advisory(self, advisory):
    """
    Write ``advisory`` as JSON to ``ADVISORIES_PATH/{year}/{month}/{EUVD_ID}.json``.

    - An advisory without an ``id`` is logged and skipped.
    - Advisories whose ``datePublished`` is missing or cannot be parsed are stored
      at ``ADVISORIES_PATH/unpublished/{EUVD_ID}.json`` instead of aborting the run.
    - If an advisory is updated, the new data overwrites the existing file.
    - Once an advisory is stored under a dated directory, any copy left in
      ``unpublished`` is removed so the advisory is never stored twice.
    """
    euvd_id = advisory.get("id")
    if not euvd_id:
        self.log(f"Advisory missing id, skipping: {advisory}")
        return

    unpublished_dir = ADVISORIES_PATH / "unpublished"
    dir_path = self.advisory_dir(advisory.get("datePublished")) or unpublished_dir

    dir_path.mkdir(parents=True, exist_ok=True)
    with (dir_path / f"{euvd_id}.json").open("w", encoding="utf-8") as f:
        json.dump(advisory, f, indent=2)

    # Delete the stale copy only after the dated file has been written, so a
    # failed write never leaves the advisory with no file at all.
    if dir_path != unpublished_dir:
        (unpublished_dir / f"{euvd_id}.json").unlink(missing_ok=True)


def advisory_dir(self, date_published):
    """
    Return the ``ADVISORIES_PATH/{year}/{month}`` directory for ``date_published``.

    Returns ``None`` when ``date_published`` is empty or cannot be parsed as a date.
    """
    if not date_published:
        return None
    try:
        published_at = parse(date_published)
    except (ValueError, OverflowError, TypeError):
        # dateutil signals an unparsable string with ParserError (a ValueError
        # subclass), an out-of-range date with OverflowError, and a non-string
        # input with TypeError.
        return None
    return ADVISORIES_PATH / f"{published_at.year:04d}" / f"{published_at.month:02d}"
def save_checkpoint(self):
    """
    Record today's date as ``last_run`` in ``CHECKPOINT_FILE``.

    The file is rewritten in full with a single-key JSON object whose value is
    the ISO-formatted date returned by ``date.today()``.
    """
    checkpoint = {"last_run": date.today().isoformat()}
    CHECKPOINT_FILE.write_text(json.dumps(checkpoint, indent=2))
140106
def fetch_page(self, params):
    """
    GET ``self.url`` with ``params`` and return the decoded JSON object.

    - HTTP error statuses are raised by ``raise_for_status()``.
    - A falsy JSON payload (``null``, ``{}``, ``[]``, ...) yields ``{}``.
    - Any other payload that is not a JSON object raises ``ValueError``, so
      callers can rely on calling ``.get()`` on the result.
    """
    response = self.session.get(self.url, params=params, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()

    payload = response.json()
    if not payload:
        return {}
    if not isinstance(payload, dict):
        raise ValueError(
            f"Unexpected response type {type(payload).__name__} for params {params}"
        )
    return payload
150111
def log(self, message):
    """
    Print ``message`` prefixed with the current local time.

    The prefix has the form ``YYYY-MM-DD HH:MM:SS.mmm`` (millisecond precision).
    """
    moment = datetime.now(timezone.utc).astimezone()
    # Three-digit milliseconds: same digits as ``%f`` with its last three dropped.
    millis = moment.microsecond // 1000
    stamp = f"{moment:%Y-%m-%d %H:%M:%S}.{millis:03d}"
    print(f"{stamp} {message}")
151116
152117if __name__ == "__main__" :
153118 mirror = EUVDAdvisoryMirror ()
0 commit comments