Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor queue from prototype to class syntax #460

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
299 changes: 299 additions & 0 deletions lib/queue.class.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,299 @@
'use strict';

const QUEUE_TIMEOUT = 'Metasync: Queue timed out';

class Queue {
// Queue constructor
// concurrency - <number>, asynchronous concurrency
constructor(concurrency) {
this.paused = false;
this.concurrency = concurrency;
this.waitTimeout = 0;
this.processTimeout = 0;
this.throttleCount = 0;
this.throttleInterval = 1000;
this.count = 0;
this.tasks = [];
this.waiting = [];
this.factors = {};
this.fifoMode = true;
this.roundRobinMode = false;
this.priorityMode = false;
this.onProcess = null;
this.onDone = null;
this.onSuccess = null;
this.onTimeout = null;
this.onFailure = null;
this.onDrain = null;
}

// Set wait before processing timeout
// msec - <number>, wait timeout for single item
//
// Returns: <this>
wait(msec) {
this.waitTimeout = msec;
return this;
}

// Throttle to limit throughput
// Signature: count[, interval]
// count - <number>, item count
// interval - <number>, per interval, optional
// default: 1000 msec
//
// Returns: <this>
throttle(count, interval = 1000) {
this.throttleCount = count;
this.throttleInterval = interval;
return this;
}

// Add item to queue
// Signature: item[, factor[, priority]]
// item - <Object>, to be added
// factor - <number> | <string>, type, source,
// destination or path, optional
// priority - <number>, optional
//
// Returns: <this>
add(item, factor = 0, priority = 0) {
if (this.priorityMode && !this.roundRobinMode) {
priority = factor;
factor = 0;
}
const task = [item, factor, priority];
const slot = this.count < this.concurrency;
if (!this.paused && slot && this.onProcess) {
this.next(task);
return this;
}
let tasks;
if (this.roundRobinMode) {
tasks = this.factors[factor];
if (!tasks) {
tasks = [];
this.factors[factor] = tasks;
this.waiting.push(tasks);
}
} else {
tasks = this.tasks;
}

if (this.fifoMode) tasks.push(task);
else tasks.unshift(task);

if (this.priorityMode) {
if (this.fifoMode) {
tasks.sort((a, b) => b[2] - a[2]);
} else {
tasks.sort((a, b) => a[2] - b[2]);
}
}
return this;
}

// Process next item
// task - <Array>, next task [item, factor, priority]
//
// Returns: <this>
next(task) {
const item = task[0];
let timer;
this.count++;
if (this.processTimeout) {
timer = setTimeout(() => {
const err = new Error(QUEUE_TIMEOUT);
if (this.onTimeout) this.onTimeout(err);
}, this.processTimeout);
}
this.onProcess(item, (err, result) => {
if (this.onDone) this.onDone(err, result);
if (err) {
if (this.onFailure) this.onFailure(err);
} else if (this.onSuccess) {
this.onSuccess(result);
}
if (timer) {
clearTimeout(timer);
timer = null;
}
this.count--;
if (this.tasks.length > 0 || this.waiting.length > 0) {
this.takeNext();
} else if (this.count === 0 && this.onDrain) {
this.onDrain();
}
});
return this;
}

// Prepare next item for processing
//
// Returns: <this>
takeNext() {
if (this.paused || !this.onProcess) {
return this;
}
let tasks;
if (this.roundRobinMode) {
tasks = this.waiting.shift();
if (tasks.length > 1) {
this.waiting.push(tasks);
}
} else {
tasks = this.tasks;
}
const task = tasks.shift();
if (task) this.next(task);
return this;
}

// This function is not completely implemented yet
//
// Returns: <this>
pause() {
this.paused = true;
return this;
}

// Resume queue
// This function is not completely implemented yet
//
// Returns: <this>
resume() {
this.paused = false;
return this;
}

// Clear queue
//
// Returns: <this>
clear() {
this.count = 0;
this.tasks = [];
this.waiting = [];
this.factors = {};
return this;
}

// Set timeout interval and listener
// msec - <number>, process timeout for single item
// onTimeout - <Function>
//
// Returns: <this>
timeout(msec, onTimeout = null) {
this.processTimeout = msec;
if (onTimeout) this.onTimeout = onTimeout;
return this;
}

// Set processing function
// fn - <Function>
// item - <Object>
// callback - <Function>
// err - <Error> | <null>
// result - <any>
//
// Returns: <this>
process(fn) {
this.onProcess = fn;
return this;
}

// Set listener on processing done
// fn - <Function>, done listener
// err - <Error> | <null>
// result - <any>
//
// Returns: <this>
done(fn) {
this.onDone = fn;
return this;
}

// Set listener on processing success
// listener - <Function>, on success
// item - <any>
//
// Returns: <this>
success(listener) {
this.onSuccess = listener;
return this;
}

// Set listener on processing error
// listener - <Function>, on failure
// err - <Error> | <null>
//
// Returns: <this>
failure(listener) {
this.onFailure = listener;
return this;
}

// Set listener on drain Queue
// listener - <Function>, on drain
//
// Returns: <this>
drain(listener) {
this.onDrain = listener;
return this;
}

// Switch to FIFO mode (default for Queue)
//
// Returns: <this>
fifo() {
this.fifoMode = true;
return this;
}

// Switch to LIFO mode
//
// Returns: <this>
lifo() {
this.fifoMode = false;
return this;
}

// Activate or deactivate priority mode
// flag - <boolean>, default: true, false will
// disable priority mode
//
// Returns: <this>
priority(flag = true) {
this.priorityMode = flag;
return this;
}

// Activate or deactivate round robin mode
// flag - <boolean>, default: true, false will
// disable roundRobin mode
//
// Returns: <this>
roundRobin(flag = true) {
this.roundRobinMode = flag;
return this;
}

// Pipe processed items to different queue
// dest - <Queue>, destination queue
//
// Returns: <this>
pipe(dest) {
if (dest instanceof Queue) {
this.success((item) => void dest.add(item));
}
return this;
}
}

// Create Queue instance
// concurrency - <number>, simultaneous and
// asynchronously executing tasks
//
// Returns: <Queue>
const queue = (concurrency) => new Queue(concurrency);

module.exports = { queue, Queue };
11 changes: 7 additions & 4 deletions lib/throttle.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
//
// Returns: <Function>
const throttle = (timeout, fn, ...args) => {
let timer;
let timer = null;
let wait = false;

const execute = args
? (...pars) => (pars ? fn(...args, ...pars) : fn(...args))
: (...pars) => (pars ? fn(...pars) : fn());

const delayed = (...pars) => {
timer = undefined;
timer = null;
if (wait) execute(...pars);
};

Expand All @@ -38,12 +38,15 @@ const throttle = (timeout, fn, ...args) => {
// fn - <Function>, to be debounced
// args - <Array>, arguments for fn, optional
const debounce = (timeout, fn, ...args) => {
let timer;
let timer = null;

const debounced = () => (args ? fn(...args) : fn());

const wrapped = () => {
if (timer) clearTimeout(timer);
if (timer) {
clearTimeout(timer);
timer = null;
}
timer = setTimeout(debounced, timeout);
};

Expand Down
4 changes: 2 additions & 2 deletions metasync.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';

const common = require('@metarhia/common');
const nodeVerion = common.between(process.version, 'v', '.');
const nodeVersion = common.between(process.version, 'v', '.');

const submodules = [
'composition', // Unified abstraction
Expand All @@ -17,7 +17,7 @@ const submodules = [
'throttle', // Throttling utilities
].map((path) => require('./lib/' + path));

if (nodeVerion >= 10) {
if (nodeVersion >= 10) {
submodules.push(require('./lib/async-iterator'));
}

Expand Down
Loading