Skip to content

Commit d6c7f81

Browse files
committed
WIP: feat(run-pipeline): add support for running pthread modules
1 parent db002b5 commit d6c7f81

8 files changed

Lines changed: 377 additions & 34 deletions

File tree

packages/core/typescript/itk-wasm/src/pipeline/internal/load-emscripten-module-main-thread.ts

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,16 @@ import { ZSTDDecoder } from '@thewtex/zstddec'
44

55
import EmscriptenModule from '../itk-wasm-emscripten-module.js'
66
import RunPipelineOptions from '../run-pipeline-options.js'
7+
import pthreadSupportAvailable from '../pthread-support-available.js'
78

89
const decoder = new ZSTDDecoder()
910
let decoderInitialized = false
1011

11-
async function loadEmscriptenModuleMainThread (
12+
async function loadEmscriptenModuleMainThread(
1213
moduleRelativePathOrURL: string | URL,
1314
baseUrl?: string,
14-
queryParams?: RunPipelineOptions['pipelineQueryParams']
15+
queryParams?: RunPipelineOptions['pipelineQueryParams'],
16+
disableThreads?: boolean
1517
): Promise<EmscriptenModule> {
1618
let modulePrefix: string = 'unknown'
1719
if (typeof moduleRelativePathOrURL !== 'string') {
@@ -33,16 +35,105 @@ async function loadEmscriptenModuleMainThread (
3335
if (modulePrefix.endsWith('.wasm.zst')) {
3436
modulePrefix = modulePrefix.substring(0, modulePrefix.length - 9)
3537
}
38+
39+
// Check for pthread support and use the appropriate WASM file
40+
const hasPthreadSupport = pthreadSupportAvailable() && disableThreads !== true
3641
const wasmBinaryPath = `${modulePrefix}.wasm`
42+
43+
if (hasPthreadSupport) {
44+
// Try to load the threaded version first
45+
const threadsWasmPath = `${modulePrefix}.threads.wasm`
46+
try {
47+
const threadsResponse = await axios.get(`${threadsWasmPath}.zst`, {
48+
responseType: 'arraybuffer',
49+
params: queryParams,
50+
timeout: 10000, // 10 second timeout for threaded WASM
51+
validateStatus: (status) => status === 200
52+
})
53+
54+
// Validate the response data
55+
if (
56+
threadsResponse.data == null ||
57+
threadsResponse.data.byteLength === 0
58+
) {
59+
throw new Error('Empty response data for threaded WASM')
60+
}
61+
62+
if (!decoderInitialized) {
63+
await decoder.init()
64+
decoderInitialized = true
65+
}
66+
const decompressedArray = decoder.decode(
67+
new Uint8Array(threadsResponse.data)
68+
)
69+
70+
// Validate the decompressed data
71+
if (decompressedArray == null || decompressedArray.byteLength === 0) {
72+
throw new Error('Failed to decompress threaded WASM data')
73+
}
74+
75+
// Validate WASM magic bytes (0x00, 0x61, 0x73, 0x6d)
76+
if (
77+
decompressedArray.byteLength < 4 ||
78+
decompressedArray[0] !== 0x00 ||
79+
decompressedArray[1] !== 0x61 ||
80+
decompressedArray[2] !== 0x73 ||
81+
decompressedArray[3] !== 0x6d
82+
) {
83+
throw new Error('Invalid WASM magic bytes in threaded WASM data')
84+
}
85+
86+
const wasmBinary = decompressedArray.buffer
87+
const fullModulePath = `${modulePrefix}.js`
88+
const result = await import(
89+
/* webpackIgnore: true */ /* @vite-ignore */ fullModulePath
90+
)
91+
const instantiated = result.default({ wasmBinary }) as EmscriptenModule
92+
return instantiated
93+
} catch (error) {
94+
// Fall back to non-threaded version if threaded version is not available
95+
const errorMessage =
96+
error instanceof Error ? error.message : String(error)
97+
console.warn(
98+
`Threaded WASM not available for ${modulePrefix}, falling back to non-threaded version:`,
99+
errorMessage
100+
)
101+
}
102+
}
103+
104+
// Load non-threaded version
37105
const response = await axios.get(`${wasmBinaryPath}.zst`, {
38106
responseType: 'arraybuffer',
39107
params: queryParams
40108
})
109+
110+
// Validate the response data
111+
if (response.data == null || response.data.byteLength === 0) {
112+
throw new Error('Empty response data for non-threaded WASM')
113+
}
114+
41115
if (!decoderInitialized) {
42116
await decoder.init()
43117
decoderInitialized = true
44118
}
45119
const decompressedArray = decoder.decode(new Uint8Array(response.data))
120+
121+
// Validate the decompressed data
122+
if (decompressedArray == null || decompressedArray.byteLength === 0) {
123+
throw new Error('Failed to decompress non-threaded WASM data')
124+
}
125+
126+
// Validate WASM magic bytes (0x00, 0x61, 0x73, 0x6d)
127+
if (
128+
decompressedArray.byteLength < 4 ||
129+
decompressedArray[0] !== 0x00 ||
130+
decompressedArray[1] !== 0x61 ||
131+
decompressedArray[2] !== 0x73 ||
132+
decompressedArray[3] !== 0x6d
133+
) {
134+
throw new Error('Invalid WASM magic bytes in non-threaded WASM data')
135+
}
136+
46137
const wasmBinary = decompressedArray.buffer
47138
const fullModulePath = `${modulePrefix}.js`
48139
const result = await import(

packages/core/typescript/itk-wasm/src/pipeline/internal/load-emscripten-module-web-worker.ts

Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { ZSTDDecoder } from '@thewtex/zstddec'
44

55
import ITKWasmEmscriptenModule from '../itk-wasm-emscripten-module.js'
66
import RunPipelineOptions from '../run-pipeline-options.js'
7+
import pthreadSupportAvailable from '../pthread-support-available.js'
78

89
const decoder = new ZSTDDecoder()
910
let decoderInitialized = false
@@ -12,10 +13,11 @@ let decoderInitialized = false
1213
//
1314
// baseUrl is usually taken from 'getPipelinesBaseUrl()', but a different value
1415
// could be passed.
15-
async function loadEmscriptenModuleWebWorker (
16+
async function loadEmscriptenModuleWebWorker(
1617
moduleRelativePathOrURL: string | URL,
1718
baseUrl: string,
18-
queryParams?: RunPipelineOptions['pipelineQueryParams']
19+
queryParams?: RunPipelineOptions['pipelineQueryParams'],
20+
disableThreads?: boolean
1921
): Promise<ITKWasmEmscriptenModule> {
2022
let modulePrefix = null
2123
if (typeof moduleRelativePathOrURL !== 'string') {
@@ -34,7 +36,75 @@ async function loadEmscriptenModuleWebWorker (
3436
if (modulePrefix.endsWith('.wasm.zst')) {
3537
modulePrefix = modulePrefix.substring(0, modulePrefix.length - 9)
3638
}
39+
40+
// Check for pthread support and use the appropriate WASM file
41+
const hasPthreadSupport = pthreadSupportAvailable() && disableThreads !== true
3742
const wasmBinaryPath = `${modulePrefix}.wasm`
43+
44+
if (hasPthreadSupport) {
45+
// Try to load the threaded version first
46+
const threadsWasmPath = `${modulePrefix}.threads.wasm`
47+
try {
48+
const threadsResponse = await axios.get(`${threadsWasmPath}.zst`, {
49+
responseType: 'arraybuffer',
50+
params: queryParams,
51+
timeout: 10000, // 10 second timeout for threaded WASM
52+
validateStatus: (status) => status === 200
53+
})
54+
55+
// Validate the response data
56+
if (
57+
threadsResponse.data == null ||
58+
threadsResponse.data.byteLength === 0
59+
) {
60+
throw new Error('Empty response data for threaded WASM')
61+
}
62+
63+
if (!decoderInitialized) {
64+
await decoder.init()
65+
decoderInitialized = true
66+
}
67+
const decompressedArray = decoder.decode(
68+
new Uint8Array(threadsResponse.data)
69+
)
70+
71+
// Validate the decompressed data
72+
if (decompressedArray == null || decompressedArray.byteLength === 0) {
73+
throw new Error('Failed to decompress threaded WASM data')
74+
}
75+
76+
// Validate WASM magic bytes (0x00, 0x61, 0x73, 0x6d)
77+
if (
78+
decompressedArray.byteLength < 4 ||
79+
decompressedArray[0] !== 0x00 ||
80+
decompressedArray[1] !== 0x61 ||
81+
decompressedArray[2] !== 0x73 ||
82+
decompressedArray[3] !== 0x6d
83+
) {
84+
throw new Error('Invalid WASM magic bytes in threaded WASM data')
85+
}
86+
87+
const wasmBinary = decompressedArray.buffer
88+
const modulePath = `${modulePrefix}.js`
89+
const result = await import(
90+
/* webpackIgnore: true */ /* @vite-ignore */ modulePath
91+
)
92+
const emscriptenModule = result.default({
93+
wasmBinary
94+
}) as ITKWasmEmscriptenModule
95+
return emscriptenModule
96+
} catch (error) {
97+
// Fall back to non-threaded version if threaded version is not available
98+
const errorMessage =
99+
error instanceof Error ? error.message : String(error)
100+
console.warn(
101+
`Threaded WASM not available for ${modulePrefix}, falling back to non-threaded version:`,
102+
errorMessage
103+
)
104+
}
105+
}
106+
107+
// Load non-threaded version
38108
const response = await axios.get(`${wasmBinaryPath}.zst`, {
39109
responseType: 'arraybuffer',
40110
params: queryParams
@@ -44,6 +114,23 @@ async function loadEmscriptenModuleWebWorker (
44114
decoderInitialized = true
45115
}
46116
const decompressedArray = decoder.decode(new Uint8Array(response.data))
117+
118+
// Validate the decompressed data
119+
if (decompressedArray == null || decompressedArray.byteLength === 0) {
120+
throw new Error('Failed to decompress standard WASM data')
121+
}
122+
123+
// Validate WASM magic bytes (0x00, 0x61, 0x73, 0x6d)
124+
if (
125+
decompressedArray.byteLength < 4 ||
126+
decompressedArray[0] !== 0x00 ||
127+
decompressedArray[1] !== 0x61 ||
128+
decompressedArray[2] !== 0x73 ||
129+
decompressedArray[3] !== 0x6d
130+
) {
131+
throw new Error('Invalid WASM magic bytes in standard WASM data')
132+
}
133+
47134
const wasmBinary = decompressedArray.buffer
48135
const modulePath = `${modulePrefix}.js`
49136
const result = await import(

packages/core/typescript/itk-wasm/src/pipeline/run-pipeline.ts

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,31 @@ function defaultPipelinesQueryParams (): RunPipelineOptions['pipelineQueryParams
5555
return result
5656
}
5757

58+
function processThreadsArgs (args: string[]): {
59+
disableThreads: boolean
60+
filteredArgs: string[]
61+
} {
62+
const filteredArgs: string[] = []
63+
let disableThreads = false
64+
65+
for (let i = 0; i < args.length; i++) {
66+
if (args[i] === '--threads' && i + 1 < args.length && args[i + 1] === '0') {
67+
disableThreads = true
68+
// Skip both '--threads' and '0'
69+
i++ // Skip the next element ('0')
70+
} else {
71+
filteredArgs.push(args[i])
72+
}
73+
}
74+
75+
return { disableThreads, filteredArgs }
76+
}
77+
5878
async function loadPipelineModule (
5979
pipelinePath: string | URL,
6080
pipelineBaseUrl?: string | URL,
61-
pipelineQueryParams?: RunPipelineOptions['pipelineQueryParams']
81+
pipelineQueryParams?: RunPipelineOptions['pipelineQueryParams'],
82+
disableThreads?: boolean
6283
): Promise<PipelineEmscriptenModule> {
6384
let moduleRelativePathOrURL: string | URL = pipelinePath as string
6485
let pipeline = pipelinePath as string
@@ -72,7 +93,8 @@ async function loadPipelineModule (
7293
const pipelineModule = (await loadEmscriptenModuleMainThread(
7394
pipelinePath,
7495
pipelineBaseUrl?.toString() ?? defaultPipelinesBaseUrl(),
75-
pipelineQueryParams ?? defaultPipelinesQueryParams()
96+
pipelineQueryParams ?? defaultPipelinesQueryParams(),
97+
disableThreads
7698
)) as PipelineEmscriptenModule
7799
pipelineToModule.set(pipeline, pipelineModule)
78100
return pipelineModule
@@ -86,25 +108,39 @@ async function runPipeline (
86108
inputs: PipelineInput[] | null,
87109
options?: RunPipelineOptions
88110
): Promise<RunPipelineResult> {
89-
if (!await simd()) {
90-
const simdErrorMessage = 'WebAssembly SIMD support is required -- please update your browser.'
111+
if (!(await simd())) {
112+
const simdErrorMessage =
113+
'WebAssembly SIMD support is required -- please update your browser.'
91114
alert(simdErrorMessage)
92115
throw new Error(simdErrorMessage)
93116
}
117+
118+
const { disableThreads, filteredArgs } = processThreadsArgs(args)
94119
const webWorker = options?.webWorker ?? null
95120

96121
if (webWorker === false) {
97122
const pipelineModule = await loadPipelineModule(
98123
pipelinePath.toString(),
99124
options?.pipelineBaseUrl,
100-
options?.pipelineQueryParams ?? defaultPipelinesQueryParams()
125+
options?.pipelineQueryParams ?? defaultPipelinesQueryParams(),
126+
disableThreads
127+
)
128+
const result = runPipelineEmscripten(
129+
pipelineModule,
130+
filteredArgs,
131+
outputs,
132+
inputs
101133
)
102-
const result = runPipelineEmscripten(pipelineModule, args, outputs, inputs)
103134
return result
104135
}
105136
let worker = webWorker
106-
const pipelineWorkerUrl = options?.pipelineWorkerUrl ?? defaultPipelineWorkerUrl()
107-
const pipelineWorkerUrlString = typeof pipelineWorkerUrl !== 'string' && typeof pipelineWorkerUrl?.href !== 'undefined' ? pipelineWorkerUrl.href : pipelineWorkerUrl
137+
const pipelineWorkerUrl =
138+
options?.pipelineWorkerUrl ?? defaultPipelineWorkerUrl()
139+
const pipelineWorkerUrlString =
140+
typeof pipelineWorkerUrl !== 'string' &&
141+
typeof pipelineWorkerUrl?.href !== 'undefined'
142+
? pipelineWorkerUrl.href
143+
: pipelineWorkerUrl
108144
const { workerProxy, worker: usedWorker } = await createWorkerProxy(
109145
worker as Worker | null,
110146
pipelineWorkerUrlString as string | undefined | null,
@@ -140,12 +176,22 @@ async function runPipeline (
140176
})
141177
}
142178
const pipelineBaseUrl = options?.pipelineBaseUrl ?? defaultPipelinesBaseUrl()
143-
const pipelineBaseUrlString = typeof pipelineBaseUrl !== 'string' && typeof pipelineBaseUrl?.href !== 'undefined' ? pipelineBaseUrl.href : pipelineBaseUrl
144-
const transferedInputs = (inputs != null) ? Comlink.transfer(inputs, getTransferables(transferables, options?.noCopy)) : null
179+
const pipelineBaseUrlString =
180+
typeof pipelineBaseUrl !== 'string' &&
181+
typeof pipelineBaseUrl?.href !== 'undefined'
182+
? pipelineBaseUrl.href
183+
: pipelineBaseUrl
184+
const transferedInputs =
185+
inputs != null
186+
? Comlink.transfer(
187+
inputs,
188+
getTransferables(transferables, options?.noCopy)
189+
)
190+
: null
145191
const result: RunPipelineWorkerResult = await workerProxy.runPipeline(
146192
pipelinePath.toString(),
147193
pipelineBaseUrlString as string,
148-
args,
194+
filteredArgs,
149195
outputs,
150196
transferedInputs,
151197
options?.pipelineQueryParams ?? defaultPipelinesQueryParams()

0 commit comments

Comments
 (0)