diff --git a/main/util/parallel.js b/main/util/parallel.js index 627a883..d0d242a 100644 --- a/main/util/parallel.js +++ b/main/util/parallel.js @@ -249,72 +249,67 @@ return this; }; - Parallel.prototype._spawnMapWorker = function (i, cb, done, env, wrk) { - var that = this; - - if (!wrk) wrk = that._spawnWorker(cb, env); + Parallel.prototype._initializeWorkerPool = function (size, cb, env) { + this.workerPool = []; + for (var i = 0; i < size; i++) { + var worker = this._spawnWorker(cb, env); + if (worker) { + this.workerPool.push(worker); + } + } + }; - if (wrk !== undefined) { - wrk.onmessage = function (msg) { - that.data[i] = msg.data; - done(null, wrk); - }; - wrk.onerror = function (e) { - wrk.terminate(); - done(e); - }; - wrk.postMessage(that.data[i]); - } else if (that.options.synchronous) { - setImmediate(function () { - that.data[i] = cb(that.data[i]); - done(); - }); - } else { - throw new Error( - "Workers do not exist and synchronous operation not allowed!" - ); + Parallel.prototype._spawnMapWorker = function (i, cb, done, env) { + var that = this; + + if (!this.workerPool || this.workerPool.length === 0) { + throw new Error("Worker pool is not initialized or empty."); } + + var worker = this.workerPool.pop(); + worker.onmessage = function (msg) { + that.data[i] = msg.data; + that.workerPool.push(worker); // Return worker to the pool + done(null); + }; + worker.onerror = function (e) { + that.workerPool.push(worker); // Return worker to the pool even if there's an error + done(e); + }; + worker.postMessage(that.data[i]); }; Parallel.prototype.map = function (cb, env) { env = extend(this.options.env, env || {}); - - if (!this.data.length) { - return this.spawn(cb, env); - } - var that = this; var startedOps = 0; var doneOps = 0; - function done(err, wrk) { + + if (!this.workerPool) { + this._initializeWorkerPool(this.options.maxWorkers, cb, env); + } + + var newOp = new Operation(); + function done(err) { if (err) { newOp.resolve(err, null); } else if (++doneOps === that.data.length) { newOp.resolve(null, that.data); - if (wrk) wrk.terminate(); } else if (startedOps < that.data.length) { - that._spawnMapWorker(startedOps++, cb, done, env, wrk); - } else { - if (wrk) wrk.terminate(); + that._spawnMapWorker(startedOps++, cb, done, env); } } - - var newOp = new Operation(); - this.operation.then( - function () { - for ( - ; - startedOps - doneOps < that.options.maxWorkers && - startedOps < that.data.length; - ++startedOps - ) { - that._spawnMapWorker(startedOps, cb, done, env); - } - }, - function (err) { - newOp.resolve(err, null); + + this.operation.then(function () { + for ( + ; + startedOps - doneOps < that.options.maxWorkers && + startedOps < that.data.length; + ++startedOps + ) { + that._spawnMapWorker(startedOps, cb, done, env); } - ); + }); this.operation = newOp; return this; };