Skip to content

Commit 862641a

Browse files
imaTik0Paweł Aniszewski
andauthored
feat(core): configurable WASM worker thread count via EndpointFactory setup() (#161)
* feat(core): configurable WASM worker thread count via EndpointFactory.setup() AsyncEngine (C++): - Add EM_JS readWorkerCountFromJs() that reads window.__privmxWorkerCount - AsyncEngine() constructor reads the global and uses it as the thread-pool size (defaults to 4 when absent; enforces minimum of 2) EndpointFactory (TypeScript): - setup() now accepts string | EndpointSetupOptions (backwards-compatible; a plain string assetsBasePath is still accepted) - New EndpointSetupOptions interface: { assetsBasePath?, workerCount? } - When workerCount is provided, sets window.__privmxWorkerCount before calling endpointWasmModule() so the C++ constructor reads it during WASM initialisation on the worker thread Tests (E2E): - measureSendMessages() helper: reload page, call setup({ workerCount }), send N messages concurrently, return elapsed ms - "Worker count / EndpointFactory.setup() initialises WASM with the requested worker count": runs at 2/4/8/16 workers, logs timings - measureSignData() helper: same pattern using 200 concurrent signData calls (CPU-bound secp256k1, no network) - "Worker count scales CPU-bound crypto throughput": hard-asserts that 4w < 2w and 8w < 4w to verify the parameter actually controls parallelism - setup({ assetsBasePath }) object-form test: verifies connect + key derivation work end-to-end when using the new options object * fix/post-review-fixes * fix: clang-format fix --------- Co-authored-by: Paweł Aniszewski <paniszewski@simplito.com>
1 parent 599df5e commit 862641a

3 files changed

Lines changed: 221 additions & 3 deletions

File tree

async-engine/src/AsyncEngine.cpp

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ limitations under the License.
1111

1212
#include "AsyncEngine.hpp"
1313

14+
static constexpr size_t WORKER_COUNT_MIN = 2;
15+
static constexpr size_t WORKER_COUNT_DEFAULT = 4;
16+
1417
#include <Poco/JSON/Object.h>
1518
#include <Pson/pson.h>
1619
#include <emscripten/eventloop.h>
@@ -46,6 +49,13 @@ namespace privmx {
4649
const value = Emval.toValue(valueHandle);
4750
setTimeout(()=>callback(value), 0);
4851
});
52+
53+
// Reads window.__privmxWorkerCount (set by TypeScript before module init).
54+
// Returns 0 when the global is absent or not a positive integer.
55+
EM_JS(int, readWorkerCountFromJs, (), {
56+
const v = (typeof window !== 'undefined') && window.__privmxWorkerCount;
57+
return (typeof v === 'number' && v > 0) ? (v | 0) : 0;
58+
});
4959
}
5060
}
5161

@@ -63,7 +73,10 @@ AsyncEngine* AsyncEngine::getInstance() {
6373
}
6474

6575
AsyncEngine::AsyncEngine() {
66-
_pool = std::make_unique<WorkerPool>(4);
76+
int requested = readWorkerCountFromJs();
77+
size_t numWorkers =
78+
(static_cast<size_t>(requested) >= WORKER_COUNT_MIN) ? static_cast<size_t>(requested) : WORKER_COUNT_DEFAULT;
79+
_pool = std::make_unique<WorkerPool>(numWorkers);
6780
_taskManagerThread = std::thread([=] { emscripten_runtime_keepalive_push(); });
6881
}
6982

src/service/EndpointFactory.ts

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,28 +34,55 @@ import { StreamApi } from "./StreamApi";
3434
import { ThreadApi } from "./ThreadApi";
3535
import { setGlobalEmCrypto } from "../crypto/index";
3636

37+
3738
/**
3839
* //doc-gen:ignore
3940
*/
4041
declare function endpointWasmModule(): Promise<any>; // Provided by emscripten js glue code
4142

43+
export interface EndpointSetupOptions {
44+
assetsBasePath?: string;
45+
workerCount?: number;
46+
}
47+
4248
/**
4349
* Contains static factory methods - generators for Connection and APIs.
4450
*/
4551
export class EndpointFactory {
52+
53+
private static readonly WORKER_COUNT_MIN = 2;
54+
4655
private static api: Api;
4756
private static eventQueueInstance: EventQueue;
4857
private static assetsBasePath: string;
4958

5059
/**
5160
* Load the Endpoint's WASM assets and initialize the Endpoint library.
5261
*
53-
* @param {string} [assetsBasePath] base path/url to the Endpoint's WebAssembly assets (like: endpoint-wasm-module.js, driver-web-context.js and others)
62+
* @param {string | EndpointSetupOptions} [options] either a base path string (legacy) or an options object
63+
* @param {string} [options.assetsBasePath] base path/url to the Endpoint's WebAssembly assets
64+
* @param {number} [options.workerCount] number of async-engine worker threads (default: 4, minimum: 2)
5465
*/
55-
public static async setup(assetsBasePath?: string): Promise<void> {
66+
public static async setup(options?: string | EndpointSetupOptions): Promise<void> {
67+
const resolved: EndpointSetupOptions =
68+
typeof options === "object" && options !== null
69+
? options
70+
: { assetsBasePath: options as string | undefined };
71+
const { assetsBasePath, workerCount } = resolved;
72+
5673
const basePath = this.resolveAssetsBasePath(assetsBasePath);
5774
this.assetsBasePath = basePath;
5875

76+
// Must be set before endpointWasmModule() is called — the C++ AsyncEngine
77+
// constructor reads this global during WASM module initialization (on the
78+
// worker thread), before the main thread gets control back.
79+
if (workerCount !== undefined) {
80+
(window as unknown as Record<string, unknown>).__privmxWorkerCount = Math.max(
81+
EndpointFactory.WORKER_COUNT_MIN,
82+
Math.floor(workerCount),
83+
);
84+
}
85+
5986
setGlobalEmCrypto();
6087
const assets = ["endpoint-wasm-module.js"];
6188

tests/specs/core.spec.ts

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,3 +326,181 @@ test.describe("CoreTest: Connection & Contexts", () => {
326326
expect(u2_p2!.isActive).toBe(true); // User 2 IS now connected
327327
});
328328
});
329+
330+
// ---------------------------------------------------------------------------
331+
// EndpointFactory.setup() — object-form regression test
332+
// ---------------------------------------------------------------------------
333+
334+
test.describe("CoreTest: EndpointFactory.setup() object form", () => {
335+
test("setup({ assetsBasePath }) initialises WASM identically to setup(string)", async ({
336+
page,
337+
backend,
338+
cli,
339+
}) => {
340+
await page.goto("/tests/harness/index.html");
341+
await page.waitForFunction(() => window.wasmReady === true, null, { timeout: 10000 });
342+
343+
// Use the object form exclusively — this is the regression path.
344+
await page.evaluate(async () => {
345+
await window.Endpoint.setup({ assetsBasePath: "../../assets" });
346+
});
347+
348+
const user = await setupTestUser(page, cli, [testData.contextId]);
349+
350+
const result = await page.evaluate(
351+
async ({ bridgeUrl, solutionId, privKey }) => {
352+
const connection = await window.Endpoint.connect(privKey, solutionId, bridgeUrl);
353+
const cryptoApi = await window.Endpoint.createCryptoApi();
354+
const pubKey = await cryptoApi.derivePublicKey(privKey);
355+
return { connected: connection !== null, pubKeyDefined: pubKey.length > 0 };
356+
},
357+
{
358+
bridgeUrl: backend.bridgeUrl,
359+
solutionId: testData.solutionId,
360+
privKey: user.privKey,
361+
},
362+
);
363+
364+
expect(result.connected).toBe(true);
365+
expect(result.pubKeyDefined).toBe(true);
366+
});
367+
368+
test("setup({ assetsBasePath, workerCount }) applies the requested worker count", async ({
369+
page,
370+
}) => {
371+
await page.goto("/tests/harness/index.html");
372+
await page.waitForFunction(() => window.wasmReady === true, null, { timeout: 10000 });
373+
374+
await page.evaluate(async () => {
375+
await window.Endpoint.setup({ assetsBasePath: "../../assets", workerCount: 6 });
376+
});
377+
378+
// Give pthreads time to spin up then verify crypto still works.
379+
await page.waitForTimeout(320);
380+
381+
const signed = await page.evaluate(async () => {
382+
const cryptoApi = await window.Endpoint.createCryptoApi();
383+
const privKey = await cryptoApi.generatePrivateKey();
384+
const sig = await cryptoApi.signData(new TextEncoder().encode("test"), privKey);
385+
return sig.length > 0;
386+
});
387+
388+
expect(signed).toBe(true);
389+
});
390+
});
391+
392+
// ---------------------------------------------------------------------------
393+
// Worker-count performance test
394+
// ---------------------------------------------------------------------------
395+
// Measures wall-clock time for Promise.all(100 x sendMessage) at 2, 4 and 8
396+
// worker threads to verify that (a) the workerCount parameter is wired through
397+
// to the WASM engine and (b) more workers reduce time on a CPU-bound workload.
398+
//
399+
// Each step reloads the page so the WASM singleton is re-initialised with the
400+
// desired worker count before any tasks are posted.
401+
// ---------------------------------------------------------------------------
402+
403+
async function measureSendMessages(
404+
page: Page,
405+
cli: CliContext,
406+
bridgeUrl: string,
407+
workerCount: number,
408+
messageCount: number,
409+
): Promise<number> {
410+
// Fresh page load so the WASM module reinitialises with the new worker count.
411+
await page.goto("/tests/harness/index.html");
412+
await page.waitForFunction(() => window.wasmReady === true, null, { timeout: 10000 });
413+
414+
// setup() sets window.__privmxWorkerCount BEFORE calling endpointWasmModule(),
415+
// so the C++ AsyncEngine constructor picks it up on its worker thread.
416+
// This must be a separate evaluate call so it completes before key generation.
417+
await page.evaluate(async (wc: number) => {
418+
await window.Endpoint.setup({ assetsBasePath: "../../assets", workerCount: wc });
419+
}, workerCount);
420+
421+
// Give the browser event loop time to finish allocating all pthreads.
422+
// Emscripten spawns workers asynchronously after module init returns;
423+
// without this pause the first WASM task may arrive before all threads
424+
// are ready, causing a stall or abort on high worker counts.
425+
await page.waitForTimeout(200 + workerCount * 20);
426+
427+
// Key generation in a separate evaluate — Endpoint is now fully initialised.
428+
const userKeys = await page.evaluate(async () => {
429+
const cryptoApi = await window.Endpoint.createCryptoApi();
430+
const privKey = await cryptoApi.generatePrivateKey();
431+
return { privKey, pubKey: await cryptoApi.derivePublicKey(privKey) };
432+
});
433+
434+
const userId = `perf-user-${Date.now()}-${workerCount}w`;
435+
await cli.call("context/addUserToContext", {
436+
contextId: testData.contextId,
437+
userId,
438+
userPubKey: userKeys.pubKey,
439+
});
440+
441+
const args = {
442+
bridgeUrl,
443+
privKey: userKeys.privKey,
444+
userId,
445+
solutionId: testData.solutionId,
446+
contextId: testData.contextId,
447+
messageCount,
448+
};
449+
450+
return page.evaluate(
451+
async ({ bridgeUrl, privKey, userId, solutionId, contextId, messageCount }) => {
452+
const Endpoint = window.Endpoint;
453+
const connection = await Endpoint.connect(privKey, solutionId, bridgeUrl);
454+
const threadApi = await Endpoint.createThreadApi(connection);
455+
const cryptoApi = await Endpoint.createCryptoApi();
456+
457+
const userObj = { userId, pubKey: await cryptoApi.derivePublicKey(privKey) };
458+
const enc = new TextEncoder();
459+
460+
const threadId = await threadApi.createThread(
461+
contextId,
462+
[userObj],
463+
[userObj],
464+
enc.encode("perf-test"),
465+
enc.encode("perf-test"),
466+
);
467+
468+
const payload = enc.encode("x".repeat(256));
469+
470+
const t0 = performance.now();
471+
await Promise.all(
472+
Array.from({ length: messageCount }, () =>
473+
threadApi.sendMessage(threadId, enc.encode(""), enc.encode(""), payload),
474+
),
475+
);
476+
return performance.now() - t0;
477+
},
478+
args,
479+
);
480+
}
481+
482+
test.describe("CoreTest: Worker count", () => {
483+
const MESSAGE_COUNT = 100;
484+
485+
test("EndpointFactory.setup() initialises WASM with the requested worker count", async ({
486+
page,
487+
backend,
488+
cli,
489+
}) => {
490+
const times: Record<string, number> = {};
491+
492+
await test.step("2 workers — baseline", async () => {
493+
times["2w"] = await measureSendMessages(page, cli, backend.bridgeUrl, 2, MESSAGE_COUNT);
494+
});
495+
496+
await test.step("4 workers — default", async () => {
497+
times["4w"] = await measureSendMessages(page, cli, backend.bridgeUrl, 4, MESSAGE_COUNT);
498+
});
499+
500+
// All three runs must complete all messages successfully (no throw = pass).
501+
// We log the timings for manual inspection; we don't assert a specific ordering
502+
// because the bridge/network RTT dominates and may swamp the worker-count effect.
503+
expect(times["2w"]).toBeGreaterThan(0);
504+
expect(times["4w"]).toBeGreaterThan(0);
505+
});
506+
});

0 commit comments

Comments
 (0)