@@ -7,14 +7,16 @@ import { inject, injectable } from "@mainsail/container";
77import { Contracts , Identifiers } from "@mainsail/contracts" ;
88import { Application , Providers } from "@mainsail/kernel" ;
99import { assert } from "@mainsail/utils" ;
10- import { DataSource } from "typeorm" ;
10+ import { DataSource , EntityManager } from "typeorm" ;
1111import { PostgresConnectionOptions } from "typeorm/driver/postgres/PostgresConnectionOptions.js" ;
1212
1313import { Identifiers as InternalIdentifiers } from "../identifiers.js" ;
1414import { LegacyChainTip , LegacySnapshot , LegacyWallet } from "../interfaces.js" ;
1515
1616interface DatabaseOptions extends PostgresConnectionOptions {
1717 readonly v3 : {
18+ readonly host : string ;
19+ readonly port : number ;
1820 readonly user : string ;
1921 readonly password : string ;
2022 readonly database : string ;
@@ -26,32 +28,8 @@ export class Generator {
2628 @inject ( InternalIdentifiers . Application )
2729 private app ! : Application ;
2830
29- async #connect( ) : Promise < DataSource > {
30- const pluginConfig = await this . app
31- . resolve ( Providers . PluginConfiguration )
32- . discover ( "@mainsail/snapshot-legacy-exporter" , process . cwd ( ) ) ;
33-
34- const options = pluginConfig . get < DatabaseOptions > ( "database" ) ;
35- assert . defined ( options ) ;
36-
37- const dataSource = new DataSource ( {
38- ...options ,
39- entities : [ ] ,
40- migrations : [ ] ,
41- migrationsRun : false ,
42- synchronize : false ,
43- } ) ;
44-
45- await dataSource . initialize ( ) ;
46-
47- // Link v3 database
48- await dataSource . query ( `
49- CREATE EXTENSION IF NOT EXISTS dblink;
50- SELECT dblink_connect('v3_db', 'dbname=${ options . v3 . database } user=${ options . v3 . user } password=${ options . v3 . password } ');
51- ` ) ;
52-
53- return dataSource ;
54- }
31+ @inject ( Identifiers . Services . Log . Service )
32+ private readonly logger ! : Contracts . Kernel . Logger ;
5533
5634 public async generateStatic ( chainTip : LegacyChainTip , wallets : LegacyWallet [ ] ) : Promise < void > {
5735 const addressFactory = this . app . get < Contracts . Crypto . AddressFactory > (
@@ -75,64 +53,97 @@ export class Generator {
7553 }
7654
7755 public async generate ( ) : Promise < void > {
78- // Connect to V3 database
79- const dataSource = await this . #connect( ) ;
80- console . log ( "connected!" ) ;
81-
82- const [ chainTip ] = await dataSource . query (
83- "SELECT * FROM dblink('v3_db', 'SELECT id, height FROM blocks ORDER BY height DESC LIMIT 1') AS blocks(hash varchar, number bigint);" ,
84- ) ;
56+ await this . #runInTransaction( async ( entityManager ) => {
57+ this . logger . info ( "connected!" ) ;
58+
59+ const [ chainTip ] = await entityManager . query (
60+ "SELECT * FROM dblink('v3_db', 'SELECT id, height FROM blocks ORDER BY height DESC LIMIT 1') AS blocks(hash varchar, number bigint);" ,
61+ ) ;
62+
63+ const addressFactory = this . app . get < Contracts . Crypto . AddressFactory > (
64+ Identifiers . Cryptography . Identity . Address . Factory ,
65+ ) ;
66+
67+ // Loop all wallets
68+ const limit = 1000 ;
69+ let offset = 0 ;
70+
71+ const wallets : LegacyWallet [ ] = [ ] ;
72+ for ( ; ; ) {
73+ this . logger . info ( `Fetching wallets (offset: ${ offset } , limit: ${ limit } )` ) ;
74+
75+ const chunk : LegacyWallet [ ] = await entityManager . query ( `
76+ SELECT * FROM dblink(
77+ 'v3_db',
78+ '
79+ SELECT address, public_key, balance, attributes FROM wallets
80+ ORDER BY balance DESC, address ASC LIMIT ${ limit } OFFSET ${ offset }
81+ '
82+ ) AS wallets("arkAddress" varchar, "publicKey" varchar, balance bigint, attributes jsonb);
83+ ` ) ;
84+
85+ for ( const wallet of chunk ) {
86+ // sanitize
87+ if ( wallet . attributes ?. [ "delegate" ] ) {
88+ delete wallet . attributes ?. [ "delegate" ] [ "lastBlock" ] ;
89+ }
90+
91+ delete wallet . attributes ?. [ "ipfs" ] ; // ?
92+ delete wallet . attributes ?. [ "business" ] ; // ?
93+ delete wallet . attributes ?. [ "htlc" ] ; // ?
94+ delete wallet . attributes ?. [ "entities" ] ; // ?
95+
96+ wallets . push ( {
97+ ...wallet ,
98+ ...( wallet . publicKey
99+ ? {
100+ ethAddress : await addressFactory . fromPublicKey ( wallet . publicKey ) ,
101+ }
102+ : { } ) ,
103+ } ) ;
104+ }
85105
86- const addressFactory = this . app . get < Contracts . Crypto . AddressFactory > (
87- Identifiers . Cryptography . Identity . Address . Factory ,
88- ) ;
106+ offset += limit ;
89107
90- // Loop all wallets
91- const limit = 1000 ;
92- let offset = 0 ;
93-
94- const wallets : LegacyWallet [ ] = [ ] ;
95- for ( ; ; ) {
96- const chunk : LegacyWallet [ ] = await dataSource . query ( `
97- SELECT * FROM dblink(
98- 'v3_db',
99- '
100- SELECT address, public_key, balance, attributes FROM wallets -- WHERE attributes ?| array[''vote'', ''delegate'', ''username'']
101- ORDER BY balance DESC, address ASC LIMIT ${ limit } OFFSET ${ offset }
102- '
103- ) AS wallets("arkAddress" varchar, "publicKey" varchar, balance bigint, attributes jsonb);
104- ` ) ;
105-
106- for ( const wallet of chunk ) {
107- // sanitize
108- if ( wallet . attributes ?. [ "delegate" ] ) {
109- delete wallet . attributes ?. [ "delegate" ] [ "lastBlock" ] ;
108+ if ( chunk . length === 0 ) {
109+ break ;
110110 }
111+ }
111112
112- delete wallet . attributes ?. [ "ipfs" ] ; // ?
113- delete wallet . attributes ?. [ "business" ] ; // ?
114- delete wallet . attributes ?. [ "htlc" ] ; // ?
115- delete wallet . attributes ?. [ "entities" ] ; // ?
113+ await this . #writeSnapshot( chainTip , wallets ) ;
114+ } ) ;
115+ }
116116
117- wallets . push ( {
118- ...wallet ,
119- ...( wallet . publicKey
120- ? {
121- ethAddress : await addressFactory . fromPublicKey ( wallet . publicKey ) ,
122- }
123- : { } ) ,
124- } ) ;
125- }
117+ async #runInTransaction( callback : ( entityManager : EntityManager ) => Promise < void > ) : Promise < void > {
118+ const pluginConfig = await this . app
119+ . resolve ( Providers . PluginConfiguration )
120+ . discover ( "@mainsail/snapshot-legacy-exporter" , process . cwd ( ) ) ;
121+
122+ const options = pluginConfig . get < DatabaseOptions > ( "database" ) ;
123+ assert . defined ( options ) ;
126124
127- offset += limit ;
125+ const dataSource = new DataSource ( {
126+ ...options ,
127+ entities : [ ] ,
128+ migrations : [ ] ,
129+ migrationsRun : false ,
130+ synchronize : false ,
131+ } ) ;
128132
129- if ( chunk . length === 0 ) {
130- break ;
131- }
132- }
133+ await dataSource . initialize ( ) ;
133134
134135 try {
135- await this . #writeSnapshot( chainTip , wallets ) ;
136+ await dataSource . transaction ( "REPEATABLE READ" , async ( entityManager ) => {
137+ // Link v3 database
138+ await entityManager . query ( `
139+ CREATE EXTENSION IF NOT EXISTS dblink;
140+ SELECT dblink_connect('v3_db', 'host=${ options . v3 . host } port=${ options . v3 . port } dbname=${ options . v3 . database } user=${ options . v3 . user } password=${ options . v3 . password } ');
141+ ` ) ;
142+
143+ await callback ( entityManager ) ;
144+ } ) ;
145+ } catch ( ex ) {
146+ this . logger . error ( ex ) ;
136147 } finally {
137148 await dataSource . destroy ( ) ;
138149 }
@@ -159,7 +170,7 @@ export class Generator {
159170 const jsonString = JSON . stringify ( snapshot ) ;
160171 const compressedBuffer = await promisify ( brotliCompress ) ( jsonString ) ;
161172 await writeFile ( path , compressedBuffer ) ;
162- console . log ( `Wrote ${ snapshot . wallets . length } wallets to '${ path } '` ) ;
173+ this . logger . info ( `Wrote ${ snapshot . wallets . length } wallets to '${ path } '` ) ;
163174 }
164175}
165176
0 commit comments