diff --git a/lib/queue.js b/lib/queue.js index 3447752b..fd78f8bf 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -1,292 +1,272 @@ 'use strict'; -// Queue constructor -// concurrency - , asynchronous concurrency -function Queue(concurrency) { - this.paused = false; - this.concurrency = concurrency; - this.waitTimeout = 0; - this.processTimeout = 0; - this.throttleCount = 0; - this.throttleInterval = 1000; - this.count = 0; - this.tasks = []; - this.waiting = []; - this.factors = {}; - this.fifoMode = true; - this.roundRobinMode = false; - this.priorityMode = false; - this.onProcess = null; - this.onDone = null; - this.onSuccess = null; - this.onTimeout = null; - this.onFailure = null; - this.onDrain = null; -} - const QUEUE_TIMEOUT = 'Metasync: Queue timed out'; -// Set wait before processing timeout -// msec - , wait timeout for single item -// -// Returns: -Queue.prototype.wait = function(msec) { - this.waitTimeout = msec; - return this; -}; - -// Throttle to limit throughput -// count - , item count -// interval - , per interval, optional -// default: 1000 msec -// -// Returns: -Queue.prototype.throttle = function(count, interval = 1000) { - this.throttleCount = count; - this.throttleInterval = interval; - return this; -}; +class Queue { + // Queue constructor + // concurrency - , asynchronous concurrency + constructor(concurrency) { + this.paused = false; + this.concurrency = concurrency; + this.waitTimeout = 0; + this.processTimeout = 0; + this.throttleCount = 0; + this.throttleInterval = 1000; + this.count = 0; + this.tasks = []; + this.waiting = []; + this.factors = {}; + this.roundRobinMode = false; + this.priorityMode = false; + this.onProcess = null; + this.onDone = null; + this.onSuccess = null; + this.onTimeout = null; + this.onFailure = null; + this.onDrain = null; + } -// Add item to queue -// item - , to be added -// factor - | , type, source, -// destination or path, optional -// priority - , optional -// -// Returns: -Queue.prototype.add = function(item, factor = 0, priority = 0) { - if (this.priorityMode && !this.roundRobinMode) { - priority = factor; - factor = 0; + // Set wait before processing timeout + // msec - , wait timeout for single item + // + // Returns: + wait(msec) { + this.waitTimeout = msec; + return this; } - const task = [item, factor, priority]; - const slot = this.count < this.concurrency; - if (!this.paused && slot && this.onProcess) { - this.next(task); + + // Throttle to limit throughput + // count - , item count + // interval - , per interval, optional + // default: 1000 msec + // + // Returns: + throttle(count, interval = 1000) { + this.throttleCount = count; + this.throttleInterval = interval; return this; } - let tasks; - if (this.roundRobinMode) { - tasks = this.factors[factor]; - if (!tasks) { - tasks = []; - this.factors[factor] = tasks; - this.waiting.push(tasks); + + // Add item to queue + // item - , to be added + // factor - | , type, source, + // destination or path, optional + // priority - , optional + // + // Returns: + add(item, factor = 0, priority = 0) { + if (this.priorityMode && !this.roundRobinMode) { + priority = factor; + factor = 0; + } + const task = [item, factor, priority]; + const slot = this.count < this.concurrency; + if (!this.paused && slot && this.onProcess) { + this.next(task); + return this; + } + let tasks; + if (this.roundRobinMode) { + tasks = this.factors[factor]; + if (!tasks) { + tasks = []; + this.factors[factor] = tasks; + this.waiting.push(tasks); + } + } else { + tasks = this.tasks; } - } else { - tasks = this.tasks; - } - if (this.fifoMode) tasks.push(task); - else tasks.unshift(task); + tasks.push(task); - if (this.priorityMode) { - if (this.fifoMode) { + if (this.priorityMode) { tasks.sort((a, b) => b[2] - a[2]); - } else { - tasks.sort((a, b) => a[2] - b[2]); } + return this; } - return this; -}; -// Process next item -// task - , next task [item, factor, priority] -// -// Returns: -Queue.prototype.next = function(task) { - const item = task[0]; - let timer; - this.count++; - if (this.processTimeout) { - timer = setTimeout(() => { - const err = new Error(QUEUE_TIMEOUT); - if (this.onTimeout) this.onTimeout(err); - }, this.processTimeout); - } - this.onProcess(item, (err, result) => { - if (this.onDone) this.onDone(err, result); - if (err) { - if (this.onFailure) this.onFailure(err); - } else if (this.onSuccess) { - this.onSuccess(result); + // Process next item + // task - , next task [item, factor, priority] + // + // Returns: + next(task) { + const item = task[0]; + let timer; + this.count++; + if (this.processTimeout) { + timer = setTimeout(() => { + const err = new Error(QUEUE_TIMEOUT); + if (this.onTimeout) this.onTimeout(err); + }, this.processTimeout); } - if (timer) { - clearTimeout(timer); - timer = null; - } - this.count--; - if (this.tasks.length > 0 || this.waiting.length > 0) { - this.takeNext(); - } else if (this.count === 0 && this.onDrain) { - this.onDrain(); - } - }); - return this; -}; - -// Prepare next item for processing -// -// Returns: -Queue.prototype.takeNext = function() { - if (this.paused || !this.onProcess) { + this.onProcess(item, (err, result) => { + if (this.onDone) this.onDone(err, result); + if (err) { + if (this.onFailure) this.onFailure(err); + } else if (this.onSuccess) { + this.onSuccess(result); + } + if (timer) { + clearTimeout(timer); + timer = null; + } + this.count--; + if (this.tasks.length > 0 || this.waiting.length > 0) { + this.takeNext(); + } else if (this.count === 0 && this.onDrain) { + this.onDrain(); + } + }); return this; } - let tasks; - if (this.roundRobinMode) { - tasks = this.waiting.shift(); - if (tasks.length > 1) { - this.waiting.push(tasks); + + // Prepare next item for processing + // + // Returns: + takeNext() { + if (this.paused || !this.onProcess) { + return this; + } + let tasks; + if (this.roundRobinMode) { + tasks = this.waiting.shift(); + if (tasks.length > 1) { + this.waiting.push(tasks); + } + } else { + tasks = this.tasks; } - } else { - tasks = this.tasks; + const task = tasks.shift(); + if (task) this.next(task); + return this; } - const task = tasks.shift(); - if (task) this.next(task); - return this; -}; -// Pause queue -// This function is not completely implemented yet -// -// Returns: -Queue.prototype.pause = function() { - this.paused = true; - return this; -}; - -// Resume queue -// This function is not completely implemented yet -// -// Returns: -Queue.prototype.resume = function() { - this.paused = false; - return this; -}; - -// Clear queue -// -// Returns: -Queue.prototype.clear = function() { - this.count = 0; - this.tasks = []; - this.waiting = []; - this.factors = {}; - return this; -}; + // Pause queue + // This function is not completely implemented yet + // + // Returns: + pause() { + this.paused = true; + return this; + } -// Set timeout interval and listener -// msec - , process timeout for single item -// onTimeout - -// -// Returns: -Queue.prototype.timeout = function(msec, onTimeout = null) { - this.processTimeout = msec; - if (onTimeout) this.onTimeout = onTimeout; - return this; -}; + // Resume queue + // This function is not completely implemented yet + // + // Returns: + resume() { + this.paused = false; + return this; + } -// Set processing function -// fn - -// item - -// callback - -// err - | -// result - -// -// Returns: -Queue.prototype.process = function(fn) { - this.onProcess = fn; - return this; -}; + // Clear queue + // + // Returns: + clear() { + this.count = 0; + this.tasks = []; + this.waiting = []; + this.factors = {}; + return this; + } -// Set listener on processing done -// fn - , done listener -// err - | -// result - -// -// Returns: -Queue.prototype.done = function(fn) { - this.onDone = fn; - return this; -}; + // Set timeout interval and listener + // msec - , process timeout for single item + // onTimeout - + // + // Returns: + timeout(msec, onTimeout = null) { + this.processTimeout = msec; + if (onTimeout) this.onTimeout = onTimeout; + return this; + } -// Set listener on processing success -// listener - , on success -// item - -// -// Returns: -Queue.prototype.success = function(listener) { - this.onSuccess = listener; - return this; -}; + // Set processing function + // fn - + // item - + // callback - + // err - | + // result - + // + // Returns: + process(fn) { + this.onProcess = fn; + return this; + } -// Set listener on processing error -// listener - , on failure -// err - | -// -// Returns: -Queue.prototype.failure = function(listener) { - this.onFailure = listener; - return this; -}; + // Set listener on processing done + // fn - , done listener + // err - | + // result - + // + // Returns: + done(fn) { + this.onDone = fn; + return this; + } -// Set listener on drain Queue -// listener - , on drain -// -// Returns: -Queue.prototype.drain = function(listener) { - this.onDrain = listener; - return this; -}; + // Set listener on processing success + // listener - , on success + // item - + // + // Returns: + success(listener) { + this.onSuccess = listener; + return this; + } -// Switch to FIFO mode (default for Queue) -// -// Returns: -Queue.prototype.fifo = function() { - this.fifoMode = true; - return this; -}; + // Set listener on processing error + // listener - , on failure + // err - | + // + // Returns: + failure(listener) { + this.onFailure = listener; + return this; + } -// Switch to LIFO mode -// -// Returns: -Queue.prototype.lifo = function() { - this.fifoMode = false; - return this; -}; + // Set listener on drain Queue + // listener - , on drain + // + // Returns: + drain(listener) { + this.onDrain = listener; + return this; + } -// Activate or deactivate priority mode -// flag - , default: true, false will -// disable priority mode -// -// Returns: -Queue.prototype.priority = function(flag = true) { - this.priorityMode = flag; - return this; -}; + // Activate or deactivate priority mode + // flag - , default: true, false will + // disable priority mode + // + // Returns: + priority(flag = true) { + this.priorityMode = flag; + return this; + } -// Activate or deactivate round robin mode -// flag - , default: true, false will -// disable roundRobin mode -// -// Returns: -Queue.prototype.roundRobin = function(flag = true) { - this.roundRobinMode = flag; - return this; -}; + // Activate or deactivate round robin mode + // flag - , default: true, false will + // disable roundRobin mode + // + // Returns: + roundRobin(flag = true) { + this.roundRobinMode = flag; + return this; + } -// Pipe processed items to different queue -// dest - , destination queue -// -// Returns: -Queue.prototype.pipe = function(dest) { - if (dest instanceof Queue) { - this.success(item => { - dest.add(item); - }); + // Pipe processed items to different queue + // dest - , destination queue + // + // Returns: + pipe(dest) { + if (dest instanceof Queue) { + this.success(item => { + dest.add(item); + }); + } + return this; } - return this; -}; +} // Queue instantiation // concurrency - , simultaneous and diff --git a/test/queue.lifo.js b/test/queue.lifo.js deleted file mode 100644 index 91161916..00000000 --- a/test/queue.lifo.js +++ /dev/null @@ -1,33 +0,0 @@ -'use strict'; - -const metasync = require('..'); -const metatests = require('metatests'); - -metatests.test('lifo / simple', test => { - const expectedResult = [1, 2, 3, 9, 8, 7, 6, 5, 4]; - const result = []; - - const q = metasync - .queue(3) - .priority() - .lifo() - .process((item, cb) => { - result.push(item.id); - setTimeout(cb, 100); - }); - - q.drain(() => { - test.strictSame(result, expectedResult); - test.end(); - }); - - q.add({ id: 1 }); - q.add({ id: 2 }); - q.add({ id: 3 }); - q.add({ id: 4 }); - q.add({ id: 5 }); - q.add({ id: 6 }); - q.add({ id: 7 }); - q.add({ id: 8 }); - q.add({ id: 9 }); -}); diff --git a/test/queue.modes.js b/test/queue.modes.js index ab8553dd..def43c03 100644 --- a/test/queue.modes.js +++ b/test/queue.modes.js @@ -24,54 +24,6 @@ metatests.test('queue default FIFO', test => { } }); -metatests.test('queue FIFO', test => { - const queue = metasync - .queue(3) - .fifo() - .timeout(1); - const res = []; - - queue.process((item, callback) => { - process.nextTick(() => { - res.push(item.id); - callback(); - }); - }); - - queue.drain(() => { - test.strictSame(res, [1, 2, 3, 4, 5, 6, 7, 8, 9]); - test.end(); - }); - - for (let id = 1; id < 10; id++) { - queue.add({ id }); - } -}); - -metatests.test('queue LIFO', test => { - const queue = metasync - .queue(3) - .lifo() - .timeout(1); - const res = []; - - queue.process((item, callback) => { - process.nextTick(() => { - res.push(item.id); - callback(); - }); - }); - - queue.drain(() => { - test.strictSame(res, [1, 2, 3, 9, 8, 7, 6, 5, 4]); - test.end(); - }); - - for (let id = 1; id < 10; id++) { - queue.add({ id }); - } -}); - metatests.test('queue priority', test => { const queue = metasync.queue(3).priority(); const res = [];