Skip to content

Commit fd083e9

Browse files
committed
Virtual row groups of size 1000
1 parent 72dd899 commit fd083e9

2 files changed

Lines changed: 37 additions & 31 deletions

File tree

eslint.config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ export default typescript.config(
4848
eqeqeq: 'error',
4949
'func-style': ['error', 'declaration'],
5050
indent: ['error', 2, { SwitchCase: 1 }],
51+
'key-spacing': 'error',
5152
'no-constant-condition': 'off',
5253
'no-extra-parens': 'error',
5354
'no-multi-spaces': 'error',

src/lib/tableProvider.ts

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ export function computeSortIndex(orderByRanks: { direction: 'ascending' | 'desce
2828
})
2929
}
3030

31+
interface VirtualRowGroup {
32+
groupStart: number
33+
groupEnd: number
34+
fetching: boolean
35+
}
36+
3137
/**
3238
* Convert a parquet file into a dataframe.
3339
*/
@@ -37,33 +43,41 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
3743
const sortCache = new Map<string, Promise<number[]>>()
3844
const columnRanksCache = new Map<string, Promise<number[]>>()
3945
const data = new Array<ResolvableRow | undefined>(Number(metadata.num_rows))
40-
const groups = new Array(metadata.row_groups.length).fill(false)
46+
47+
// virtual row groups are up to 1000 rows within row group boundaries
48+
const groups: VirtualRowGroup[] = []
4149
let groupStart = 0
42-
const groupEnds = metadata.row_groups.map(group => groupStart += Number(group.num_rows))
50+
for (const rg of metadata.row_groups) {
51+
// make virtual row groups of size 1000
52+
for (let j = 0; j < rg.num_rows; j += 1000) {
53+
const groupSize = Math.min(1000, Number(rg.num_rows) - j)
54+
const groupEnd = groupStart + groupSize
55+
groups.push({ groupStart, groupEnd, fetching: false })
56+
groupStart = groupEnd
57+
}
58+
}
4359

4460
function fetchRowGroup(groupIndex: number) {
45-
if (!groups[groupIndex]) {
46-
const rowStart = groupEnds[groupIndex - 1] ?? 0
47-
const rowEnd = groupEnds[groupIndex]
48-
if (rowEnd === undefined) {
49-
throw new Error(`Missing groupEnd for groupIndex: ${groupIndex}`)
50-
}
61+
const group = groups[groupIndex]
62+
if (group && !group.fetching) {
63+
group.fetching = true
64+
const { groupStart, groupEnd } = group
5165
// Initialize with resolvable promises
52-
for (let i = rowStart; i < rowEnd; i++) {
66+
for (let i = groupStart; i < groupEnd; i++) {
5367
data[i] = resolvableRow(header)
5468
data[i]?.index.resolve(i)
5569
}
56-
parquetQueryWorker({ from, metadata, rowStart, rowEnd })
57-
.then((groupData) => {
58-
for (let i = rowStart; i < rowEnd; i++) {
70+
parquetQueryWorker({ from, metadata, rowStart: groupStart, rowEnd: groupEnd })
71+
.then(groupData => {
72+
for (let i = groupStart; i < groupEnd; i++) {
5973
const dataRow = data[i]
6074
if (dataRow === undefined) {
6175
throw new Error(`Missing data row for index ${i}`)
6276
}
63-
const j = i - rowStart
77+
const j = i - groupStart
6478
const row = groupData[j]
6579
if (row === undefined) {
66-
throw new Error(`Missing row in groupData for index: ${i - rowStart}`)
80+
throw new Error(`Missing row in groupData for index: ${i - groupStart}`)
6781
}
6882
for (const [key, value] of Object.entries(row)) {
6983
const cell = dataRow.cells[key]
@@ -75,13 +89,12 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
7589
}
7690
})
7791
.catch((error: unknown) => {
78-
const prefix = `Error fetching row group ${groupIndex} (${rowStart}-${rowEnd}).`
92+
const prefix = `Error fetching row group ${groupIndex} (${groupStart}-${groupEnd}).`
7993
console.error(prefix, error)
8094
const reason = `${prefix} ${error}`
8195
// reject the index of the first row (it's enough to trigger the error bar)
82-
data[rowStart]?.index.reject(reason)
96+
data[groupStart]?.index.reject(reason)
8397
})
84-
groups[groupIndex] = true
8598
}
8699
}
87100

@@ -110,15 +123,15 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
110123
return {
111124
header,
112125
numRows: Number(metadata.num_rows),
113-
rows({ start, end, orderBy }: { start: number, end: number, orderBy?: OrderBy}) {
126+
rows({ start, end, orderBy }) {
114127
if (orderBy?.length) {
115128
const numRows = end - start
116129
const wrapped = new Array(numRows).fill(null).map(() => resolvableRow(header))
117130

118131
getSortIndex(orderBy).then(indices => {
119132
// Compute row groups to fetch
120133
for (const index of indices.slice(start, end)) {
121-
const groupIndex = groupEnds.findIndex(end => index < end)
134+
const groupIndex = groups.findIndex(({ groupEnd }) => index < groupEnd)
122135
fetchRowGroup(groupIndex)
123136
}
124137

@@ -158,24 +171,16 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
158171
}
159172
}
160173
}).catch((error: unknown) => {
161-
console.error(
162-
'Error fetching sort index or resolving sorted rows',
163-
error
164-
)
174+
console.error('Error fetching sort index or resolving sorted rows', error)
165175
})
166176

167177
return wrapped
168178
} else {
169-
for (let i = 0; i < groups.length; i++) {
170-
const groupStart = groupEnds[i - 1] ?? 0
171-
const groupEnd = groupEnds[i]
172-
if (groupEnd === undefined) {
173-
throw new Error(`Missing group end at index ${i}`)
174-
}
175-
if (start < groupEnd && end > groupStart) {
179+
groups.forEach(({ groupStart, groupEnd }, i) => {
180+
if (groupStart < end && groupEnd > start) {
176181
fetchRowGroup(i)
177182
}
178-
}
183+
})
179184
const wrapped = data.slice(start, end)
180185
if (wrapped.some(row => row === undefined)) {
181186
throw new Error('Row not fetched')

0 commit comments

Comments
 (0)