Skip to content

Commit

Permalink
Allow async iterator sources
Browse files Browse the repository at this point in the history
  • Loading branch information
erayd committed Jan 8, 2020
1 parent 2225030 commit ffd99c8
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 11 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ If `isSource` is true, then `value` will be treated as an iterator. All items
returned by the iterator will be added to the queue as space allows. If the
queue is full, the iterator will be called again once there is space available.

If `isSource` is true and `value` is an asynchronous iterator, then Spique will
wait for each yielded promise to resolve before adding it to the queue, and
before requesting another value from the iterator.

If `isSource` is true and `value` is another `Spique` instance, then in
addition to being added as an iterator, it will also be watched for new data.
If `value` is closed, then the queue into which it is feeding will also close
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "spique",
"version": "3.0.0",
"version": "3.0.1",
"description": "A spiral deque - high performance and dynamic queue size",
"main": "spique.js",
"scripts": {
Expand Down
34 changes: 24 additions & 10 deletions spique.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ module.exports = class Spique extends EventEmitter {
});

// attach chained source (iterator | generator | Spique)
function attachSource(source, forward = true, applyTransforms = true) {
async function attachSource(source, forward = true, applyTransforms = true) {
let insert = forward ? "enqueue" : "enqueueHead";
if (source instanceof Spique) {
source.on("data", s => {
Expand All @@ -92,16 +92,30 @@ module.exports = class Spique extends EventEmitter {
source.on("close", () => this.close());
return;
} else if (Symbol.iterator in source) source = source[Symbol.iterator]();
let feed = target => {
while (target.free) {
let next = source.next();
if (next.done) {
target.removeListener("free", feed);
break;
} else target[insert](next.value, false, applyTransforms);
if (Symbol.asyncIterator in source) {
for await (let next of source) {
await new Promise(async resolve => {
let feed = target => {
if (target.free) {
target[insert](next, false, applyTransforms);
resolve();
} else this.once("free", feed);
};
feed(this);
});
}
};
this.on("free", feed);
} else {
let feed = target => {
while (target.free) {
let next = source.next();
if (next.done) {
target.removeListener("free", feed);
break;
} else target[insert](next.value, false, applyTransforms);
}
};
this.on("free", feed);
}
}

// apply transforms & return a generator instance
Expand Down
12 changes: 12 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,18 @@ const GeneratorFunction = function*() {}.prototype.constructor;
for (let noop of s4);
assert(s4.closed === true);
assert(closed === true);

// async
(async () => {
let g = async function*(limit) {
for (let i = 0; i < limit; i++) yield i;
};
let s = new Spique(5);
s.enqueue(g(10), true);
await new Promise(resolve => s.on("full", resolve));
assert(s.length === 5);
assert(s.dequeue() === 0);
})();
}

// transforms
Expand Down

0 comments on commit ffd99c8

Please sign in to comment.