diff --git a/README.md b/README.md index 4833c40..c8972e2 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,10 @@ If `isSource` is true, then `value` will be treated as an iterator. All items returned by the iterator will be added to the queue as space allows. If the queue is full, the iterator will be called again once there is space available. +If `isSource` is true and `value` is an asynchronous iterator, then Spique will +wait for each yielded promise to resolve before adding it to the queue, and +before requesting another value from the iterator. + If `isSource` is true and `value` is another `Spique` instance, then in addition to being added as an iterator, it will also be watched for new data. If `value` is closed, then the queue into which it is feeding will also close diff --git a/package.json b/package.json index 34be4ee..6a01ce9 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "spique", - "version": "3.0.0", + "version": "3.0.1", "description": "A spiral deque - high performance and dynamic queue size", "main": "spique.js", "scripts": { diff --git a/spique.js b/spique.js index a0517c0..899a65f 100644 --- a/spique.js +++ b/spique.js @@ -83,7 +83,7 @@ module.exports = class Spique extends EventEmitter { }); // attach chained source (iterator | generator | Spique) - function attachSource(source, forward = true, applyTransforms = true) { + async function attachSource(source, forward = true, applyTransforms = true) { let insert = forward ? "enqueue" : "enqueueHead"; if (source instanceof Spique) { source.on("data", s => { @@ -92,16 +92,30 @@ module.exports = class Spique extends EventEmitter { source.on("close", () => this.close()); return; } else if (Symbol.iterator in source) source = source[Symbol.iterator](); - let feed = target => { - while (target.free) { - let next = source.next(); - if (next.done) { - target.removeListener("free", feed); - break; - } else target[insert](next.value, false, applyTransforms); + if (Symbol.asyncIterator in source) { + for await (let next of source) { + await new Promise(async resolve => { + let feed = target => { + if (target.free) { + target[insert](next, false, applyTransforms); + resolve(); + } else this.once("free", feed); + }; + feed(this); + }); } - }; - this.on("free", feed); + } else { + let feed = target => { + while (target.free) { + let next = source.next(); + if (next.done) { + target.removeListener("free", feed); + break; + } else target[insert](next.value, false, applyTransforms); + } + }; + this.on("free", feed); + } } // apply transforms & return a generator instance diff --git a/test.js b/test.js index e0b2ff6..cef7a71 100755 --- a/test.js +++ b/test.js @@ -245,6 +245,18 @@ const GeneratorFunction = function*() {}.prototype.constructor; for (let noop of s4); assert(s4.closed === true); assert(closed === true); + + // async + (async () => { + let g = async function*(limit) { + for (let i = 0; i < limit; i++) yield i; + }; + let s = new Spique(5); + s.enqueue(g(10), true); + await new Promise(resolve => s.on("full", resolve)); + assert(s.length === 5); + assert(s.dequeue() === 0); + })(); } // transforms