-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathparquetWorkerClient.ts
More file actions
128 lines (121 loc) · 5.02 KB
/
parquetWorkerClient.ts
File metadata and controls
128 lines (121 loc) · 5.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import type { ColumnData } from 'hyparquet'
import ParquetWorker from './parquetWorker?worker&inline'
import type { ClientMessage, ParquetQueryWorkerOptions, ParquetReadObjectsWorkerOptions, ParquetReadWorkerOptions, Rows, WorkerMessage } from './types.js'
/// ^ the worker is bundled inline with the main-thread code, which makes it easier for users to import
/// (no need to copy the worker file to the right place)
/**
 * Main-thread bookkeeping for one in-flight query.
 *
 * It holds the user callbacks that streaming worker messages are forwarded to,
 * plus the settle functions of the query's promise. Each exported `*Worker`
 * function fills in exactly one of the `*Resolve` fields.
 */
interface Agent {
  /** User `onComplete` callback, forwarded from the worker's `onComplete` messages. */
  onComplete?: (rows: Rows) => void
  /** User `onChunk` callback, forwarded from the worker's `onChunk` messages. */
  onChunk?: (chunk: ColumnData) => void
  /** User `onPage` callback, forwarded from the worker's `onPage` messages. */
  onPage?: (page: ColumnData) => void
  /** Rejects the query's promise. */
  reject: (error: Error) => void
  /** Resolves the promise returned by `parquetReadWorker`. */
  parquetReadResolve?: () => void
  /** Resolves the promise returned by `parquetReadObjectsWorker`. */
  parquetReadObjectsResolve?: (rows: Rows) => void
  /** Resolves the promise returned by `parquetQueryWorker`. */
  parquetQueryResolve?: (rows: Rows) => void
}

/** Shared worker instance, created lazily by `getWorker`; `undefined` until first use. */
let worker: Worker | undefined
/** Incremented for each request; the id tags messages so worker replies can be matched back. */
let nextQueryId = 0
/** In-flight queries keyed by `queryId`; an entry is removed once its query settles. */
const pendingAgents: Map<number, Agent> = new Map()
/**
 * Returns the shared parquet worker, creating it on first use.
 *
 * Every message from the worker carries the `queryId` of the request it answers
 * and is routed to the matching agent in `pendingAgents`:
 * - `onComplete`, `onChunk` and `onPage` are forwarded to the user callbacks;
 *   the query stays pending.
 * - `onReject` and the `on*Resolve` messages (and any unrecognized kind) remove
 *   the agent and settle its promise, so a query can never be left hanging.
 *
 * If the worker fires an `error` event (e.g. its inline script failed to load),
 * the error cannot be attributed to a single query. The worker is therefore
 * terminated and every pending query is rejected. The next call creates a
 * fresh worker.
 */
function getWorker(): Worker {
  if (worker) {
    return worker
  }
  const created = new ParquetWorker()
  created.onmessage = ({ data }: { data: WorkerMessage }) => {
    // Captured up front so it stays usable after `data` is narrowed below.
    const { queryId } = data
    const pendingAgent = pendingAgents.get(queryId)
    if (!pendingAgent) {
      console.warn(
        `Unexpected: no pending promise found for queryId: ${data.queryId.toString()}`
      )
      return
    }
    const { onComplete, onChunk, onPage, reject, parquetReadResolve, parquetReadObjectsResolve, parquetQueryResolve } = pendingAgent
    switch (data.kind) {
      // Streaming messages: forward to the user callbacks; the query stays pending.
      case 'onComplete':
        onComplete?.(data.rows)
        return
      case 'onChunk':
        onChunk?.(data.chunk)
        return
      case 'onPage':
        onPage?.(data.page)
        return
      // Terminal messages: remove the agent, then settle the promise.
      case 'onReject':
        pendingAgents.delete(queryId)
        if ('error' in data) { // check, just in case
          reject(data.error)
        } else {
          // Never leave the promise pending: fall back to a generic error.
          reject(new Error(`Parquet worker rejected query ${queryId.toString()} without an error`))
        }
        break
      case 'onParquetReadResolve':
        pendingAgents.delete(queryId)
        parquetReadResolve?.()
        break
      case 'onParquetReadObjectsResolve':
        pendingAgents.delete(queryId)
        parquetReadObjectsResolve?.(data.rows)
        break
      case 'onParquetQueryResolve':
        pendingAgents.delete(queryId)
        parquetQueryResolve?.(data.rows)
        break
      default:
        // Unknown message kind: settle instead of silently dropping the query.
        pendingAgents.delete(queryId)
        reject(new Error(`Unexpected message kind from parquet worker for queryId: ${queryId.toString()}`))
    }
    // TODO(SL): maybe terminate the worker when no pending agents left
  }
  created.onerror = (event: ErrorEvent) => {
    const error = new Error(event.message ? `Parquet worker error: ${event.message}` : 'Parquet worker error')
    // Discard this worker so the next request starts a fresh one.
    if (worker === created) {
      worker = undefined
    }
    created.terminate()
    pendingAgents.forEach((agent) => {
      agent.reject(error)
    })
    pendingAgents.clear()
  }
  worker = created
  return created
}
/**
 * Presents almost the same interface as parquetRead, but runs in a worker.
 * This is useful for reading large parquet files without blocking the main thread.
 * Instead of taking an AsyncBuffer, it takes an AsyncBufferFrom, because it needs
 * to be serialized to the worker. Also: the worker uses hyparquet-compressors and
 * the default parsers.
 *
 * Note that it only supports 'rowFormat: object' (the default).
 *
 * @param options parquetRead-style options plus `from`. The `onComplete`, `onChunk`
 *   and `onPage` callbacks stay on the main thread and are called as the worker
 *   reports progress. All other options are posted to the worker.
 * @returns a promise that resolves when the worker reports the read is finished.
 *   It rejects if the worker reports an error, or if the request cannot be posted
 *   (e.g. an option that cannot be structured-cloned).
 */
export function parquetReadWorker(options: ParquetReadWorkerOptions): Promise<void> {
  // Functions cannot be structured-cloned, so the callbacks are kept here.
  const { onComplete, onChunk, onPage, from, ...serializableOptions } = options
  return new Promise((resolve, reject) => {
    const queryId = nextQueryId++
    pendingAgents.set(queryId, { parquetReadResolve: resolve, reject, onComplete, onChunk, onPage })
    try {
      const message: ClientMessage = { queryId, from, kind: 'parquetRead', options: serializableOptions }
      getWorker().postMessage(message)
    } catch (error) {
      // Creating the worker or posting can throw synchronously (e.g. DataCloneError).
      // Drop the agent so it does not leak, then reject with the original reason.
      pendingAgents.delete(queryId)
      reject(error)
    }
  })
}
/**
 * Presents almost the same interface as parquetReadObjects, but runs in a worker.
 * This is useful for reading large parquet files without blocking the main thread.
 * Instead of taking an AsyncBuffer, it takes an AsyncBufferFrom, because it needs
 * to be serialized to the worker. Also: the worker uses hyparquet-compressors and
 * the default parsers.
 *
 * Note that it only supports 'rowFormat: object' (the default).
 *
 * @param options parquetReadObjects-style options plus `from`. The `onChunk` and
 *   `onPage` callbacks stay on the main thread and are called as the worker
 *   reports progress. All other options are posted to the worker.
 * @returns a promise resolving to the rows sent back by the worker.
 *   It rejects if the worker reports an error, or if the request cannot be posted
 *   (e.g. an option that cannot be structured-cloned).
 */
export function parquetReadObjectsWorker(options: ParquetReadObjectsWorkerOptions): Promise<Rows> {
  // Functions cannot be structured-cloned, so the callbacks are kept here.
  const { onChunk, onPage, from, ...serializableOptions } = options
  return new Promise((resolve, reject) => {
    const queryId = nextQueryId++
    pendingAgents.set(queryId, { parquetReadObjectsResolve: resolve, reject, onChunk, onPage })
    try {
      const message: ClientMessage = { queryId, from, kind: 'parquetReadObjects', options: serializableOptions }
      getWorker().postMessage(message)
    } catch (error) {
      // Creating the worker or posting can throw synchronously (e.g. DataCloneError).
      // Drop the agent so it does not leak, then reject with the original reason.
      pendingAgents.delete(queryId)
      reject(error)
    }
  })
}
/**
 * Presents almost the same interface as parquetQuery, but runs in a worker.
 * This is useful for reading large parquet files without blocking the main thread.
 * Instead of taking an AsyncBuffer, it takes an AsyncBufferFrom, because it needs
 * to be serialized to the worker. Also: the worker uses hyparquet-compressors and
 * the default parsers.
 *
 * Note that it only supports 'rowFormat: object' (the default).
 *
 * @param options parquetQuery-style options plus `from`. The `onComplete`, `onChunk`
 *   and `onPage` callbacks stay on the main thread and are called as the worker
 *   reports progress. All other options are posted to the worker.
 * @returns a promise resolving to the rows sent back by the worker.
 *   It rejects if the worker reports an error, or if the request cannot be posted
 *   (e.g. an option that cannot be structured-cloned).
 */
export function parquetQueryWorker(options: ParquetQueryWorkerOptions): Promise<Rows> {
  // Functions cannot be structured-cloned, so the callbacks are kept here.
  const { onComplete, onChunk, onPage, from, ...serializableOptions } = options
  return new Promise((resolve, reject) => {
    const queryId = nextQueryId++
    pendingAgents.set(queryId, { parquetQueryResolve: resolve, reject, onComplete, onChunk, onPage })
    try {
      const message: ClientMessage = { queryId, from, kind: 'parquetQuery', options: serializableOptions }
      getWorker().postMessage(message)
    } catch (error) {
      // Creating the worker or posting can throw synchronously (e.g. DataCloneError).
      // Drop the agent so it does not leak, then reject with the original reason.
      pendingAgents.delete(queryId)
      reject(error)
    }
  })
}