-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathlicense_matcher.py
More file actions
155 lines (137 loc) · 5.53 KB
/
license_matcher.py
File metadata and controls
155 lines (137 loc) · 5.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import logging
from sqlalchemy import asc, func
from sqlalchemy.orm import Session
from shared.common.license_utils import resolve_license, MatchingLicense
from shared.database.database import with_db_session
from shared.database_gen.sqlacodegen_models import Feed, FeedLicenseChange
from shared.helpers.runtime_metrics import track_metrics
def get_parameters(payload):
    """Extract the license-matching options from the incoming payload.

    Args:
        payload (dict | None): Incoming payload data. ``None`` (or any other
            falsy value) is treated as an empty payload, so every option
            falls back to its default instead of raising ``AttributeError``.

    Returns:
        tuple: ``(dry_run, only_unmatched, feed_stable_id)``. ``dry_run``
        defaults to ``False``, ``only_unmatched`` to ``True`` and
        ``feed_stable_id`` to ``None``.
    """
    # A task triggered without a payload should behave like an empty one.
    options = payload or {}
    return (
        options.get("dry_run", False),
        options.get("only_unmatched", True),
        options.get("feed_stable_id"),
    )
def match_license_handler(payload):
    """
    Entry point that runs the license-matching task from a raw payload.

    The options are read through ``get_parameters`` and forwarded, in order,
    to ``match_licenses_task``.

    Args:
        payload (dict): Incoming payload data.

    Returns:
        Whatever ``match_licenses_task`` returns for the extracted options.
    """
    task_args = get_parameters(payload)
    # get_parameters yields (dry_run, only_unmatched, feed_stable_id), which is
    # exactly the positional order match_licenses_task expects.
    return match_licenses_task(*task_args)
def assign_feed_license(feed: Feed, license_match: MatchingLicense):
    """Apply a matched license to a feed, recording the change if it differs.

    When the matched license id equals the feed's current ``license_id`` the
    feed is left untouched and only an informational message is logged.
    Otherwise the feed's ``license_id`` and ``license_notes`` are updated and
    a ``FeedLicenseChange`` row describing the match is appended to
    ``feed.feed_license_changes``.

    Args:
        feed: Feed to update.
        license_match: Match whose fields are copied onto the feed and the
            change record.
    """
    new_license_id = license_match.license_id
    if new_license_id == feed.license_id:
        logging.info("Feed %s license unchanged: %s", feed.stable_id, feed.license_id)
        return

    logging.info("New license match for feed %s: %s", feed.stable_id, new_license_id)

    from shared.common.license_utils import _AUTO_VERIFY_THRESHOLD

    # A change is flagged as verified for exact matches, or when the match
    # confidence reaches the auto-verify threshold.
    if license_match.match_type == "exact":
        is_verified = True
    else:
        is_verified = license_match.confidence >= _AUTO_VERIFY_THRESHOLD

    feed.license_id = new_license_id
    feed.license_notes = license_match.notes
    feed.feed_license_changes.append(
        FeedLicenseChange(
            feed_id=feed.id,
            changed_at=None,  # will be set by DB default
            feed_license_url=feed.license_url,
            matched_license_id=new_license_id,
            confidence=license_match.confidence,
            match_type=license_match.match_type,
            matched_name=license_match.matched_name,
            matched_catalog_url=license_match.matched_catalog_url,
            matched_source=license_match.matched_source,
            notes=license_match.notes,
            regional_id=license_match.regional_id,
            verified=is_verified,
        )
    )
def process_feed(feed, dry_run, db_session):
    """Resolve the license for a single feed and optionally persist the match.

    Args:
        feed: Feed whose ``license_url`` is passed to ``resolve_license``.
        dry_run: When true, the best match is only reported and the feed is
            not modified.
        db_session: Session forwarded to ``resolve_license``.

    Returns:
        dict | None: Summary of the highest-confidence match, or ``None`` when
        ``resolve_license`` yields no matches.
    """
    license_matches = resolve_license(feed.license_url, db_session=db_session)
    if not license_matches:
        return None

    # max() returns the first element with the highest confidence, i.e. the
    # same element a stable descending sort would place first, without
    # sorting the whole list.
    best_match = max(license_matches, key=lambda match: match.confidence)
    result = {
        "feed_id": feed.id,
        "feed_stable_id": feed.stable_id,
        "feed_data_type": feed.data_type,
        "feed_license_url": feed.license_url,
        "matched_license_id": best_match.license_id,
        "matched_spdx_id": best_match.spdx_id,
        "confidence": best_match.confidence,
        "match_type": best_match.match_type,
        "matched_name": best_match.matched_name,
        "matched_catalog_url": best_match.matched_catalog_url,
        "matched_source": best_match.matched_source,
        "notes": best_match.notes,
        "regional_id": best_match.regional_id,
    }
    if not dry_run:
        assign_feed_license(feed, best_match)
    return result
@track_metrics(metrics=("time", "memory", "cpu"))
@with_db_session
def match_licenses_task(
    dry_run: bool,
    only_unmatched: bool,
    feed_stable_id: str | None = None,
    db_session: Session | None = None,
):
    """Match licenses for one feed or for every feed with a license URL.

    Args:
        dry_run: When true, matches are reported but feeds are not modified.
        only_unmatched: When processing all feeds, restrict the run to feeds
            without a ``license_id``. Ignored when ``feed_stable_id`` is set.
        feed_stable_id: Stable id of a single feed to process. When empty,
            all feeds are processed via ``process_all_feeds``.
        db_session: Database session used for the queries.
            NOTE(review): presumably injected by ``with_db_session`` when the
            caller omits it -- confirm against the decorator.

    Returns:
        list[dict]: One summary dict per feed for which a license was matched.

    Raises:
        ValueError: If ``feed_stable_id`` is given but no such feed exists.
    """
    if not feed_stable_id:
        return process_all_feeds(dry_run, only_unmatched, db_session)

    feed = db_session.query(Feed).filter(Feed.stable_id == feed_stable_id).first()
    if feed is None:
        logging.error("Feed with stable_id %s not found.", feed_stable_id)
        raise ValueError(f"Feed with stable_id {feed_stable_id} not found.")

    feed_match = process_feed(feed, dry_run, db_session)
    # process_feed returns None when no license could be resolved. Skip it so
    # the result never contains None entries, consistent with the all-feeds
    # path, which only collects actual matches.
    return [feed_match] if feed_match else []
def process_all_feeds(dry_run: bool, only_unmatched: bool, db_session: Session | None):
    """Match licenses for all feeds that have a non-empty license URL.

    Feeds are read in pages of 500 ordered by ``Feed.id``; each page starts
    after the last id of the previous one. After every page the pending
    changes are flushed (unless ``dry_run``) and all objects are expunged
    from the session before the next page is loaded.

    Args:
        dry_run: When true, matches are reported but nothing is flushed.
        only_unmatched: When true, only feeds whose ``license_id`` is NULL
            are considered.
        db_session: Session used for querying and flushing.

    Returns:
        list: The non-empty results returned by ``process_feed``.
    """
    page_size = 500
    matches = []
    cursor_id = None  # id of the last feed of the previous page
    seen_count = 0
    batch_number = 0

    while True:
        logging.info("Processing batch %d", batch_number)

        # Collect the criteria first; filter() combines them with AND.
        criteria = [func.coalesce(Feed.license_url, "") != ""]
        if cursor_id is not None:
            criteria.append(Feed.id > cursor_id)
        if only_unmatched:
            criteria.append(Feed.license_id.is_(None))

        page = (
            db_session.query(Feed)
            .filter(*criteria)
            .order_by(asc(Feed.id))
            .limit(page_size)
            .all()
        )
        if not page:
            break

        for feed in page:
            feed_match = process_feed(feed, dry_run, db_session)
            if feed_match:
                matches.append(feed_match)
        seen_count += len(page)

        if not dry_run:
            # Flush the batch updates to the database
            db_session.flush()
        # Read the cursor before the page objects are detached from the session.
        cursor_id = page[-1].id
        db_session.expunge_all()

        logging.info(
            "Processed batch %d. Total processed %d, so far matched licenses: %d",
            batch_number,
            seen_count,
            len(matches),
        )
        batch_number += 1

    logging.info(
        "Total processed feeds %d. Total matched licenses: %d",
        seen_count,
        len(matches),
    )
    return matches