diff --git a/impress.js b/impress.js index cb6e89cb..1ed9ae5f 100644 --- a/impress.js +++ b/impress.js @@ -100,20 +100,29 @@ const startWorker = async (app, kind, port, id = ++impress.lastWorkerId) => { }, invoke: async (msg) => { - const { status, port, exclusive } = msg; - if (status === 'done') return void app.pool.release(worker); - const promisedThread = exclusive ? app.pool.capture() : app.pool.next(); - const next = await promisedThread.catch(() => { - const error = new Error('No thread available'); - port.postMessage({ name: 'error', error }); + const { from, to, exclusive } = msg; + if (to) { + const back = app.threads.get(to); + return void back.postMessage(msg); + } + const promised = exclusive ? app.pool.capture() : app.pool.next(); + const next = await promised.catch(() => { + const error = { message: 'No thread available' }; + const back = app.threads.get(from); + const data = { id, status: 'error', error }; + back.postMessage({ name: 'invoke', to: from, data }); return null; }); if (!next) return; - next.postMessage(msg, [port]); + next.postMessage(msg); + }, + + release: () => { + app.pool.release(worker); }, - terminate: (msg) => { - process.emit('TERMINATE', msg.code); + terminate: ({ code }) => { + process.emit('TERMINATE', code); }, }; diff --git a/lib/deps.js b/lib/deps.js index 81fa996a..e0d1851b 100644 --- a/lib/deps.js +++ b/lib/deps.js @@ -92,7 +92,9 @@ node.StringDecoder = node['string_decoder']; node.perfHooks = node['perf_hooks']; node.asyncHooks = node['async_hooks']; node.fsp = node.fs.promises; -node.timers.promises = require('node:timers/promises'); +if (!node.timers.promises) { + node.timers.promises = require('node:timers/promises'); +} Object.freeze(node); Object.freeze(npm); diff --git a/lib/worker.js b/lib/worker.js index 4c8f113a..13a79690 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -1,7 +1,7 @@ 'use strict'; const { node, metarhia, notLoaded, wt } = require('./deps.js'); -const { MessageChannel, parentPort, threadId, workerData } = wt; +const { parentPort, threadId, workerData } = wt; const application = require('./application.js'); @@ -22,17 +22,21 @@ process.on('warning', logError('warning')); process.on('uncaughtException', logError('uncaughtException')); process.on('unhandledRejection', logError('unhandledRejection')); +let callId = 0; +const calls = new Map(); + const invoke = async ({ method, args, exclusive = false }) => { - const { port1: port, port2 } = new MessageChannel(); - const data = { method, args }; - const msg = { name: 'invoke', exclusive, data, port }; + const id = ++callId; + const data = { type: 'call', id, method, args }; + const msg = { name: 'invoke', from: threadId, exclusive, data }; return new Promise((resolve, reject) => { - port2.on('message', ({ error, data }) => { - port2.close(); + const handler = ({ error, result }) => { + calls.delete(id); if (error) reject(error); - else resolve(data); - }); - parentPort.postMessage(msg, [port]); + else resolve(result); + }; + calls.set(id, handler); + parentPort.postMessage(msg); }); }; @@ -48,34 +52,36 @@ const handlers = { process.exit(0); }, - invoke: async ({ exclusive, data, port }) => { - const { method, args } = data; + invoke: async ({ from, to, exclusive, data }) => { + if (to) { + const { id, status, error, result } = data; + const handler = calls.get(id); + const err = status === 'error' ? new Error(error.message) : null; + return void handler({ error: err, result }); + } const { sandbox, config } = application; + const msg = { name: 'invoke', to: from }; + const { timeout } = config.server.workers; + const { id, method, args } = data; const handler = metarhia.metautil.namespaceByPath(sandbox, method); if (!handler) { - const error = new Error('Handler not found'); - return void port.postMessage({ name: 'error', error }); + const error = { message: 'Handler not found' }; + const data = { id, status: 'error', error }; + return void parentPort.postMessage({ ...msg, data }); } - const msg = { name: 'invoke', status: 'done' }; - const { timeout } = config.server.workers; try { - let result; - if (timeout) { - const ac = new AbortController(); - result = await Promise.race([ - metarhia.metautil.timeout(timeout, ac.signal), - handler(args), - ]); - ac.abort(); - } else { - result = await handler(args); - } - port.postMessage({ ...msg, data: result }); - } catch (error) { - port.postMessage({ name: 'error', error }); - application.console.error(error.stack); + let promise = handler(args); + if (timeout) promise = metarhia.metautil.timeoutify(promise, timeout); + const result = await promise; + const data = { id, status: 'done', result }; + parentPort.postMessage({ ...msg, data }); + } catch (err) { + const error = { message: err.message }; + const data = { id, status: 'error', error }; + parentPort.postMessage({ ...msg, data }); + application.console.error(err.stack); } finally { - if (exclusive) parentPort.postMessage(msg); + if (exclusive) parentPort.postMessage({ name: 'release' }); } }, };