Skip to content

Commit 730c345

Browse files
committed
add transaction safety and update comments from code review
Signed-off-by: mrrajan <86094767+mrrajan@users.noreply.github.com.>
1 parent 6aa1b8c commit 730c345

4 files changed

Lines changed: 63 additions & 32 deletions

File tree

modules/fundamental/src/license/service/mod.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -297,22 +297,29 @@ impl LicenseService {
297297
.distinct()
298298
.column_as(expanded_license::Column::ExpandedText, LICENSE_TEXT);
299299

300-
// Build query for non-expanded licenses: includes both
300+
// Build query for licenses not yet linked to any SBOM: includes both
301301
// (a) pre-loaded SPDX dictionary entries with no SBOM connection yet, AND
302-
// (b) CycloneDX licenses that exist in sbom_package_license but were never expanded.
303-
// A LEFT JOIN on sbom_package_license (instead of INNER JOIN) ensures pre-loaded licenses
304-
// with no SBOM attachment are included. Then filtering for sbom_license_expanded IS NULL
305-
// removes SPDX licenses that have already been expanded (they appear in spdx_query instead).
302+
// (b) licenses from older SBOMs ingested before license expansion was implemented.
303+
// Use NOT EXISTS instead of LEFT JOIN + IS NULL to find licenses without SBOMs.
304+
// On large tables, LEFT JOIN scans all rows while NOT EXISTS
305+
// uses a Nested Loop Anti Join with index-only scan.
306+
let exists_subquery = sea_query::Query::select()
307+
.expr(Expr::val(1))
308+
.from(sbom_license_expanded::Entity)
309+
.and_where(
310+
Expr::col((
311+
sbom_license_expanded::Entity,
312+
sbom_license_expanded::Column::LicenseId,
313+
))
314+
.equals((license::Entity, license::Column::Id)),
315+
)
316+
.to_owned();
317+
306318
let mut non_sbom_query = license::Entity::find()
307319
.select_only()
308320
.distinct()
309321
.column_as(license::Column::Text, LICENSE_TEXT)
310-
.join(JoinType::LeftJoin, license::Relation::PackageLicense.def())
311-
.join(
312-
JoinType::LeftJoin,
313-
sbom_license_expanded::Relation::License.def().rev(),
314-
)
315-
.filter(sbom_license_expanded::Column::LicenseId.is_null());
322+
.filter(Expr::exists(exists_subquery).not());
316323

317324
// Apply filtering to both queries (without sorting - that's applied to the UNION result)
318325
let filter_only = Query {

modules/ingestor/src/graph/sbom/common/expanded_license.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use sea_orm::{ConnectionTrait, DbErr, Statement};
1+
use sea_orm::{ConnectionTrait, DbErr, Statement, TransactionTrait};
22
use uuid::Uuid;
33

44
/// Populates expanded_license and sbom_license_expanded tables during SBOM ingestion
@@ -16,13 +16,27 @@ use uuid::Uuid;
1616
///
1717
/// While SeaORM could express this via custom expressions, it would be significantly
1818
/// more verbose and harder to maintain than the raw SQL.
19-
pub async fn populate_expanded_license(
20-
sbom_id: Uuid,
21-
db: &impl ConnectionTrait,
22-
) -> Result<(), DbErr> {
19+
///
20+
/// # Differences from Migration Backfill
21+
///
22+
/// The migration in m0002120_normalize_expanded_license/up.sql performs a similar
23+
/// operation but with key differences:
24+
/// - Migration: Pre-deduplicates by (text, sbom_id) and uses WHERE NOT EXISTS to skip
25+
/// already-backfilled SBOMs. Optimized for one-time bulk processing.
26+
/// - Ingestion: Filters by specific sbom_id parameter for single-SBOM processing.
27+
/// Uses ON CONFLICT for idempotent re-ingestion of the same SBOM.
28+
///
29+
/// Both use the same core logic (expand_license_expression_with_mappings + md5 hash
30+
/// matching) but optimize for their different use cases.
31+
pub async fn populate_expanded_license<C>(sbom_id: Uuid, db: &C) -> Result<(), DbErr>
32+
where
33+
C: ConnectionTrait + TransactionTrait,
34+
{
35+
let txn = db.begin().await?;
36+
2337
// Step 1: Insert into expanded_license dictionary
24-
db.execute(Statement::from_sql_and_values(
25-
db.get_database_backend(),
38+
txn.execute(Statement::from_sql_and_values(
39+
txn.get_database_backend(),
2640
r#"
2741
INSERT INTO expanded_license (expanded_text)
2842
SELECT DISTINCT expand_license_expression_with_mappings(
@@ -45,8 +59,8 @@ ON CONFLICT (text_hash) DO NOTHING
4559

4660
// Step 2: Insert into sbom_license_expanded junction table
4761
// Use CTE to call expand_license_expression_with_mappings() only once per (sbom_id, license_id)
48-
db.execute(Statement::from_sql_and_values(
49-
db.get_database_backend(),
62+
txn.execute(Statement::from_sql_and_values(
63+
txn.get_database_backend(),
5064
r#"
5165
WITH license_expansions AS (
5266
SELECT DISTINCT
@@ -76,5 +90,7 @@ SET expanded_license_id = EXCLUDED.expanded_license_id
7690
))
7791
.await?;
7892

93+
txn.commit().await?;
94+
7995
Ok(())
8096
}

modules/ingestor/src/graph/sbom/cyclonedx.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use sbom_walker::{
2121
model::sbom::serde_cyclonedx::Sbom,
2222
report::{ReportSink, check},
2323
};
24-
use sea_orm::ConnectionTrait;
24+
use sea_orm::{ConnectionTrait, TransactionTrait};
2525
use serde_cyclonedx::cyclonedx::v_1_6::{
2626
Component, ComponentEvidenceIdentity, CycloneDx, LicenseChoiceUrl, OrganizationalContact,
2727
};
@@ -138,12 +138,15 @@ impl<'a> From<Information<'a>> for SbomInformation {
138138

139139
impl SbomContext {
140140
#[instrument(skip(connection, sbom, warnings), err(level=tracing::Level::INFO))]
141-
pub async fn ingest_cyclonedx<C: ConnectionTrait>(
141+
pub async fn ingest_cyclonedx<C>(
142142
&self,
143143
mut sbom: Box<CycloneDx>,
144144
warnings: &dyn ReportSink,
145145
connection: &C,
146-
) -> Result<(), Error> {
146+
) -> Result<(), Error>
147+
where
148+
C: ConnectionTrait + TransactionTrait,
149+
{
147150
// pre-flight checks
148151

149152
check::serde_cyclonedx::all(warnings, &Sbom::V1_6(Cow::Borrowed(&sbom)));
@@ -285,11 +288,10 @@ impl<'a> Creator<'a> {
285288
}
286289

287290
#[instrument(skip(self, db, processors), err(level=tracing::Level::INFO))]
288-
pub async fn create(
289-
self,
290-
db: &impl ConnectionTrait,
291-
processors: &mut [Box<dyn Processor>],
292-
) -> Result<(), Error> {
291+
pub async fn create<C>(self, db: &C, processors: &mut [Box<dyn Processor>]) -> Result<(), Error>
292+
where
293+
C: ConnectionTrait + TransactionTrait,
294+
{
293295
let mut creator = ComponentCreator::new(self.sbom_id, self.components.len());
294296

295297
for comp in self.components {
@@ -576,7 +578,10 @@ impl ComponentCreator {
576578
// order matters to prevent cross-table deadlocks when running
577579
// concurrent SBOM ingestions. All SBOM loaders must use the same
578580
// table insertion order.
579-
async fn create(self, db: &impl ConnectionTrait) -> Result<(), Error> {
581+
async fn create<C>(self, db: &C) -> Result<(), Error>
582+
where
583+
C: ConnectionTrait + TransactionTrait,
584+
{
580585
self.licenses.create(db).await?;
581586
self.purls.create(db).await?;
582587
self.cpes.create(db).await?;

modules/ingestor/src/graph/sbom/spdx.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::{
1616
service::Error,
1717
};
1818
use sbom_walker::report::{ReportSink, check};
19-
use sea_orm::ConnectionTrait;
19+
use sea_orm::{ConnectionTrait, TransactionTrait};
2020
use spdx_rs::models::{RelationshipType, SPDX};
2121
use std::collections::HashSet;
2222
use std::str::FromStr;
@@ -103,12 +103,15 @@ impl<'a> From<Information<'a>> for SbomInformation {
103103

104104
impl SbomContext {
105105
#[instrument(skip(db, sbom_data, warnings), ret(level=tracing::Level::DEBUG))]
106-
pub async fn ingest_spdx<C: ConnectionTrait>(
106+
pub async fn ingest_spdx<C>(
107107
&self,
108108
sbom_data: SPDX,
109109
warnings: &dyn ReportSink,
110110
db: &C,
111-
) -> Result<(), Error> {
111+
) -> Result<(), Error>
112+
where
113+
C: ConnectionTrait + TransactionTrait,
114+
{
112115
// pre-flight checks
113116

114117
check::spdx::all(warnings, &sbom_data);

0 commit comments

Comments
 (0)