Skip to content

Commit 5069508

Browse files
committed
Convert parquet integration to TypeScript
1 parent a0245fd commit 5069508

File tree

5 files changed

+235
-199
lines changed

5 files changed

+235
-199
lines changed
Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,24 @@
11
import { parquetReadObjects, parquetSchema } from 'hyparquet'
2+
import type { AsyncBuffer, Compressors, FileMetaData } from 'hyparquet'
3+
import type { AsyncDataSource, AsyncRow, ScanOptions, SqlPrimitive } from 'squirreling'
24
import { whereToParquetFilter } from './parquetFilter.js'
35

4-
/**
5-
* @import { AsyncBuffer, Compressors, FileMetaData, ParquetQueryFilter } from 'hyparquet'
6-
* @import { AsyncCells, AsyncDataSource, AsyncRow, SqlPrimitive } from 'squirreling'
7-
*/
8-
96
/**
107
* Creates a parquet data source for use with squirreling SQL engine.
11-
*
12-
* @param {AsyncBuffer} file
13-
* @param {FileMetaData} metadata
14-
* @param {Compressors} compressors
15-
* @returns {AsyncDataSource}
168
*/
17-
export function parquetDataSource(file, metadata, compressors) {
9+
export function parquetDataSource(file: AsyncBuffer, metadata: FileMetaData, compressors: Compressors): AsyncDataSource {
1810
const schema = parquetSchema(metadata)
1911
return {
2012
columns: schema.children.map(child => child.element.name),
21-
scan({ columns, where, limit, offset, signal }) {
13+
scan({ columns, where, limit, offset, signal }: ScanOptions) {
2214
// Convert WHERE AST to hyparquet filter format
2315
const whereFilter = where && whereToParquetFilter(where)
24-
/** @type {ParquetQueryFilter | undefined} */
2516
const filter = where ? whereFilter : undefined
2617
const appliedWhere = Boolean(filter && whereFilter)
2718
const appliedLimitOffset = !where || appliedWhere
2819

2920
// Ensure columns exist in metadata if provided
3021
if (columns) {
31-
const schema = parquetSchema(metadata)
3222
for (const col of columns) {
3323
if (!schema.children.some(child => child.element.name === col)) {
3424
throw new Error(`Column "${col}" not found in parquet schema`)
@@ -61,7 +51,6 @@ export function parquetDataSource(file, metadata, compressors) {
6151
}
6252

6353
// Read objects from this row group
64-
// TODO: move to worker
6554
const data = await parquetReadObjects({
6655
file,
6756
metadata,
@@ -90,15 +79,8 @@ export function parquetDataSource(file, metadata, compressors) {
9079
}
9180
}
9281

93-
/**
94-
* Creates an async row accessor that wraps a plain JavaScript object
95-
*
96-
* @param {Record<string, SqlPrimitive>} obj - the plain object
97-
* @returns {AsyncRow} a row accessor interface
98-
*/
99-
function asyncRow(obj) {
100-
/** @type {AsyncCells} */
101-
const cells = {}
82+
function asyncRow(obj: Record<string, SqlPrimitive>): AsyncRow {
83+
const cells: Record<string, () => Promise<SqlPrimitive>> = {}
10284
for (const [key, value] of Object.entries(obj)) {
10385
cells[key] = () => Promise.resolve(value)
10486
}

src/lib/parquet/parquetFilter.js

Lines changed: 0 additions & 175 deletions
This file was deleted.

src/lib/parquet/parquetFilter.ts

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
import type { ParquetQueryFilter } from 'hyparquet'
2+
import type { BinaryNode, BinaryOp, ComparisonOp, ExprNode, InValuesNode, SqlPrimitive } from 'squirreling/src/ast.js'
3+
4+
/**
5+
* Converts a WHERE clause AST to hyparquet filter format.
6+
* Returns undefined if the expression cannot be fully converted.
7+
*/
8+
export function whereToParquetFilter(where: ExprNode | undefined): ParquetQueryFilter | undefined {
9+
if (!where) return undefined
10+
return convertExpr(where, false)
11+
}
12+
13+
function convertExpr(node: ExprNode, negate: boolean): ParquetQueryFilter | undefined {
14+
if (node.type === 'unary' && node.op === 'NOT') {
15+
return convertExpr(node.argument, !negate)
16+
}
17+
if (node.type === 'binary') {
18+
return convertBinary(node, negate)
19+
}
20+
if (node.type === 'in valuelist') {
21+
return convertInValues(node, negate)
22+
}
23+
if (node.type === 'cast') {
24+
return convertExpr(node.expr, negate)
25+
}
26+
return undefined
27+
}
28+
29+
function convertBinary({ op, left, right }: BinaryNode, negate: boolean): ParquetQueryFilter | undefined {
30+
if (op === 'AND') {
31+
const leftFilter = convertExpr(left, negate)
32+
const rightFilter = convertExpr(right, negate)
33+
if (!leftFilter || !rightFilter) return
34+
return negate
35+
? { $or: [leftFilter, rightFilter] }
36+
: { $and: [leftFilter, rightFilter] }
37+
}
38+
if (op === 'OR') {
39+
const leftFilter = convertExpr(left, false)
40+
const rightFilter = convertExpr(right, false)
41+
if (!leftFilter || !rightFilter) return
42+
return negate
43+
? { $nor: [leftFilter, rightFilter] }
44+
: { $or: [leftFilter, rightFilter] }
45+
}
46+
47+
if (op === 'LIKE') return
48+
49+
const { column, value, flipped } = extractColumnAndValue(left, right)
50+
if (!column || value === undefined) return
51+
52+
const mongoOp = mapOperator(op, flipped, negate)
53+
if (!mongoOp) return
54+
return { [column]: { [mongoOp]: value } }
55+
}
56+
57+
function extractColumnAndValue(left: ExprNode, right: ExprNode): { column: string | undefined; value: SqlPrimitive | undefined; flipped: boolean } {
58+
if (left.type === 'identifier' && right.type === 'literal') {
59+
return { column: left.name, value: right.value, flipped: false }
60+
}
61+
if (left.type === 'literal' && right.type === 'identifier') {
62+
return { column: right.name, value: left.value, flipped: true }
63+
}
64+
return { column: undefined, value: undefined, flipped: false }
65+
}
66+
67+
export function isBinaryOp(op: string): op is ComparisonOp {
68+
return ['AND', 'OR', 'LIKE', '=', '!=', '<>', '<', '>', '<=', '>='].includes(op)
69+
}
70+
71+
function mapOperator(op: BinaryOp, flipped: boolean, negate: boolean): string | undefined {
72+
if (!isBinaryOp(op)) return
73+
74+
let mappedOp: ComparisonOp = op
75+
if (negate) mappedOp = neg(mappedOp)
76+
if (flipped) mappedOp = flip(mappedOp)
77+
switch (mappedOp) {
78+
case '=': return '$eq'
79+
case '!=': case '<>': return '$ne'
80+
case '<': return '$lt'
81+
case '<=': return '$lte'
82+
case '>': return '$gt'
83+
case '>=': return '$gte'
84+
}
85+
}
86+
87+
function neg(op: ComparisonOp): ComparisonOp {
88+
switch (op) {
89+
case '<': return '>='
90+
case '<=': return '>'
91+
case '>': return '<='
92+
case '>=': return '<'
93+
case '=': return '!='
94+
case '!=': return '='
95+
case '<>': return '='
96+
}
97+
}
98+
99+
function flip(op: ComparisonOp): ComparisonOp {
100+
switch (op) {
101+
case '<': return '>'
102+
case '<=': return '>='
103+
case '>': return '<'
104+
case '>=': return '<='
105+
case '=': return '='
106+
case '!=': return '!='
107+
case '<>': return '='
108+
}
109+
}
110+
111+
function convertInValues(node: InValuesNode, negate: boolean): ParquetQueryFilter | undefined {
112+
if (node.expr.type !== 'identifier') return
113+
114+
const values: SqlPrimitive[] = []
115+
for (const val of node.values) {
116+
if (val.type !== 'literal') return
117+
values.push(val.value)
118+
}
119+
120+
return { [node.expr.name]: { [negate ? '$nin' : '$in']: values } }
121+
}

0 commit comments

Comments
 (0)