Skip to content

Commit 05b43df

Browse files
authored
Re-validate pending sol_purchases when blocknumber catches up (#818)
## Summary - Adds a trigger on `tracks`/`playlists` that emits `NOTIFY 'pending_purchase_revalidation'` when `blocknumber` advances past any pending `sol_purchases` row's `valid_after_blocknumber`. Cheap EXISTS guard so the vast majority of track/playlist updates skip the notify entirely. - Adds a Go listener (`PurchaseRevalidator`) in the Solana indexer that consumes notifications and re-runs the existing `validatePurchase` for affected pending rows. A 5-minute sweep + startup sweep covers cases where NOTIFY drops (no connected listener) or pending rows that predate the trigger. - Validation logic stays in `validatePurchase` as the single source of truth — the trigger only signals which `content_id` changed. No SQL port, no parity drift with config-driven values like `NetworkTakeRate`. ### Why A `sol_purchases` row is inserted with `is_valid = NULL` ("pending") when its `valid_after_blocknumber` hasn't been indexed by the local node yet (`MAX(blocks.number) < valid_after_blocknumber` at insert time). Before this PR there was no mechanism to flip pending rows to a final verdict, so they stayed `NULL` indefinitely even after the indexer caught up. ### Notes for reviewers - `sql/01_schema.sql` diff is large because the dump hadn't been regenerated since #815 merged — it picks up that PR's `handle_sol_purchase` function and a few other already-merged trigger updates alongside this PR's `notify_pending_purchase_revalidation`. - Bumped `pgxpool.MaxConns` by 1 to budget for the revalidator's persistent LISTEN connection (in addition to the existing artist_coins listener). - Lifecycle is purely ctx-driven — no explicit `Stop()`. Cancelling the parent ctx unblocks `WaitForNotification`, the deferred `conn.Release()` returns the LISTEN connection to the pool, both goroutines exit. ## Test plan - [x] Trigger function applied via regenerated schema dump (`make test-schema`) - [x] `TestParseRevalidationPayload` — unit coverage for payload parsing - [x] `TestRevalidatorRevalidateContent` — direct call: pending row + matching `sol_payments` flips to `true` - [x] `TestRevalidatorEndToEnd` — full trigger → NOTIFY → LISTEN → revalidate chain via `UPDATE tracks SET blocknumber` - [x] Existing `TestPurchaseValidation` still passes - [ ] Deploy to stage and watch a pending row resolve in the wild 🤖 Generated with [Claude Code](https://claude.com/claude-code)
1 parent a284c53 commit 05b43df

6 files changed

Lines changed: 1059 additions & 133 deletions

File tree

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
-- Notifies Go to re-run validatePurchase for sol_purchases rows that became
2+
-- eligible because a tracks/playlists row's blocknumber advanced past their
3+
-- valid_after_blocknumber. The trigger does no validation itself — it just
4+
-- signals which content_id changed; the Go indexer LISTENs on
5+
-- 'pending_purchase_revalidation' and re-runs the existing validator so the
6+
-- math stays in one place. A periodic sweep in Go catches anything missed
7+
-- (NOTIFY is dropped if no listener is connected at the moment).
8+
create or replace function notify_pending_purchase_revalidation() returns trigger as $$
9+
declare
10+
v_content_type text;
11+
v_content_id int;
12+
v_blocknumber int;
13+
begin
14+
if tg_table_name = 'tracks' then
15+
v_content_type := 'track';
16+
v_content_id := new.track_id;
17+
elsif tg_table_name = 'playlists' then
18+
v_content_type := 'album';
19+
v_content_id := new.playlist_id;
20+
else
21+
return null;
22+
end if;
23+
24+
v_blocknumber := new.blocknumber;
25+
if v_blocknumber is null then
26+
return null;
27+
end if;
28+
29+
-- Cheap EXISTS guard: tracks/playlists update churn dwarfs the pending
30+
-- purchase set, so it's almost always a no-op. Uses sol_purchases_valid_idx
31+
-- + sol_purchases_content_idx.
32+
if exists (
33+
select 1 from sol_purchases sp
34+
where sp.content_id = v_content_id
35+
and sp.content_type = v_content_type
36+
and sp.is_valid is null
37+
and sp.valid_after_blocknumber <= v_blocknumber
38+
) then
39+
perform pg_notify(
40+
'pending_purchase_revalidation',
41+
v_content_type || ':' || v_content_id::text
42+
);
43+
end if;
44+
45+
return null;
46+
exception
47+
when others then
48+
-- Never let a notify failure break a tracks/playlists write. The sweep
49+
-- will catch any pending rows that don't get notified.
50+
raise warning 'An error occurred in %: %', tg_name, sqlerrm;
51+
return null;
52+
end;
53+
$$ language plpgsql;
54+
55+
56+
do $$ begin
57+
create trigger on_track_notify_pending_purchase_revalidation
58+
after insert or update of blocknumber on tracks
59+
for each row execute procedure notify_pending_purchase_revalidation();
60+
exception
61+
when others then null;
62+
end $$;
63+
64+
do $$ begin
65+
create trigger on_playlist_notify_pending_purchase_revalidation
66+
after insert or update of blocknumber on playlists
67+
for each row execute procedure notify_pending_purchase_revalidation();
68+
exception
69+
when others then null;
70+
end $$;

solana/indexer/program/indexer.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type Indexer struct {
2626
rpcClient common.RpcClient
2727
config config.Config
2828
transactionCache *otter.Cache[solana.Signature, *rpc.GetTransactionResult]
29+
revalidator *Revalidator
2930
logger *zap.Logger
3031
}
3132

@@ -37,17 +38,21 @@ func New(
3738
transactionCache *otter.Cache[solana.Signature, *rpc.GetTransactionResult],
3839
logger *zap.Logger,
3940
) *Indexer {
41+
namedLogger := logger.Named(NAME)
4042
return &Indexer{
4143
pool: pool,
4244
grpcConfig: grpcConfig,
4345
rpcClient: rpcClient,
4446
config: config,
4547
transactionCache: transactionCache,
46-
logger: logger.Named(NAME),
48+
revalidator: NewRevalidator(pool, config, namedLogger),
49+
logger: namedLogger,
4750
}
4851
}
4952

5053
func (d *Indexer) Start(ctx context.Context) {
54+
d.revalidator.Start(ctx)
55+
5156
client, err := d.subscribe(ctx)
5257
if err != nil {
5358
d.logger.Fatal("failed to start subscription", zap.Error(err))

0 commit comments

Comments
 (0)