Skip to content

Commit 1cbd4a6

Browse files
rickyromboclaude
andauthored
Backfill sol_purchases from usdc_purchases (#815)
## Summary Step 1 of the purchases-domain cutover. This PR populates the new Go-indexer tables with the historical data they're missing today, adds the compatibility view + parallel notification trigger, but **leaves all readers on the legacy `usdc_purchases` table**. The route swap is a separate PR (step 2) that lands after this one is verified on production. The shape mirrors PR #809 (challenges cutover): bounded migration, view-based read translation, parallel trigger that dedupes via shared `group_id`. ## What's in this PR **Schema / migration** — `ddl/migrations/0199_backfill_sol_purchases.sql` - Adds `created_at TIMESTAMP DEFAULT NOW()` to `sol_purchases`. Same gap we hit on `sol_reward_disbursements` in #809. - Copies historical purchases from `usdc_purchases` into `sol_purchases`. `from_account` is resolved to the buyer's USDC user_bank via `usdc_user_bank_accounts` so the NOT NULL column has a real value. - Patches `created_at` on rows the Go indexer wrote before this migration (their `created_at` was just `NOW()` from the default; corrects them from the legacy table where the legacy value is older). - Explodes `usdc_purchases.splits` JSONB into one `sol_payments` row per element. Element shape is `{payout_wallet, amount, percentage, user_id, eth_wallet}` per `add_wallet_info_to_splits()` in the Python source. - Adds `sol_purchases_created_at_idx` so the route-side default sort by `created_at` doesn't degrade. **View** — `ddl/views/v_usdc_purchases.sql` - Exposes `sol_purchases` + `sol_payments` in the legacy column shape so step 2's route swap is mostly a one-token rename. - `seller_user_id` is derived from current content ownership (`tracks.owner_id` / `playlists.playlist_owner_id`). Note: this is current owner, not snapshotted at purchase time. Legacy was a snapshot — accepting this drift per design discussion. - `extra_amount` is derived as `amount - base_price` via a correlated subquery against `track_price_history` / `album_price_history` (block_timestamp <= purchase created_at, ORDER BY DESC LIMIT 1). - `splits` JSON is aggregated over `sol_payments` with user_id resolved via `COALESCE(users.spl_usdc_payout_wallet match, sol_claimable_accounts mint=USDC match)`. Network-cut payments (to the staking bridge wallet) emit `user_id: null`. - `vendor` is intentionally dropped from the view. - Filtered to `is_valid IS TRUE` to match the legacy table's semantics (Python only wrote validated purchases). **Trigger** — `ddl/functions/handle_usdc_purchase.sql` - Appends a `handle_sol_purchase` function and `on_sol_purchase AFTER INSERT ON sol_purchases` trigger. - Notification shape and `group_id` format match the legacy trigger byte-for-byte (verified against the existing function body), so during the backfill — where every inserted row fires the new trigger and tries to recreate notifications whose `group_id`s were created by the legacy trigger long ago — `ON CONFLICT DO NOTHING` makes them no-ops. - `vendor` and `extra_amount` are emitted as `null` in the new payload; downstream consumers must tolerate this. **Cleanup** — `sql/01_schema.sql` - Dropped a stale `block_timestamp` column from the `sol_purchases` table definition. No migration creates it and nothing in the repo references it; the dump had drifted from reality. ## What's NOT in this PR - **Reader changes.** All 14+ Go routes that join `usdc_purchases` (`v1_users_purchases`, `v1_users_sales`, `v1_users_purchasers`, `v1_explore_best_selling`, `v1_users_library_*`, `v1_fan_club_feed`, `comms_blasts`, `dbv1/access.go`, `comms/chat.go`, etc.) are unchanged. Until this PR's backfill is verified in production, swapping readers would risk old purchases disappearing. - **Python decommission.** `index_payment_router` keeps writing `usdc_purchases` (legacy trigger keeps firing on insert). The new trigger dedupes against it via `group_id`. - **Go indexer update for `created_at`.** A small follow-up: `solana/indexer/program/payment_router.go` should explicitly write `created_at` so new rows get the on-chain time rather than `NOW()` from the column default. Default is correct-enough until then. - **Vendor field.** Lost in the view; if anything frontend-side breaks on `vendor: null` notifications we'll need a workaround. ## Test plan After this lands, before opening the step-2 PR, verify on a prod replica: - [ ] Row-count parity: ```sql SELECT (SELECT count(*) FROM usdc_purchases) AS legacy, (SELECT count(*) FROM sol_purchases WHERE is_valid IS TRUE) AS new_valid; ``` - [ ] Splits parity for a 50-row sample: ```sql SELECT up.signature, jsonb_array_length(up.splits) AS legacy_splits, (SELECT count(*) FROM sol_payments WHERE signature = up.signature AND instruction_index = 0) AS new_splits FROM usdc_purchases up ORDER BY random() LIMIT 50; ``` - [ ] Spot-check `v_usdc_purchases` against `usdc_purchases` for a few signatures: same `buyer_user_id`, same `amount`, comparable `splits[*].user_id` and `payout_wallet`. - [ ] Confirm trigger dedupe: insert a `sol_purchases` row matching an existing `usdc_purchases` row in dev; assert no new notification row appears. - [ ] Cloud SQL logs during deploy: no `statement_timeout`, no `pg_type_typname_nsp_index`, no `deadlock detected`. - [ ] `go test ./api/...` green (no reader changes, no test changes expected). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 002c0cf commit 1cbd4a6

5 files changed

Lines changed: 353 additions & 6 deletions

File tree

api/dbv1/models.go

Lines changed: 22 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ddl/functions/handle_usdc_purchase.sql

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,91 @@ do $$ begin
5656
exception
5757
when others then null;
5858
end $$;
59+
60+
61+
-- Mirror of handle_usdc_purchase for the new indexer's table. Resolves
62+
-- seller_user_id from current content ownership (same CASE used in
63+
-- v_usdc_purchases). Notification shape and group_id format match the legacy
64+
-- trigger so the two can dual-fire during cutover (second insert is a no-op
65+
-- via on conflict). extra_amount and vendor are emitted as null in the new
66+
-- payload because they aren't denormalized onto sol_purchases.
67+
create or replace function handle_sol_purchase() returns trigger as $$
68+
declare
69+
resolved_seller_user_id integer;
70+
begin
71+
if new.is_valid is not true then
72+
return null;
73+
end if;
74+
75+
if new.content_type = 'track' then
76+
select owner_id into resolved_seller_user_id
77+
from tracks
78+
where track_id = new.content_id
79+
and is_current = true
80+
limit 1;
81+
else
82+
select playlist_owner_id into resolved_seller_user_id
83+
from playlists
84+
where playlist_id = new.content_id
85+
and is_current = true
86+
limit 1;
87+
end if;
88+
89+
if resolved_seller_user_id is null then
90+
return null;
91+
end if;
92+
93+
insert into notification
94+
(slot, user_ids, timestamp, type, specifier, group_id, data)
95+
values
96+
(
97+
new.slot,
98+
ARRAY [resolved_seller_user_id],
99+
new.created_at,
100+
'usdc_purchase_seller',
101+
new.buyer_user_id,
102+
'usdc_purchase_seller:' || 'seller_user_id:' || resolved_seller_user_id || ':buyer_user_id:' || new.buyer_user_id || ':content_id:' || new.content_id || ':content_type:' || new.content_type,
103+
json_build_object(
104+
'content_type', new.content_type,
105+
'buyer_user_id', new.buyer_user_id,
106+
'seller_user_id', resolved_seller_user_id,
107+
'amount', new.amount,
108+
'extra_amount', null,
109+
'content_id', new.content_id,
110+
'vendor', null
111+
)
112+
),
113+
(
114+
new.slot,
115+
ARRAY [new.buyer_user_id],
116+
new.created_at,
117+
'usdc_purchase_buyer',
118+
new.buyer_user_id,
119+
'usdc_purchase_buyer:' || 'seller_user_id:' || resolved_seller_user_id || ':buyer_user_id:' || new.buyer_user_id || ':content_id:' || new.content_id || ':content_type:' || new.content_type,
120+
json_build_object(
121+
'content_type', new.content_type,
122+
'buyer_user_id', new.buyer_user_id,
123+
'seller_user_id', resolved_seller_user_id,
124+
'amount', new.amount,
125+
'extra_amount', null,
126+
'content_id', new.content_id,
127+
'vendor', null
128+
)
129+
)
130+
on conflict do nothing;
131+
132+
return null;
133+
exception
134+
when others then
135+
raise warning 'An error occurred in %: %', tg_name, sqlerrm;
136+
return null;
137+
end;
138+
$$ language plpgsql;
139+
140+
do $$ begin
141+
create trigger on_sol_purchase
142+
after insert on sol_purchases
143+
for each row execute procedure handle_sol_purchase();
144+
exception
145+
when others then null;
146+
end $$;
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
BEGIN;
2+
SET LOCAL statement_timeout = 0;
3+
4+
-- Parity with the legacy usdc_purchases.created_at column. The Go indexer
5+
-- writes new rows close to on-chain time, so DEFAULT NOW() is acceptable; the
6+
-- backfill below corrects rows that came from the legacy table.
7+
ALTER TABLE sol_purchases
8+
ADD COLUMN IF NOT EXISTS created_at TIMESTAMP DEFAULT NOW();
9+
10+
-- Backfill historical purchases that predate the Go indexer. from_account is
11+
-- resolved via the buyer's USDC user_bank so the NOT NULL column has a real
12+
-- value. Falls back to '' for buyers whose bank account is unknown — the
13+
-- sol_purchases_from_account_idx tolerates empty strings.
14+
INSERT INTO sol_purchases (
15+
signature, instruction_index, amount, slot,
16+
from_account, content_type, content_id, buyer_user_id,
17+
access_type, valid_after_blocknumber, is_valid,
18+
city, region, country, created_at
19+
)
20+
SELECT
21+
up.signature,
22+
0 AS instruction_index,
23+
up.amount,
24+
up.slot,
25+
COALESCE(uuba.bank_account, '') AS from_account,
26+
up.content_type::text,
27+
up.content_id,
28+
up.buyer_user_id,
29+
up.access::text,
30+
0 AS valid_after_blocknumber,
31+
TRUE AS is_valid,
32+
up.city, up.region, up.country,
33+
up.created_at
34+
FROM usdc_purchases up
35+
LEFT JOIN users u
36+
ON u.user_id = up.buyer_user_id AND u.is_current = TRUE
37+
LEFT JOIN usdc_user_bank_accounts uuba
38+
ON uuba.ethereum_address = u.wallet
39+
ON CONFLICT (signature, instruction_index) DO NOTHING;
40+
41+
-- Correct created_at for rows the Go indexer wrote before this migration ran:
42+
-- those rows got NOW() from the column default, but the legacy table has the
43+
-- real on-chain time. Only updates rows whose existing created_at is later
44+
-- than the legacy value, so it leaves accurate Go-indexer writes alone.
45+
UPDATE sol_purchases sp
46+
SET created_at = up.created_at
47+
FROM usdc_purchases up
48+
WHERE up.signature = sp.signature
49+
AND up.created_at < sp.created_at;
50+
51+
-- Explode legacy usdc_purchases.splits JSONB into sol_payments rows. The
52+
-- element shape is {payout_wallet, amount, percentage, user_id, eth_wallet}
53+
-- per add_wallet_info_to_splits() in
54+
-- discovery-provider/src/queries/get_extended_purchase_gate.py.
55+
INSERT INTO sol_payments (signature, instruction_index, route_index, to_account, amount, slot)
56+
SELECT
57+
up.signature,
58+
0 AS instruction_index,
59+
(ord - 1)::int AS route_index,
60+
elem->>'payout_wallet' AS to_account,
61+
(elem->>'amount')::bigint AS amount,
62+
up.slot
63+
FROM usdc_purchases up
64+
CROSS JOIN LATERAL jsonb_array_elements(up.splits) WITH ORDINALITY arr(elem, ord)
65+
WHERE elem->>'payout_wallet' IS NOT NULL
66+
ON CONFLICT (signature, instruction_index, route_index) DO NOTHING;
67+
68+
-- Default sort across the purchases / sales / library routes is by created_at;
69+
-- restore the index parity the legacy table had via idx_usdc_purchases_created_at.
70+
CREATE INDEX IF NOT EXISTS sol_purchases_created_at_idx
71+
ON sol_purchases (created_at);
72+
73+
COMMIT;

ddl/views/v_usdc_purchases.sql

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
DROP VIEW IF EXISTS v_usdc_purchases;
2+
CREATE VIEW v_usdc_purchases AS
3+
SELECT
4+
sp.signature,
5+
sp.slot,
6+
sp.buyer_user_id,
7+
CASE sp.content_type
8+
WHEN 'track' THEN t.owner_id
9+
WHEN 'album' THEN p.playlist_owner_id
10+
WHEN 'playlist' THEN p.playlist_owner_id
11+
END AS seller_user_id,
12+
sp.amount,
13+
sp.content_type::usdc_purchase_content_type AS content_type,
14+
sp.content_id,
15+
sp.created_at,
16+
GREATEST(
17+
sp.amount - COALESCE(
18+
CASE sp.content_type
19+
WHEN 'track' THEN (
20+
SELECT tph.total_price_cents * 10000
21+
FROM track_price_history tph
22+
WHERE tph.track_id = sp.content_id
23+
AND tph.block_timestamp <= sp.created_at
24+
ORDER BY tph.block_timestamp DESC
25+
LIMIT 1
26+
)
27+
ELSE (
28+
SELECT aph.total_price_cents * 10000
29+
FROM album_price_history aph
30+
WHERE aph.playlist_id = sp.content_id
31+
AND aph.block_timestamp <= sp.created_at
32+
ORDER BY aph.block_timestamp DESC
33+
LIMIT 1
34+
)
35+
END,
36+
0
37+
),
38+
0
39+
) AS extra_amount,
40+
sp.access_type::usdc_purchase_access_type AS access,
41+
sp.city, sp.region, sp.country,
42+
(
43+
SELECT COALESCE(
44+
jsonb_agg(
45+
jsonb_build_object(
46+
'user_id', COALESCE(u_payout.user_id, u_sca.user_id),
47+
'payout_wallet', pay.to_account,
48+
'amount', pay.amount,
49+
'percentage', pay.amount * 100.0 / NULLIF(sp.amount, 0)
50+
)
51+
ORDER BY pay.route_index
52+
),
53+
'[]'::jsonb
54+
)
55+
FROM sol_payments pay
56+
LEFT JOIN users u_payout
57+
ON u_payout.spl_usdc_payout_wallet = pay.to_account
58+
AND u_payout.is_current = TRUE
59+
LEFT JOIN sol_claimable_accounts sca
60+
ON sca.account = pay.to_account
61+
AND sca.mint = 'EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v'
62+
LEFT JOIN users u_sca
63+
ON u_sca.wallet = sca.ethereum_address
64+
AND u_sca.is_current = TRUE
65+
WHERE pay.signature = sp.signature
66+
AND pay.instruction_index = sp.instruction_index
67+
) AS splits
68+
FROM sol_purchases sp
69+
LEFT JOIN tracks t
70+
ON sp.content_type = 'track'
71+
AND t.track_id = sp.content_id
72+
AND t.is_current = TRUE
73+
LEFT JOIN playlists p
74+
ON sp.content_type IN ('album', 'playlist')
75+
AND p.playlist_id = sp.content_id
76+
AND p.is_current = TRUE
77+
WHERE sp.is_valid IS TRUE;
78+
79+
COMMENT ON VIEW v_usdc_purchases IS 'Compatibility view exposing sol_purchases + sol_payments in the column shape API routes used to read from usdc_purchases. seller_user_id is the current content owner (not snapshotted at purchase time). extra_amount is amount paid minus base price from price history. vendor is intentionally dropped.';

sql/01_schema.sql

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8320,7 +8320,7 @@ CREATE TABLE public.sol_purchases (
83208320
city character varying,
83218321
region character varying,
83228322
country character varying,
8323-
block_timestamp timestamp with time zone
8323+
created_at timestamp without time zone DEFAULT now()
83248324
);
83258325

83268326

@@ -11956,6 +11956,13 @@ CREATE INDEX sol_purchases_from_account_idx ON public.sol_purchases USING btree
1195611956
COMMENT ON INDEX public.sol_purchases_from_account_idx IS 'Used for getting purchases by a user via their account.';
1195711957

1195811958

11959+
--
11960+
-- Name: sol_purchases_created_at_idx; Type: INDEX; Schema: public; Owner: -
11961+
--
11962+
11963+
CREATE INDEX sol_purchases_created_at_idx ON public.sol_purchases USING btree (created_at);
11964+
11965+
1195911966
--
1196011967
-- Name: sol_purchases_valid_idx; Type: INDEX; Schema: public; Owner: -
1196111968
--
@@ -12889,6 +12896,89 @@ CREATE VIEW public.v_challenge_disbursements AS
1288912896
JOIN public.users ON (((users.wallet = rd.recipient_eth_address) AND (users.is_current = true))));
1289012897

1289112898

12899+
--
12900+
-- Name: v_usdc_purchases; Type: VIEW; Schema: public; Owner: -
12901+
--
12902+
12903+
CREATE VIEW public.v_usdc_purchases AS
12904+
SELECT sp.signature,
12905+
sp.slot,
12906+
sp.buyer_user_id,
12907+
CASE sp.content_type
12908+
WHEN 'track'::text THEN t.owner_id
12909+
WHEN 'album'::text THEN p.playlist_owner_id
12910+
WHEN 'playlist'::text THEN p.playlist_owner_id
12911+
END AS seller_user_id,
12912+
sp.amount,
12913+
(sp.content_type)::public.usdc_purchase_content_type AS content_type,
12914+
sp.content_id,
12915+
sp.created_at,
12916+
GREATEST(
12917+
sp.amount - COALESCE(
12918+
CASE sp.content_type
12919+
WHEN 'track'::text THEN (
12920+
SELECT (tph.total_price_cents * 10000)
12921+
FROM public.track_price_history tph
12922+
WHERE tph.track_id = sp.content_id
12923+
AND tph.block_timestamp <= sp.created_at
12924+
ORDER BY tph.block_timestamp DESC
12925+
LIMIT 1
12926+
)
12927+
ELSE (
12928+
SELECT (aph.total_price_cents * 10000)
12929+
FROM public.album_price_history aph
12930+
WHERE aph.playlist_id = sp.content_id
12931+
AND aph.block_timestamp <= sp.created_at
12932+
ORDER BY aph.block_timestamp DESC
12933+
LIMIT 1
12934+
)
12935+
END,
12936+
0
12937+
),
12938+
0
12939+
) AS extra_amount,
12940+
(sp.access_type)::public.usdc_purchase_access_type AS access,
12941+
sp.city,
12942+
sp.region,
12943+
sp.country,
12944+
(
12945+
SELECT COALESCE(
12946+
jsonb_agg(
12947+
jsonb_build_object(
12948+
'user_id', COALESCE(u_payout.user_id, u_sca.user_id),
12949+
'payout_wallet', pay.to_account,
12950+
'amount', pay.amount,
12951+
'percentage', ((pay.amount * 100.0) / NULLIF(sp.amount, 0))
12952+
)
12953+
ORDER BY pay.route_index
12954+
),
12955+
'[]'::jsonb
12956+
)
12957+
FROM public.sol_payments pay
12958+
LEFT JOIN public.users u_payout
12959+
ON u_payout.spl_usdc_payout_wallet = pay.to_account
12960+
AND u_payout.is_current = true
12961+
LEFT JOIN public.sol_claimable_accounts sca
12962+
ON sca.account = pay.to_account
12963+
AND sca.mint = 'EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v'::text
12964+
LEFT JOIN public.users u_sca
12965+
ON u_sca.wallet = sca.ethereum_address
12966+
AND u_sca.is_current = true
12967+
WHERE pay.signature = sp.signature
12968+
AND pay.instruction_index = sp.instruction_index
12969+
) AS splits
12970+
FROM public.sol_purchases sp
12971+
LEFT JOIN public.tracks t
12972+
ON sp.content_type = 'track'::text
12973+
AND t.track_id = sp.content_id
12974+
AND t.is_current = true
12975+
LEFT JOIN public.playlists p
12976+
ON sp.content_type IN ('album'::text, 'playlist'::text)
12977+
AND p.playlist_id = sp.content_id
12978+
AND p.is_current = true
12979+
WHERE sp.is_valid IS TRUE;
12980+
12981+
1289212982
--
1289312983
-- PostgreSQL database dump complete
1289412984
--

0 commit comments

Comments
 (0)