Skip to content

Commit 6f4a5b2

Browse files
committed
Fix SubQL runtime image smoke failures
1 parent 0a690e9 commit 6f4a5b2

23 files changed

Lines changed: 837 additions & 48 deletions

packages/chain-stats-subql/Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ RUN set -e \
1919
&& rm -rf /app/dist \
2020
&& ./node_modules/.bin/subql codegen \
2121
&& ./node_modules/.bin/subql build --output dist \
22+
&& mkdir -p /app/dist/data \
23+
&& cp /app/src/data/*.json /app/dist/data/ \
2224
&& for i in 1 2 3 4 5; do [ -d /app/dist ] && break; sleep 1; done \
2325
&& if [ ! -d /app/dist ]; then \
2426
echo "SubQL build did not create /app/dist"; \

packages/chain-stats-subql/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
"private": true,
1010
"scripts": {
1111
"start": "bun run build && ./node_modules/.bin/subql-node -f . --timeout=512 --batch-size=2 --port=3123",
12-
"build": "subql codegen && subql build",
12+
"build": "subql codegen && subql build && mkdir -p dist/data && cp src/data/*.json dist/data/",
1313
"prepack": "rm -rf dist && npm build",
14-
"test": "jest",
14+
"test": "bun test tests/*.test.ts",
1515
"codegen": "./node_modules/.bin/subql codegen"
1616
},
1717
"files": [
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
1+
import './runtimePolyfills';
2+
13
//Exports all handler functions
24
export * from './mappings/mappingHandlers'
Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
import { SubstrateBlock } from "@subql/types/dist/interfaces";
22
import { getBlock, getToken } from "../utils/records";
3-
import acalaNative from "../data/acala-1870000-native.json";
4-
import acalaNonNative from "../data/acala-1870000-non-native.json";
5-
import karuraNative from "../data/karura-2650000-native.json";
6-
import karuraNonNative from "../data/karura-2650000-non-native.json";
73
import { updateAccountBalance } from "../utils/updateAccountBalance";
84
import { insertBefore, startAt } from "./mappingHandlers";
5+
import { getInitialBalanceRecords } from "../utils/balanceData";
96

107

118
export async function handleBlock(block: SubstrateBlock) {
@@ -17,24 +14,16 @@ export async function handleBlock(block: SubstrateBlock) {
1714
record.timestamp = block.timestamp;
1815

1916
if (blockNumber <= insertBefore) {
20-
const isAcala = api.registry.chainSS58 === 10
21-
let accountData = []
22-
23-
if (isAcala) {
24-
accountData = [...acalaNative as any, ...acalaNonNative as any]
25-
} else {
26-
accountData = [...karuraNative as any, ...karuraNonNative as any]
27-
}
28-
17+
const network = api.registry.chainSS58 === 10 ? "acala" : "karura";
2918
const round = Number(blockNumber - startAt);
19+
const startIndex = round * 1000;
20+
const accountData = await getInitialBalanceRecords(network, startIndex, 1000);
3021

31-
logger.info(`total ${accountData.length}, start insert ${round * 1000}`);
32-
33-
for (let i = 0; i < 1000; i++) {
34-
const data = accountData[i + round * 1000] as { account: string; token: string; free: string; reserved: string; frozen: string };
22+
logger.info(`start insert ${startIndex}, batch ${accountData.length}`);
3523

36-
if (!data) return;
24+
if (accountData.length === 0) return;
3725

26+
for (const data of accountData) {
3827
const token = await getToken(data.token);
3928

4029
await token.save();
@@ -52,4 +41,4 @@ export async function handleBlock(block: SubstrateBlock) {
5241

5342

5443
await record.save();
55-
}
44+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import { TextDecoder, TextEncoder } from "util";
2+
3+
export const installRuntimePolyfills = () => {
4+
const runtime = globalThis as any;
5+
6+
if (typeof runtime.TextEncoder === "undefined") {
7+
runtime.TextEncoder = TextEncoder;
8+
}
9+
10+
if (typeof runtime.TextDecoder === "undefined") {
11+
runtime.TextDecoder = TextDecoder;
12+
}
13+
};
14+
15+
installRuntimePolyfills();
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
import { createReadStream, existsSync } from "fs";
2+
import { join } from "path";
3+
4+
export type InitialBalanceRecord = {
5+
account: string;
6+
token: string;
7+
free: string;
8+
reserved: string;
9+
frozen: string;
10+
};
11+
12+
type Network = "acala" | "karura";
13+
14+
const DATA_FILES: Record<Network, string[]> = {
15+
acala: ["acala-1870000-native.json", "acala-1870000-non-native.json"],
16+
karura: ["karura-2650000-native.json", "karura-2650000-non-native.json"],
17+
};
18+
19+
const getWorkingDirectory = () => {
20+
const cwd = (globalThis as { process?: { cwd?: unknown } }).process?.cwd;
21+
22+
return typeof cwd === "function" ? cwd() : "/app";
23+
};
24+
25+
const getDataDirCandidates = () => {
26+
const cwd = getWorkingDirectory();
27+
28+
return [
29+
join(cwd, "dist", "data"),
30+
join(cwd, "src", "data"),
31+
join(cwd, "data"),
32+
join(cwd, "packages", "chain-stats-subql", "dist", "data"),
33+
join(cwd, "packages", "chain-stats-subql", "src", "data"),
34+
"/app/dist/data",
35+
"/app/src/data",
36+
];
37+
};
38+
39+
type ReaderState = {
40+
nextIndex: number;
41+
iterator: AsyncIterator<InitialBalanceRecord>;
42+
};
43+
44+
const readers = new Map<Network, ReaderState>();
45+
46+
const getDataFilePath = (fileName: string) => {
47+
for (const directory of getDataDirCandidates()) {
48+
const filePath = join(directory, fileName);
49+
50+
if (existsSync(filePath)) {
51+
return filePath;
52+
}
53+
}
54+
55+
throw new Error(`Unable to locate chain-stats balance data file ${fileName}`);
56+
};
57+
58+
async function* readJsonArrayObjects(filePath: string): AsyncGenerator<InitialBalanceRecord> {
59+
let buffer = "";
60+
let depth = 0;
61+
let inString = false;
62+
let escaped = false;
63+
let capturing = false;
64+
65+
for await (const chunk of createReadStream(filePath, { encoding: "utf8" })) {
66+
for (const char of chunk) {
67+
if (!capturing) {
68+
if (char === "{") {
69+
capturing = true;
70+
depth = 1;
71+
buffer = char;
72+
}
73+
74+
continue;
75+
}
76+
77+
buffer += char;
78+
79+
if (escaped) {
80+
escaped = false;
81+
continue;
82+
}
83+
84+
if (char === "\\") {
85+
escaped = true;
86+
continue;
87+
}
88+
89+
if (char === "\"") {
90+
inString = !inString;
91+
continue;
92+
}
93+
94+
if (inString) {
95+
continue;
96+
}
97+
98+
if (char === "{") {
99+
depth += 1;
100+
} else if (char === "}") {
101+
depth -= 1;
102+
103+
if (depth === 0) {
104+
yield JSON.parse(buffer) as InitialBalanceRecord;
105+
buffer = "";
106+
capturing = false;
107+
}
108+
}
109+
}
110+
}
111+
}
112+
113+
async function* readNetworkRecords(network: Network): AsyncGenerator<InitialBalanceRecord> {
114+
for (const fileName of DATA_FILES[network]) {
115+
yield* readJsonArrayObjects(getDataFilePath(fileName));
116+
}
117+
}
118+
119+
const getReader = (network: Network, startIndex: number) => {
120+
const current = readers.get(network);
121+
122+
if (current && startIndex >= current.nextIndex) {
123+
return current;
124+
}
125+
126+
const next = {
127+
nextIndex: 0,
128+
iterator: readNetworkRecords(network)[Symbol.asyncIterator](),
129+
};
130+
131+
readers.set(network, next);
132+
return next;
133+
};
134+
135+
export const getInitialBalanceRecords = async (
136+
network: Network,
137+
startIndex: number,
138+
limit: number,
139+
) => {
140+
const reader = getReader(network, startIndex);
141+
142+
while (reader.nextIndex < startIndex) {
143+
const skipped = await reader.iterator.next();
144+
145+
if (skipped.done) {
146+
return [];
147+
}
148+
149+
reader.nextIndex += 1;
150+
}
151+
152+
const records: InitialBalanceRecord[] = [];
153+
154+
while (records.length < limit) {
155+
const next = await reader.iterator.next();
156+
157+
if (next.done) {
158+
break;
159+
}
160+
161+
records.push(next.value);
162+
reader.nextIndex += 1;
163+
}
164+
165+
return records;
166+
};
167+
168+
export const resetInitialBalanceReadersForTests = () => {
169+
readers.clear();
170+
};

packages/chain-stats-subql/src/utils/records.ts

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,33 @@
11
import { isSystemAccount, getNativeCurrency, getTokenDecimals, isTokenEqual, getSystemAccountName } from '@acala-network/subql-utils'
2+
import { stringToHex, u8aToHex } from '@polkadot/util'
3+
import { decodeAddress } from '@polkadot/util-crypto'
24
import { Block, Token, Account, AccountBalance, DailyAccountBalance, HourAccountBalance, HourToken, DailyToken } from '../types/models'
35

46
const nativeToken = getNativeCurrency(api as any);
7+
const SYSTEM_ACCOUNT_PREFIX = stringToHex('modl');
8+
9+
const isSystemAccountByPublicKey = (address: string) => {
10+
try {
11+
return u8aToHex(decodeAddress(address, true)).startsWith(SYSTEM_ACCOUNT_PREFIX)
12+
} catch {
13+
return false
14+
}
15+
}
16+
17+
export const isSystemAccountSafe = (address: string) => {
18+
try {
19+
return isSystemAccount(address)
20+
} catch {
21+
const isSystem = isSystemAccountByPublicKey(address)
22+
const logger = (globalThis as any).logger
23+
24+
if (typeof logger?.warn === 'function') {
25+
logger.warn(`Unable to classify account ${address} with checksum validation; fallback system=${isSystem}`)
26+
}
27+
28+
return isSystem
29+
}
30+
}
531

632
export async function getBlock(id: string) {
733
let record = await Block.get(id)
@@ -79,7 +105,7 @@ export async function getAccount(id: string) {
79105
let record = await Account.get(id)
80106

81107
if (!record) {
82-
const isSystem = isSystemAccount(id)
108+
const isSystem = isSystemAccountSafe(id)
83109

84110
record = new Account(id, id)
85111

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { beforeEach, expect, test } from "bun:test";
2+
3+
const records = new Map<string, Map<string, any>>();
4+
5+
const getEntityRecords = (entity: string) => {
6+
let entityRecords = records.get(entity);
7+
8+
if (!entityRecords) {
9+
entityRecords = new Map<string, any>();
10+
records.set(entity, entityRecords);
11+
}
12+
13+
return entityRecords;
14+
};
15+
16+
beforeEach(() => {
17+
records.clear();
18+
19+
(globalThis as any).api = {
20+
consts: {
21+
currencies: {
22+
getNativeCurrencyId: "KAR",
23+
},
24+
},
25+
};
26+
27+
(globalThis as any).store = {
28+
get: async (entity: string, id: string) => getEntityRecords(entity).get(id),
29+
set: async (entity: string, id: string, value: any) => {
30+
getEntityRecords(entity).set(id, { ...value });
31+
},
32+
remove: async (entity: string, id: string) => {
33+
getEntityRecords(entity).delete(id);
34+
},
35+
getByField: async () => [],
36+
getByFields: async () => [],
37+
};
38+
39+
(globalThis as any).logger = {
40+
warn: () => undefined,
41+
};
42+
});
43+
44+
test("does not throw when a Karura snapshot account has an invalid checksum", async () => {
45+
const { getAccount, isSystemAccountSafe } = await import("../src/utils/records");
46+
47+
expect(isSystemAccountSafe("qHz7aFFfQCcVcVVkzeYeNGoaDMivV9jUXLMBqSMLzFYkfmC")).toBe(false);
48+
49+
const account = await getAccount("qHz7aFFfQCcVcVVkzeYeNGoaDMivV9jUXLMBqSMLzFYkfmC");
50+
51+
expect(account.address).toBe("qHz7aFFfQCcVcVVkzeYeNGoaDMivV9jUXLMBqSMLzFYkfmC");
52+
expect(account.mark).toBe("user");
53+
});
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import { expect, test } from "bun:test";
2+
import { getInitialBalanceRecords, resetInitialBalanceReadersForTests } from "../src/utils/balanceData";
3+
4+
test("streams only the requested Acala initial-balance batch", async () => {
5+
resetInitialBalanceReadersForTests();
6+
7+
const records = await getInitialBalanceRecords("acala", 0, 3);
8+
9+
expect(records).toHaveLength(3);
10+
expect(records[0].account).toBeString();
11+
expect(records[0].token).toBe("ACA");
12+
expect(records[0].free).toBeString();
13+
});
14+
15+
test("streams Karura batches without loading the full snapshot into the bundle", async () => {
16+
resetInitialBalanceReadersForTests();
17+
18+
const firstBatch = await getInitialBalanceRecords("karura", 0, 2);
19+
const secondBatch = await getInitialBalanceRecords("karura", 2, 2);
20+
21+
expect(firstBatch).toHaveLength(2);
22+
expect(secondBatch).toHaveLength(2);
23+
expect(secondBatch[0].account).not.toBe(firstBatch[0].account);
24+
expect(secondBatch[0].token).toBe("KAR");
25+
});

0 commit comments

Comments
 (0)