-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathparquetDataSource.js
More file actions
104 lines (95 loc) · 3.31 KB
/
parquetDataSource.js
File metadata and controls
104 lines (95 loc) · 3.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import { parquetReadObjects, parquetSchema } from 'hyparquet'
import { whereToParquetFilter } from './parquetFilter.js'
/**
* @import { AsyncBuffer, Compressors, FileMetaData, ParquetQueryFilter } from 'hyparquet'
* @import { AsyncCells, AsyncDataSource, AsyncRow, SqlPrimitive } from 'squirreling'
*/
/**
 * Creates a parquet data source for use with the squirreling SQL engine.
 *
 * The returned source reads rows lazily, one parquet row group at a time,
 * and reports back which parts of the query (WHERE / LIMIT / OFFSET) it was
 * able to push down, so the engine knows whether to re-apply them itself.
 *
 * @param {AsyncBuffer} file - random-access async buffer over the parquet file
 * @param {FileMetaData} metadata - parsed parquet footer metadata
 * @param {Compressors} compressors - codecs used to decompress column chunks
 * @returns {AsyncDataSource}
 */
export function parquetDataSource(file, metadata, compressors) {
  return {
    scan({ columns, where, limit, offset, signal }) {
      // Convert WHERE AST to hyparquet filter format.
      // whereToParquetFilter may yield undefined when the AST cannot be
      // expressed as a hyparquet filter; in that case the engine must
      // evaluate the WHERE clause itself (appliedWhere stays false).
      const whereFilter = where && whereToParquetFilter(where)
      /** @type {ParquetQueryFilter | undefined} */
      const filter = where ? whereFilter : undefined
      // WHERE counts as "applied" only if a usable filter was produced
      const appliedWhere = Boolean(filter && whereFilter)
      // LIMIT/OFFSET can only be pushed down when there is no WHERE clause,
      // or when the WHERE clause was fully pushed down as a parquet filter.
      // NOTE(review): when a filter IS in effect, rowStart/rowEnd below still
      // slice by raw (pre-filter) row position — this assumes hyparquet
      // combines the range and the filter in a way consistent with SQL
      // OFFSET-after-WHERE semantics; confirm against hyparquet behavior.
      const appliedLimitOffset = !where || appliedWhere
      // Ensure columns exist in metadata if provided — fail fast on a typo
      // rather than returning silently-empty cells for an unknown column
      if (columns) {
        const schema = parquetSchema(metadata)
        for (const col of columns) {
          if (!schema.children.some(child => child.element.name === col)) {
            throw new Error(`Column "${col}" not found in parquet schema`)
          }
        }
      }
      return {
        rows: (async function* () {
          // Emit rows by row group
          let groupStart = 0 // absolute row index of the current group's first row
          let remainingLimit = limit ?? Infinity // rows still allowed to emit
          for (const rowGroup of metadata.row_groups) {
            if (signal?.aborted) break // caller cancelled the scan
            const rowCount = Number(rowGroup.num_rows)
            // Skip row groups by offset if where is fully applied
            let safeOffset = 0 // rows to skip at the start of this group
            let safeLimit = rowCount // rows to read from this group
            if (appliedLimitOffset) {
              if (offset !== undefined && groupStart < offset) {
                safeOffset = Math.min(rowCount, offset - groupStart)
              }
              safeLimit = Math.min(rowCount - safeOffset, remainingLimit)
              // Limit exhausted mid-group: stop entirely. (If instead the
              // whole group lies before the offset, fall through so the
              // skip branch below advances groupStart and continues.)
              if (safeLimit <= 0 && safeOffset < rowCount) break
            }
            if (safeOffset === rowCount) {
              // Entire group precedes the requested offset — skip it
              groupStart += rowCount
              continue
            }
            // Read objects from this row group
            // TODO: move to worker
            const data = await parquetReadObjects({
              file,
              metadata,
              rowStart: groupStart + safeOffset,
              rowEnd: groupStart + safeOffset + safeLimit,
              columns,
              filter,
              // NOTE(review): filterStrict: false presumably lets hyparquet
              // skip predicates it cannot evaluate exactly — if that can ever
              // return non-matching rows, reporting appliedWhere=true above
              // would be unsound; verify against hyparquet documentation.
              filterStrict: false,
              compressors,
              useOffsetIndex: true,
            })
            // Yield each row
            for (const row of data) {
              yield asyncRow(row)
            }
            // Count rows actually produced (post-filter) against the limit
            remainingLimit -= data.length
            groupStart += rowCount
          }
        })(),
        appliedWhere,
        appliedLimitOffset,
      }
    },
  }
}
/**
 * Creates an async row accessor that wraps a plain JavaScript object.
 *
 * Each cell is exposed as a zero-argument function resolving to the
 * already-materialized value, satisfying the AsyncRow interface without
 * any further I/O.
 *
 * @param {Record<string, SqlPrimitive>} obj - the plain object
 * @returns {AsyncRow} a row accessor interface
 */
function asyncRow(obj) {
  const entries = Object.entries(obj)
  /** @type {AsyncCells} */
  const cells = Object.fromEntries(
    entries.map(([name, value]) => [name, () => Promise.resolve(value)])
  )
  return { columns: entries.map(([name]) => name), cells }
}