diff --git a/packages/emnapi/src/core/async-work.ts b/packages/emnapi/src/core/async-work.ts index 3e76f0dd..e2ace373 100644 --- a/packages/emnapi/src/core/async-work.ts +++ b/packages/emnapi/src/core/async-work.ts @@ -57,7 +57,7 @@ var emnapiAWMT = { if (typeof PThread !== 'undefined') { PThread.unusedWorkers.forEach(emnapiAWMT.addListener) - PThread.runningWorkers.forEach(emnapiAWMT.addListener) + Object.values(PThread.pthreads).forEach(emnapiAWMT.addListener) const __original_getNewWorker = PThread.getNewWorker PThread.getNewWorker = function () { const r = __original_getNewWorker.apply(this, arguments as any) diff --git a/packages/emnapi/src/emscripten/async.ts b/packages/emnapi/src/emscripten/async.ts index e896db32..cd89d00c 100644 --- a/packages/emnapi/src/emscripten/async.ts +++ b/packages/emnapi/src/emscripten/async.ts @@ -34,7 +34,7 @@ export function _emnapi_worker_unref (pid: number): void { * @__postset * ``` * PThread.unusedWorkers.forEach(emnapiAddSendListener); - * PThread.runningWorkers.forEach(emnapiAddSendListener); + * Object.values(PThread.pthreads).forEach(emnapiAddSendListener); * (function () { * var __original_getNewWorker = PThread.getNewWorker; * PThread.getNewWorker = function () { diff --git a/packages/emnapi/src/threadsafe-function.ts b/packages/emnapi/src/threadsafe-function.ts index a8de457f..ee0effb5 100644 --- a/packages/emnapi/src/threadsafe-function.ts +++ b/packages/emnapi/src/threadsafe-function.ts @@ -119,7 +119,7 @@ const emnapiTSFN = { emnapiTSFN.offset.mutex = emnapiTSFN.offset.mutex + 4 if (typeof PThread !== 'undefined') { PThread.unusedWorkers.forEach(emnapiTSFN.addListener) - PThread.runningWorkers.forEach(emnapiTSFN.addListener) + Object.values(PThread.pthreads).forEach(emnapiTSFN.addListener) const __original_getNewWorker = PThread.getNewWorker PThread.getNewWorker = function () { const r = __original_getNewWorker.apply(this, arguments as any) diff --git a/packages/wasi-threads/src/thread-manager.ts b/packages/wasi-threads/src/thread-manager.ts index 569fc115..81f6c14f 100644 --- a/packages/wasi-threads/src/thread-manager.ts +++ b/packages/wasi-threads/src/thread-manager.ts @@ -96,7 +96,6 @@ let nextWorkerID = 0 /** @public */ export class ThreadManager { public unusedWorkers: WorkerLike[] = [] - public runningWorkers: WorkerLike[] = [] public pthreads: Record = Object.create(null) public get nextWorkerID (): number { return nextWorkerID } @@ -218,7 +217,6 @@ export class ThreadManager { delete this.pthreads[tid] } this.unusedWorkers.push(worker) - this.runningWorkers.splice(this.runningWorkers.indexOf(worker), 1) delete worker.__emnapi_tid if (ENVIRONMENT_IS_NODE) { (worker as NodeWorker).unref() @@ -351,10 +349,6 @@ export class ThreadManager { this.returnWorkerToPool(worker) } else { delete this.pthreads[tid] - const index = this.runningWorkers.indexOf(worker) - if (index !== -1) { - this.runningWorkers.splice(index, 1) - } this.terminateWorker(worker) delete worker.__emnapi_tid } @@ -375,14 +369,14 @@ export class ThreadManager { } public terminateAllThreads (): void { - for (let i = 0; i < this.runningWorkers.length; ++i) { - this.terminateWorker(this.runningWorkers[i]) + const runningWorkers = Object.values(this.pthreads) + for (let i = 0; i < runningWorkers.length; ++i) { + this.terminateWorker(runningWorkers[i]) } for (let i = 0; i < this.unusedWorkers.length; ++i) { this.terminateWorker(this.unusedWorkers[i]) } this.unusedWorkers = [] - this.runningWorkers = [] this.pthreads = Object.create(null) this.preparePool() diff --git a/packages/wasi-threads/src/wasi-threads.ts b/packages/wasi-threads/src/wasi-threads.ts index 9ec3e9e7..0478d949 100644 --- a/packages/wasi-threads/src/wasi-threads.ts +++ b/packages/wasi-threads/src/wasi-threads.ts @@ -173,7 +173,7 @@ export class WASIThreads { } let worker: WorkerLike | undefined - let tid: number + let tid: number | undefined const PThread = this.PThread try { worker = PThread!.getNewWorker(sab) @@ -194,9 +194,6 @@ export class WASIThreads { if (typeof waitThreadStart === 'number') { const waitResult = Atomics.wait(sab!, 0, 0, waitThreadStart) if (waitResult === 'timed-out') { - try { - PThread!.cleanThread(worker, tid, true) - } catch (_) {} throw new Error('Spawning thread timed out. Please check if the worker is created successfully and if message is handled properly in the worker.') } } else { @@ -204,13 +201,15 @@ export class WASIThreads { } const r = Atomics.load(sab!, 0) if (r > 1) { - try { - PThread!.cleanThread(worker, tid, true) - } catch (_) {} throw deserizeErrorFromBuffer(sab!.buffer as SharedArrayBuffer)! } } } catch (e) { + if (worker !== undefined && tid !== undefined) { + try { + PThread!.cleanThread(worker, tid, true) + } catch (_) {} + } Atomics.store(struct, 0, 1) Atomics.store(struct, 1, EAGAIN) Atomics.notify(struct, 1) @@ -227,7 +226,6 @@ export class WASIThreads { Atomics.store(struct, 1, tid) Atomics.notify(struct, 1) - PThread!.runningWorkers.push(worker) if (!shouldWait) { worker.whenLoaded!.catch((err: any) => { delete worker.whenLoaded