Skip to content

Commit 8d73c6b

Browse files
committed
fix(service): keyset paginate provider match loops so page fetches stay constant-time
1 parent 67e5b82 commit 8d73c6b

3 files changed

Lines changed: 96 additions & 32 deletions

File tree

service/src/db/game.rs

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -347,19 +347,21 @@ pub async fn get_unpopulated_clone_of_games(
347347
pub fn get_unmatched_games_without_clone_of_with_limit<'a>(
348348
provider: MetadataProviderEnum,
349349
page_size: u64,
350+
cursor: Option<Uuid>,
350351
conn: DbConn,
351352
) -> BoxFuture<'a, anyhow::Result<Option<Vec<game::Model>>>> {
352-
get_unmatched_games_with_limit(provider, true, true, page_size, conn)
353+
get_unmatched_games_with_limit(provider, true, true, page_size, cursor, conn)
353354
}
354355

355356
/// Return up to `page_size` games without a successful mapping for `provider` that are
356357
/// clones of another game. Returns `Ok(None)` when there is nothing left to process.
357358
pub fn get_unmatched_games_with_clone_of_with_limit<'a>(
358359
provider: MetadataProviderEnum,
359360
page_size: u64,
361+
cursor: Option<Uuid>,
360362
conn: DbConn,
361363
) -> BoxFuture<'a, anyhow::Result<Option<Vec<game::Model>>>> {
362-
get_unmatched_games_with_limit(provider, false, true, page_size, conn)
364+
get_unmatched_games_with_limit(provider, false, true, page_size, cursor, conn)
363365
}
364366

365367
/// Same as [`get_unmatched_games_without_clone_of_with_limit`] but does not require the
@@ -369,19 +371,21 @@ pub fn get_unmatched_games_with_clone_of_with_limit<'a>(
369371
pub fn get_unmatched_games_without_clone_of_with_limit_no_platform_gate<'a>(
370372
provider: MetadataProviderEnum,
371373
page_size: u64,
374+
cursor: Option<Uuid>,
372375
conn: DbConn,
373376
) -> BoxFuture<'a, anyhow::Result<Option<Vec<game::Model>>>> {
374-
get_unmatched_games_with_limit(provider, true, false, page_size, conn)
377+
get_unmatched_games_with_limit(provider, true, false, page_size, cursor, conn)
375378
}
376379

377380
/// Same as [`get_unmatched_games_with_clone_of_with_limit`] but without the platform
378381
/// mapping gate. See [`get_unmatched_games_without_clone_of_with_limit_no_platform_gate`].
379382
pub fn get_unmatched_games_with_clone_of_with_limit_no_platform_gate<'a>(
380383
provider: MetadataProviderEnum,
381384
page_size: u64,
385+
cursor: Option<Uuid>,
382386
conn: DbConn,
383387
) -> BoxFuture<'a, anyhow::Result<Option<Vec<game::Model>>>> {
384-
get_unmatched_games_with_limit(provider, false, false, page_size, conn)
388+
get_unmatched_games_with_limit(provider, false, false, page_size, cursor, conn)
385389
}
386390

387391
/// Min interval between cross-pass attempts on the same failed mapping.
@@ -401,13 +405,14 @@ pub const CROSS_MATCH_RETRY_INTERVAL_DAYS: i64 = 7;
401405
pub fn get_failed_games_for_cross_pass_with_limit<'a>(
402406
provider: MetadataProviderEnum,
403407
page_size: u64,
408+
cursor: Option<Uuid>,
404409
conn: DbConn,
405410
) -> BoxFuture<'a, anyhow::Result<Option<Vec<game::Model>>>> {
406411
Box::pin(async move {
407412
let cooldown = Utc::now() - Duration::days(CROSS_MATCH_RETRY_INTERVAL_DAYS);
408413
let cooldown_naive: NaiveDateTime = cooldown.naive_utc();
409414

410-
let res = Game::find()
415+
let mut query = Game::find()
411416
.join(
412417
JoinType::InnerJoin,
413418
game::Relation::SignatureMetadataMapping.def(),
@@ -450,7 +455,13 @@ pub fn get_failed_games_for_cross_pass_with_limit<'a>(
450455
)
451456
.to_owned(),
452457
)),
453-
)
458+
);
459+
460+
if let Some(after) = cursor {
461+
query = query.filter(game::Column::Id.gt(after));
462+
}
463+
464+
let res = query
454465
.order_by_asc(game::Column::Id)
455466
.limit(page_size)
456467
.all(&conn)
@@ -470,13 +481,14 @@ pub fn get_failed_games_for_cross_pass_with_limit<'a>(
470481
pub fn get_automatic_match_failed_games_with_limit<'a>(
471482
provider: MetadataProviderEnum,
472483
page_size: u64,
484+
cursor: Option<Uuid>,
473485
conn: DbConn,
474486
) -> BoxFuture<'a, anyhow::Result<Option<Vec<game::Model>>>> {
475487
Box::pin(async move {
476488
let sixty_days_ago = Utc::now() - Duration::days(60);
477489
let sixty_days_ago_naive: NaiveDateTime = sixty_days_ago.naive_utc();
478490

479-
let res = Game::find()
491+
let mut query = Game::find()
480492
.join(
481493
JoinType::LeftJoin,
482494
game::Relation::SignatureMetadataMapping.def(),
@@ -490,7 +502,13 @@ pub fn get_automatic_match_failed_games_with_limit<'a>(
490502
)
491503
.and(signature_metadata_mapping::Column::UpdatedAt.lt(sixty_days_ago_naive))
492504
.and(signature_metadata_mapping::Column::Provider.eq(provider)),
493-
)
505+
);
506+
507+
if let Some(after) = cursor {
508+
query = query.filter(game::Column::Id.gt(after));
509+
}
510+
511+
let res = query
494512
.order_by_asc(game::Column::Id)
495513
.limit(page_size)
496514
.all(&conn)
@@ -509,6 +527,7 @@ fn get_unmatched_games_with_limit<'a>(
509527
clone_of_null: bool,
510528
require_platform_mapping: bool,
511529
page_size: u64,
530+
cursor: Option<Uuid>,
512531
conn: DbConn,
513532
) -> BoxFuture<'a, anyhow::Result<Option<Vec<game::Model>>>> {
514533
Box::pin(async move {
@@ -543,7 +562,7 @@ fn get_unmatched_games_with_limit<'a>(
543562
);
544563
}
545564

546-
let res = query
565+
query = query
547566
.filter(if clone_of_null {
548567
game::Column::CloneOf.is_null()
549568
} else {
@@ -569,7 +588,13 @@ fn get_unmatched_games_with_limit<'a>(
569588
.to_owned(),
570589
)
571590
.not(),
572-
)
591+
);
592+
593+
if let Some(after) = cursor {
594+
query = query.filter(game::Column::Id.gt(after));
595+
}
596+
597+
let res = query
573598
.order_by_asc(game::Column::Id)
574599
.limit(page_size)
575600
.all(&conn)

service/src/db/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,12 @@ macro_rules! unmatched_entities_with_limit {
3434
pub fn $fn(
3535
provider: ::entity::sea_orm_active_enums::MetadataProviderEnum,
3636
limit: u64,
37+
cursor: ::std::option::Option<::sea_orm::prelude::Uuid>,
3738
conn: ::sea_orm::DbConn,
3839
) -> ::futures_util::future::BoxFuture<'static, ::anyhow::Result<Option<Vec<$model>>>> {
3940
Box::pin(async move {
4041
use ::sea_orm::ActiveEnum;
41-
let rows = $entity::find()
42+
let mut query = $entity::find()
4243
.filter(
4344
::sea_orm::sea_query::Expr::exists(
4445
::sea_orm::sea_query::Query::select()
@@ -63,7 +64,11 @@ macro_rules! unmatched_entities_with_limit {
6364
.to_owned(),
6465
)
6566
.not(),
66-
)
67+
);
68+
if let ::std::option::Option::Some(after) = cursor {
69+
query = query.filter($column.gt(after));
70+
}
71+
let rows = query
6772
.order_by_asc($column)
6873
.limit(limit)
6974
.all(&conn)

service/src/providers/mod.rs

Lines changed: 54 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use entity::sea_orm_active_enums::{
1818
AutomaticMatchReasonEnum, FailedMatchReasonEnum, MatchTypeEnum, MetadataProviderEnum,
1919
};
2020
use futures_util::future::BoxFuture;
21-
use log::{debug, error, info, warn};
21+
use log::{debug, error, info};
2222
use sea_orm::DbConn;
2323
use sea_orm::prelude::Uuid;
2424
use std::sync::Arc;
@@ -30,11 +30,42 @@ pub const DEFAULT_CHUNK_SIZE: usize = 4;
3030

3131
pub const DEFAULT_PAGE_SIZE: u64 = 100;
3232

33+
/// Exposes the primary-key UUID used as the keyset pagination cursor.
34+
/// [`drive_match_pipeline`] reads `page_cursor()` off the last row of each
35+
/// page and passes it back to the next [`FetchPageFn`] call so the query
36+
/// can use `WHERE id > $cursor` instead of re-scanning from the start.
37+
pub trait PageCursor {
38+
fn page_cursor(&self) -> Uuid;
39+
}
40+
41+
impl PageCursor for entity::game::Model {
42+
fn page_cursor(&self) -> Uuid {
43+
self.id
44+
}
45+
}
46+
47+
impl PageCursor for entity::company::Model {
48+
fn page_cursor(&self) -> Uuid {
49+
self.id
50+
}
51+
}
52+
53+
impl PageCursor for entity::platform::Model {
54+
fn page_cursor(&self) -> Uuid {
55+
self.id
56+
}
57+
}
58+
3359
/// Function pointer signature for the per-page fetch callback used by
34-
/// [`drive_match_pipeline`]. Returns the next page of unmatched entities of
35-
/// type `M` for the given provider, or `None` when the queue is drained.
36-
pub type FetchPageFn<M> =
37-
fn(MetadataProviderEnum, u64, DbConn) -> BoxFuture<'static, anyhow::Result<Option<Vec<M>>>>;
60+
/// [`drive_match_pipeline`]. Takes as `cursor` the last id of the previous
61+
/// page (or `None` for the first call) so the underlying query can skip rows
62+
/// that have already been processed. Resolves to `Ok(None)` when the queue is drained.
63+
pub type FetchPageFn<M> = fn(
64+
MetadataProviderEnum,
65+
u64,
66+
Option<Uuid>,
67+
DbConn,
68+
) -> BoxFuture<'static, anyhow::Result<Option<Vec<M>>>>;
3869

3970
/// Function pointer signature for the per-entity match callback used by
4071
/// [`drive_match_pipeline`]. Receives one entity, an `Arc` clone of the
@@ -178,11 +209,9 @@ fn failed_reason_label(r: FailedMatchReasonEnum) -> &'static str {
178209
/// Per-entity errors are logged with the `label` prefix and do not abort
179210
/// the loop.
180211
///
181-
/// The loop also aborts when two consecutive page fetches return the same
182-
/// entities. That happens when every task in a page bailed without writing
183-
/// (e.g. ScreenScraper hit its quota and every game-task early-returns
184-
/// `Ok(())` without a mapping write); without this guard the page query
185-
/// would keep returning the same set forever and starve the next provider.
212+
/// Uses keyset pagination via [`PageCursor`]: the last id of each page seeds
213+
/// the next `fetch_fn` call so the query skips already-processed rows in
214+
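/// O(log N) rather than re-scanning from the smallest id every iteration. The cursor advances past every fetched row even when a page's tasks write nothing, so the same page is never fetched twice within a cycle.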
186215
pub async fn drive_match_pipeline<M, C>(
187216
label: &'static str,
188217
provider: MetadataProviderEnum,
@@ -193,17 +222,12 @@ pub async fn drive_match_pipeline<M, C>(
193222
chunk_size: usize,
194223
) -> anyhow::Result<()>
195224
where
196-
M: Clone + PartialEq + Send + 'static,
225+
M: Clone + PageCursor + Send + 'static,
197226
C: Send + Sync + 'static,
198227
{
199-
let mut last_page: Option<Vec<M>> = None;
200-
while let Some(page) = fetch_fn(provider, DEFAULT_PAGE_SIZE, db_conn.clone()).await? {
201-
if last_page.as_ref() == Some(&page) {
202-
warn!(
203-
"Provider returned the same {label} page twice in a row; aborting cycle to avoid infinite loop (likely cause: upstream quota or persistent error)"
204-
);
205-
break;
206-
}
228+
let mut cursor: Option<Uuid> = None;
229+
while let Some(page) = fetch_fn(provider, DEFAULT_PAGE_SIZE, cursor, db_conn.clone()).await? {
230+
let next_cursor = page.last().map(PageCursor::page_cursor);
207231
for chunk in page.chunks(chunk_size) {
208232
let mut handles = Vec::with_capacity(chunk.len());
209233
for entity in chunk.iter().cloned() {
@@ -217,7 +241,10 @@ where
217241
}
218242
}
219243
}
220-
last_page = Some(page);
244+
cursor = match next_cursor {
245+
Some(c) => Some(c),
246+
None => break,
247+
};
221248
}
222249
Ok(())
223250
}
@@ -340,13 +367,16 @@ where
340367
C: Send + Sync + 'static,
341368
{
342369
let provider_label = provider_label_for(provider);
370+
let mut cursor: Option<Uuid> = None;
343371
while let Some(page) = crate::db::game::get_failed_games_for_cross_pass_with_limit(
344372
provider,
345373
DEFAULT_PAGE_SIZE,
374+
cursor,
346375
db_conn.clone(),
347376
)
348377
.await?
349378
{
379+
let next_cursor = page.last().map(PageCursor::page_cursor);
350380
for chunk in page.chunks(chunk_size) {
351381
let mut handles = Vec::with_capacity(chunk.len());
352382
for game in chunk.iter().cloned() {
@@ -389,6 +419,10 @@ where
389419
);
390420
}
391421
}
422+
cursor = match next_cursor {
423+
Some(c) => Some(c),
424+
None => break,
425+
};
392426
}
393427
Ok(())
394428
}

0 commit comments

Comments
 (0)