-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathparquetWorkerClient.ts
More file actions
122 lines (115 loc) · 4.82 KB
/
parquetWorkerClient.ts
File metadata and controls
122 lines (115 loc) · 4.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import type { ColumnData } from 'hyparquet'
import ParquetWorker from './parquetWorker?worker&inline'
import type { ClientMessage, ParquetQueryWorkerOptions, ParquetReadObjectsWorkerOptions, ParquetReadWorkerOptions, Rows, WorkerMessage } from './types.js'
/// ^ the worker is bundled with the main thread code (inline) which is easier for users to import
/// (no need to copy the worker file to the right place)
// Lazily-created singleton worker, shared by every query (see getWorker).
let worker: Worker | undefined
// Monotonic counter: each query gets a fresh id so worker replies can be
// matched back to the call that issued them.
let nextQueryId = 0
// Per-query state held on the main thread while the worker runs the query:
// the caller's streaming callbacks plus the promise settlers.
interface Agent {
  // Streaming callbacks, forwarded as the worker emits them.
  onComplete?: (rows: Rows) => void
  onChunk?: (chunk: ColumnData) => void
  onPage?: (page: ColumnData) => void
  // Rejects the promise of whichever parquet*Worker call created this agent.
  reject: (error: Error) => void
  // Exactly one of these is set, depending on which entry point was used.
  parquetReadResolve?: () => void
  parquetReadObjectsResolve?: (rows: Rows) => void
  parquetQueryResolve?: (rows: Rows) => void
}
// Agents keyed by queryId; an entry is removed once its query resolves or rejects.
const pendingAgents = new Map<number, Agent>()
function getWorker() {
if (!worker) {
worker = new ParquetWorker()
worker.onmessage = ({ data }: { data: WorkerMessage }) => {
const pendingAgent = pendingAgents.get(data.queryId)
if (!pendingAgent) {
console.warn(
`Unexpected: no pending promise found for queryId: ${data.queryId.toString()}`
)
return
}
const { onComplete, onChunk, onPage, reject, parquetReadResolve, parquetReadObjectsResolve, parquetQueryResolve } = pendingAgent
switch (data.kind) {
case 'onComplete':
onComplete?.(data.rows)
break
case 'onChunk':
onChunk?.(data.chunk)
break
case 'onPage':
onPage?.(data.page)
break
default:
switch (data.kind) {
case 'onReject':
if ('error' in data) { // check, just in case
reject(data.error)
}
break
case 'onParquetReadResolve':
parquetReadResolve?.()
break
case 'onParquetReadObjectsResolve':
parquetReadObjectsResolve?.(data.rows)
break
case 'onParquetQueryResolve':
parquetQueryResolve?.(data.rows)
break
}
/* clean up */
pendingAgents.delete(data.queryId)
// TODO(SL): maybe terminate the worker when no pending agents left
}
}
}
return worker
}
/**
* Presents almost the same interface as parquetRead, but runs in a worker.
* This is useful for reading large parquet files without blocking the main thread.
* Instead of taking an AsyncBuffer, it takes a AsyncBufferFrom, because it needs
* to be serialized to the worker. Also: the worker uses hyparquet-compressors and
* the default parsers.
*/
export function parquetReadWorker(options: ParquetReadWorkerOptions): Promise<void> {
const { onComplete, onChunk, onPage, from, ...serializableOptions } = options
return new Promise((resolve, reject) => {
const queryId = nextQueryId++
pendingAgents.set(queryId, { parquetReadResolve: resolve, reject, onComplete, onChunk, onPage })
const worker = getWorker()
const message: ClientMessage = { queryId, from, kind: 'parquetRead', options: serializableOptions }
worker.postMessage(message)
})
}
/**
* Presents almost the same interface as parquetReadObjects, but runs in a worker.
* This is useful for reading large parquet files without blocking the main thread.
* Instead of taking an AsyncBuffer, it takes a AsyncBufferFrom, because it needs
* to be serialized to the worker. Also: the worker uses hyparquet-compressors and
* the default parsers.
*/
export function parquetReadObjectsWorker(options: ParquetReadObjectsWorkerOptions): Promise<Rows> {
const { onChunk, onPage, from, ...serializableOptions } = options
return new Promise((resolve, reject) => {
const queryId = nextQueryId++
pendingAgents.set(queryId, { parquetReadObjectsResolve: resolve, reject, onChunk, onPage })
const worker = getWorker()
const message: ClientMessage = { queryId, from, kind: 'parquetReadObjects', options: serializableOptions }
worker.postMessage(message)
})
}
/**
* Presents almost the same interface as parquetQuery, but runs in a worker.
* This is useful for reading large parquet files without blocking the main thread.
* Instead of taking an AsyncBuffer, it takes a AsyncBufferFrom, because it needs
* to be serialized to the worker. Also: the worker uses hyparquet-compressors and
* the default parsers.
*/
export function parquetQueryWorker(options: ParquetQueryWorkerOptions): Promise<Rows> {
const { onComplete, onChunk, onPage, from, ...serializableOptions } = options
return new Promise((resolve, reject) => {
const queryId = nextQueryId++
pendingAgents.set(queryId, { parquetQueryResolve: resolve, reject, onComplete, onChunk, onPage })
const worker = getWorker()
const message: ClientMessage = { queryId, from, kind: 'parquetQuery', options: serializableOptions }
worker.postMessage(message)
})
}