-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathparquetDataSource.js
More file actions
47 lines (45 loc) · 1.45 KB
/
parquetDataSource.js
File metadata and controls
47 lines (45 loc) · 1.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import { parquetPlan } from 'hyparquet/src/plan.js'
import { asyncGroupToRows, readRowGroup } from 'hyparquet/src/rowgroup.js'
import { whereToParquetFilter } from './parquetFilter.js'
/**
* @import { AsyncBuffer, FileMetaData } from 'hyparquet'
* @import { AsyncDataSource, AsyncRow } from 'squirreling'
*/
/**
 * Creates a parquet data source for use with squirreling SQL engine.
 *
 * Rows are read one row group at a time, in file order. Optional hints from
 * the engine are pushed down:
 * - `columns` is forwarded to hyparquet so only those columns are read.
 * - `where` is converted with `whereToParquetFilter` and passed to the
 *   hyparquet planner as `filter`.
 * - `limit` stops iteration once that many rows have been yielded. A limit of
 *   0 (or less) yields nothing and reads nothing; a missing/null limit means
 *   unbounded.
 *
 * Each yielded row maps column name -> function returning a Promise of the
 * already-decoded cell value.
 *
 * @param {AsyncBuffer} file
 * @param {FileMetaData} metadata
 * @param {import('hyparquet-compressors').Compressors} compressors
 * @returns {AsyncDataSource}
 */
export function parquetDataSource(file, metadata, compressors) {
  return {
    async *getRows(hints) {
      // Missing or null limit means "no limit".
      const limit = hints?.limit ?? Infinity
      // Check up front so LIMIT 0 neither yields a row nor touches the file.
      if (limit <= 0) return
      const options = {
        file,
        metadata,
        compressors,
        columns: hints?.columns,
        filter: whereToParquetFilter(hints?.where),
      }
      const plan = parquetPlan(options)
      let count = 0
      for (const subplan of plan.groups) {
        const rg = readRowGroup(options, plan, subplan)
        // If the limit ends inside this group, only build that many row objects.
        // (count < limit here, so remaining > 0; ceil keeps selectEnd integral.)
        const remaining = limit - count
        const selectEnd = remaining < rg.groupRows ? Math.ceil(remaining) : rg.groupRows
        const rows = await asyncGroupToRows(rg, 0, selectEnd, undefined, 'object')
        for (const asyncRow of rows) {
          /** @type {AsyncRow} */
          const row = {}
          for (const [key, value] of Object.entries(asyncRow)) {
            row[key] = () => Promise.resolve(value)
          }
          yield row
          count++
        }
        // Stop before reading the next row group once the limit is satisfied.
        if (count >= limit) return
      }
    },
  }
}