-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathparquetWorker.ts
More file actions
69 lines (65 loc) · 2.86 KB
/
parquetWorker.ts
File metadata and controls
69 lines (65 loc) · 2.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import { ColumnData, parquetQuery } from 'hyparquet'
import { compressors } from 'hyparquet-compressors'
import { parquetReadColumn } from 'hyparquet/src/read.js'
import { asyncBufferFrom } from '../utils.js'
import type { ChunkMessage, ClientMessage, ColumnRanksMessage, ErrorMessage, ResultMessage } from './types.js'
/**
 * Posts a chunk message from the worker to the main thread.
 *
 * Only the `chunk` and `queryId` fields are forwarded.
 *
 * @param message the chunk and the id of the query it belongs to
 */
function postChunkMessage (message: ChunkMessage) {
  const { chunk, queryId } = message
  self.postMessage({ chunk, queryId })
}
/**
 * Posts a result message from the worker to the main thread.
 *
 * Only the `result` and `queryId` fields are forwarded.
 *
 * @param message the query result and the id of the query it answers
 */
function postResultMessage (message: ResultMessage) {
  const { result, queryId } = message
  self.postMessage({ result, queryId })
}
/**
 * Posts an error message from the worker to the main thread.
 *
 * Only the `error` and `queryId` fields are forwarded.
 *
 * @param message the error and the id of the query that failed
 */
function postErrorMessage (message: ErrorMessage) {
  const { error, queryId } = message
  self.postMessage({ error, queryId })
}
/**
 * Posts a column-ranks message from the worker to the main thread.
 *
 * Only the `columnRanks` and `queryId` fields are forwarded.
 *
 * @param message the computed ranks and the id of the query they answer
 */
function postColumnRanksMessage (message: ColumnRanksMessage) {
  const { columnRanks, queryId } = message
  self.postMessage({ columnRanks, queryId })
}
/**
 * Worker entry point: handles one client request per incoming message.
 *
 * - `kind === 'columnRanks'`: reads the requested column, computes the rank of
 *   every row in ascending order and posts a column-ranks message.
 * - any other kind: runs `parquetQuery` with the request parameters, posts each
 *   chunk as it arrives when `chunks` is truthy, then posts the final result.
 *
 * Any failure, including a failure to open the file, is reported to the client
 * as an error message carrying the same `queryId`, so the request never stays
 * unanswered.
 */
self.onmessage = async ({ data }: { data: ClientMessage }) => {
  const { metadata, from, kind, queryId } = data
  try {
    // Opening the file is inside the try block so that a failure here is also
    // reported to the client instead of becoming an unhandled rejection.
    const file = await asyncBufferFrom(from)
    if (kind === 'columnRanks') {
      const { column } = data
      // Return the column ranks in ascending order.
      // The descending order can be obtained by replacing each rank with
      // numRows - rank - 1. That is not exactly the rank in descending order,
      // because tied values share the rank of the first (not the last) of the
      // ties, but it is enough for the purpose of sorting.
      const sortColumn: unknown[] = Array.from(await parquetReadColumn({ file, metadata, columns: [column], compressors }))
      const sortedValuesWithIndex = sortColumn
        .map((value, index) => ({ value, index }))
        .sort(({ value: a }, { value: b }) => compare<unknown>(a, b))

      // columnRanks[i] is the rank of original row i; -1 marks "not assigned yet".
      const columnRanks = Array(sortColumn.length).fill(-1) as number[]
      let lastValue: unknown = undefined
      let lastRank = 0
      sortedValuesWithIndex.forEach(({ value, index }, rank) => {
        // A value strictly different from the previous one starts a new group
        // of ties; every row in a group gets the rank of the group's first row.
        if (value !== lastValue) {
          lastValue = value
          lastRank = rank
        }
        columnRanks[index] = lastRank
      })
      postColumnRanksMessage({ columnRanks, queryId })
    } else {
      const { rowStart, rowEnd, columns, orderBy, filter, chunks } = data
      // Chunks are only streamed to the client when it asked for them.
      const onChunk = chunks ? (chunk: ColumnData) => { postChunkMessage({ chunk, queryId }) } : undefined
      const result = await parquetQuery({ metadata, file, rowStart, rowEnd, columns, orderBy, filter, compressors, onChunk })
      postResultMessage({ result, queryId })
    }
  } catch (error) {
    postErrorMessage({ error: error as Error, queryId })
  }
}
/**
 * Three-way comparator for `Array.prototype.sort`, based on the built-in `<`
 * and `>` operators.
 *
 * @param a left operand
 * @param b right operand
 * @returns -1 if `a < b`, 1 if `a > b`, and 0 otherwise. "Otherwise" covers
 *   equal values and also pairs for which neither relational test holds
 *   (for example when one operand is `undefined` or `NaN`).
 */
function compare<T>(a: T, b: T): number {
  if (a < b) return -1
  if (a > b) return 1
  // Ties must yield 0: a comparator that returns a non-zero value for equal
  // operands is inconsistent (compare(a, a) !== 0), and sorting with an
  // inconsistent comparator has implementation-defined results.
  return 0 // TODO: how to handle nulls?
}