Skip to content

Commit b6ae6cb

Browse files
authored
Update squirreling (#374)
1 parent ff2b629 commit b6ae6cb

File tree

2 files changed

+61
-49
lines changed

2 files changed

+61
-49
lines changed

bin/tools/parquetDataSource.js

Lines changed: 60 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { parquetMetadataAsync, parquetReadObjects } from 'hyparquet'
1+
import { parquetReadObjects, parquetSchema } from 'hyparquet'
22
import { whereToParquetFilter } from './parquetFilter.js'
33

44
/**
@@ -16,61 +16,73 @@ import { whereToParquetFilter } from './parquetFilter.js'
1616
*/
1717
export function parquetDataSource(file, metadata, compressors) {
1818
return {
19-
async *scan({ hints, signal }) {
20-
metadata ??= await parquetMetadataAsync(file)
21-
19+
scan({ columns, where, limit, offset, signal }) {
2220
// Convert WHERE AST to hyparquet filter format
23-
const whereFilter = hints?.where && whereToParquetFilter(hints.where)
21+
const whereFilter = where && whereToParquetFilter(where)
2422
/** @type {ParquetQueryFilter | undefined} */
25-
const filter = hints?.where ? whereFilter : undefined
26-
const filterApplied = !filter || whereFilter
27-
28-
// Emit rows by row group
29-
let groupStart = 0
30-
let remainingLimit = hints?.limit ?? Infinity
31-
for (const rowGroup of metadata.row_groups) {
32-
if (signal?.aborted) break
33-
const rowCount = Number(rowGroup.num_rows)
23+
const filter = where ? whereFilter : undefined
24+
const appliedWhere = Boolean(filter && whereFilter)
25+
const appliedLimitOffset = !where || appliedWhere
3426

35-
// Skip row groups by offset if where is fully applied
36-
let safeOffset = 0
37-
let safeLimit = rowCount
38-
if (filterApplied) {
39-
if (hints?.offset !== undefined && groupStart < hints.offset) {
40-
safeOffset = Math.min(rowCount, hints.offset - groupStart)
27+
// Ensure columns exist in metadata if provided
28+
if (columns) {
29+
const schema = parquetSchema(metadata)
30+
for (const col of columns) {
31+
if (!schema.children.some(child => child.element.name === col)) {
32+
throw new Error(`Column "${col}" not found in parquet schema`)
4133
}
42-
safeLimit = Math.min(rowCount - safeOffset, remainingLimit)
43-
if (safeLimit <= 0 && safeOffset < rowCount) break
44-
}
45-
for (let i = 0; i < safeOffset; i++) {
46-
// yield empty rows
47-
yield asyncRow({})
48-
}
49-
if (safeOffset === rowCount) {
50-
groupStart += rowCount
51-
continue
5234
}
35+
}
5336

54-
// Read objects from this row group
55-
const data = await parquetReadObjects({
56-
file,
57-
metadata,
58-
rowStart: groupStart + safeOffset,
59-
rowEnd: groupStart + safeOffset + safeLimit,
60-
columns: hints?.columns,
61-
filter,
62-
filterStrict: false,
63-
compressors,
64-
useOffsetIndex: true,
65-
})
37+
return {
38+
rows: (async function* () {
39+
// Emit rows by row group
40+
let groupStart = 0
41+
let remainingLimit = limit ?? Infinity
42+
for (const rowGroup of metadata.row_groups) {
43+
if (signal?.aborted) break
44+
const rowCount = Number(rowGroup.num_rows)
6645

67-
// Yield each row
68-
for (const row of data) {
69-
yield asyncRow(row)
70-
}
46+
// Skip row groups by offset if where is fully applied
47+
let safeOffset = 0
48+
let safeLimit = rowCount
49+
if (appliedLimitOffset) {
50+
if (offset !== undefined && groupStart < offset) {
51+
safeOffset = Math.min(rowCount, offset - groupStart)
52+
}
53+
safeLimit = Math.min(rowCount - safeOffset, remainingLimit)
54+
if (safeLimit <= 0 && safeOffset < rowCount) break
55+
}
56+
if (safeOffset === rowCount) {
57+
groupStart += rowCount
58+
continue
59+
}
60+
61+
// Read objects from this row group
62+
// TODO: move to worker
63+
const data = await parquetReadObjects({
64+
file,
65+
metadata,
66+
rowStart: groupStart + safeOffset,
67+
rowEnd: groupStart + safeOffset + safeLimit,
68+
columns,
69+
filter,
70+
filterStrict: false,
71+
compressors,
72+
useOffsetIndex: true,
73+
})
7174

72-
remainingLimit -= data.length
73-
groupStart += rowCount
75+
// Yield each row
76+
for (const row of data) {
77+
yield asyncRow(row)
78+
}
79+
80+
remainingLimit -= data.length
81+
groupStart += rowCount
82+
}
83+
})(),
84+
appliedWhere,
85+
appliedLimitOffset,
7486
}
7587
},
7688
}

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
"hyparquet": "1.25.0",
6060
"hyparquet-compressors": "1.1.1",
6161
"icebird": "0.3.1",
62-
"squirreling": "0.7.9"
62+
"squirreling": "0.9.1"
6363
},
6464
"devDependencies": {
6565
"@storybook/react-vite": "10.2.10",

0 commit comments

Comments
 (0)