Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/emnapi/src/core/async-work.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ var emnapiAWMT = {

if (typeof PThread !== 'undefined') {
PThread.unusedWorkers.forEach(emnapiAWMT.addListener)
PThread.runningWorkers.forEach(emnapiAWMT.addListener)
Object.values(PThread.pthreads).forEach(emnapiAWMT.addListener)
const __original_getNewWorker = PThread.getNewWorker
PThread.getNewWorker = function () {
const r = __original_getNewWorker.apply(this, arguments as any)
Expand Down
2 changes: 1 addition & 1 deletion packages/emnapi/src/emscripten/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export function _emnapi_worker_unref (pid: number): void {
* @__postset
* ```
* PThread.unusedWorkers.forEach(emnapiAddSendListener);
* PThread.runningWorkers.forEach(emnapiAddSendListener);
* Object.values(PThread.pthreads).forEach(emnapiAddSendListener);
* (function () {
* var __original_getNewWorker = PThread.getNewWorker;
* PThread.getNewWorker = function () {
Expand Down
2 changes: 1 addition & 1 deletion packages/emnapi/src/threadsafe-function.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ const emnapiTSFN = {
emnapiTSFN.offset.mutex = emnapiTSFN.offset.mutex + 4
if (typeof PThread !== 'undefined') {
PThread.unusedWorkers.forEach(emnapiTSFN.addListener)
PThread.runningWorkers.forEach(emnapiTSFN.addListener)
Object.values(PThread.pthreads).forEach(emnapiTSFN.addListener)
const __original_getNewWorker = PThread.getNewWorker
PThread.getNewWorker = function () {
const r = __original_getNewWorker.apply(this, arguments as any)
Expand Down
12 changes: 3 additions & 9 deletions packages/wasi-threads/src/thread-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ let nextWorkerID = 0
/** @public */
export class ThreadManager {
public unusedWorkers: WorkerLike[] = []
public runningWorkers: WorkerLike[] = []
public pthreads: Record<number, WorkerLike> = Object.create(null)
public get nextWorkerID (): number { return nextWorkerID }

Expand Down Expand Up @@ -218,7 +217,6 @@ export class ThreadManager {
delete this.pthreads[tid]
}
this.unusedWorkers.push(worker)
this.runningWorkers.splice(this.runningWorkers.indexOf(worker), 1)
delete worker.__emnapi_tid
if (ENVIRONMENT_IS_NODE) {
(worker as NodeWorker).unref()
Expand Down Expand Up @@ -351,10 +349,6 @@ export class ThreadManager {
this.returnWorkerToPool(worker)
} else {
delete this.pthreads[tid]
const index = this.runningWorkers.indexOf(worker)
if (index !== -1) {
this.runningWorkers.splice(index, 1)
}
this.terminateWorker(worker)
delete worker.__emnapi_tid
}
Expand All @@ -375,14 +369,14 @@ export class ThreadManager {
}

public terminateAllThreads (): void {
for (let i = 0; i < this.runningWorkers.length; ++i) {
this.terminateWorker(this.runningWorkers[i])
const runningWorkers = Object.values(this.pthreads)
for (let i = 0; i < runningWorkers.length; ++i) {
this.terminateWorker(runningWorkers[i])
}
for (let i = 0; i < this.unusedWorkers.length; ++i) {
this.terminateWorker(this.unusedWorkers[i])
}
this.unusedWorkers = []
this.runningWorkers = []
this.pthreads = Object.create(null)

this.preparePool()
Expand Down
14 changes: 6 additions & 8 deletions packages/wasi-threads/src/wasi-threads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ export class WASIThreads {
}

let worker: WorkerLike | undefined
let tid: number
let tid: number | undefined
const PThread = this.PThread
try {
worker = PThread!.getNewWorker(sab)
Expand All @@ -194,23 +194,22 @@ export class WASIThreads {
if (typeof waitThreadStart === 'number') {
const waitResult = Atomics.wait(sab!, 0, 0, waitThreadStart)
if (waitResult === 'timed-out') {
try {
PThread!.cleanThread(worker, tid, true)
} catch (_) {}
throw new Error('Spawning thread timed out. Please check if the worker is created successfully and if message is handled properly in the worker.')
}
} else {
Atomics.wait(sab!, 0, 0)
}
const r = Atomics.load(sab!, 0)
if (r > 1) {
try {
PThread!.cleanThread(worker, tid, true)
} catch (_) {}
throw deserizeErrorFromBuffer(sab!.buffer as SharedArrayBuffer)!
}
}
} catch (e) {
if (worker !== undefined && tid !== undefined) {
try {
PThread!.cleanThread(worker, tid, true)
} catch (_) {}
}
Atomics.store(struct, 0, 1)
Atomics.store(struct, 1, EAGAIN)
Atomics.notify(struct, 1)
Expand All @@ -227,7 +226,6 @@ export class WASIThreads {
Atomics.store(struct, 1, tid)
Atomics.notify(struct, 1)

PThread!.runningWorkers.push(worker)
if (!shouldWait) {
worker.whenLoaded!.catch((err: any) => {
delete worker.whenLoaded
Expand Down