Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement asNodeWritable. #359

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 116 additions & 12 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,25 @@
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var Decoder = require('string_decoder').StringDecoder;
var HighlandNodeWritable = require('./node-writable.js');

/**
* The Stream constructor, accepts an array of values or a generator function
* as an optional argument. This is typically the entry point to the Highland
* APIs, providing a convenient way of chaining calls together.
*
* **null -** Not specifying a source creates a *writable* Stream. These
* Streams are useful for interop with libraries that expect a
* [Node Writable Stream](http://nodejs.org/api/stream.html#stream_class_stream_writable)
* (via [asNodeWritable](#asNodeWritable)). They can also be directly written
* to via the [write](#write) method.
*
* While these streams implement `write` and you can generally pipe a
* [Node Readable Stream](https://nodejs.org/api/stream.html#stream_class_stream_readable)
* through it, it is generally recommended to use the result of
* [asNodeWritable](#asNodeWritable) when you need a Writable, as Highland
* Streams do not implement the full Writable API.
*
* **Arrays -** Streams created from Arrays will emit each value of the Array
* and then emit a [nil](#nil) value to signal the end of the Stream.
*
Expand Down Expand Up @@ -513,7 +526,9 @@ function Stream(/*optional*/xs, /*optional*/ee, /*optional*/mappingHint) {
});

if (_.isUndefined(xs)) {
// nothing else to do
// The existance of this property signifies a
// writable stream.
this._node_writable = null;
return this;
}
else if (_.isArray(xs)) {
Expand Down Expand Up @@ -628,6 +643,16 @@ function StreamError(err) {
this.error = err;
}

/**
* Used to hold callbacks when writing to a Stream's incoming buffer
*/

function CallbackToken(token, cb) {
this.__HighlandCallbackToken__ = true;
this.token = token;
this.cb = cb;
}

/**
* Used as a Redirect marker when writing to a Stream's incoming buffer
*/
Expand Down Expand Up @@ -659,6 +684,10 @@ _._isStreamError = function (x) {
return _.isObject(x) && !!x.__HighlandStreamError__;
};

_._isCallbackToken = function (x) {
return _.isObject(x) && !!x.__HighlandCallbackToken__;
};

_._isStreamRedirect = function (x) {
return _.isObject(x) && !!x.__HighlandStreamRedirect__;
};
Expand All @@ -667,7 +696,7 @@ _._isStreamRedirect = function (x) {
* Sends errors / data to consumers, observers and event handlers
*/

Stream.prototype._send = function (err, x) {
Stream.prototype._send = function (err, x, cb) {
//console.log(['_send', this.id, err, x]);
var token;

Expand Down Expand Up @@ -697,6 +726,10 @@ Stream.prototype._send = function (err, x) {
this.emit('data', x);
}
}

if (cb) {
cb();
}
};

/**
Expand Down Expand Up @@ -749,14 +782,21 @@ Stream.prototype._readFromBuffer = function () {
var i = 0;
while (i < len && !this.paused) {
var x = this._incoming[i];
var cb = null;

if (_._isCallbackToken(x)) {
cb = x.cb;
x = x.token;
}

if (_._isStreamError(x)) {
this._send(x.error);
this._send(x.error, null, cb);
}
else if (_._isStreamRedirect(x)) {
this._redirect(x.to);
}
else {
this._send(null, x);
this._send(null, x, cb);
}
i++;
}
Expand Down Expand Up @@ -867,7 +907,7 @@ Stream.prototype.end = function () {

/**
* Pipes a Highland Stream to a [Node Writable Stream](http://nodejs.org/api/stream.html#stream_class_stream_writable)
* (Highland Streams are also Node Writable Streams). This will pull all the
* (Highland Streams can also be used). This will pull all the
* data from the source Highland Stream and write it to the destination,
* automatically managing flow so that the destination is not overwhelmed
* by a fast source.
Expand Down Expand Up @@ -1113,6 +1153,10 @@ Stream.prototype.consume = function (f) {
self = self._delegate;
}
var s = new Stream();

// This has a source.
s._node_writable = void 0;

var _send = s._send;
var push = function (err, x) {
//console.log(['push', err, x, s.paused]);
Expand Down Expand Up @@ -1221,7 +1265,12 @@ Stream.prototype.pull = function (f) {
* @id write
* @section Stream Objects
* @name Stream.write(x)
* @param x - the value to write to the Stream
* @param {any} x - the value to write to the Stream
* @param {String} encoding - (optional) Ignored. This parameter is only here to
* match the Writable signature.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(suggested edit -- see below): If it is callable, it is taken as the callback argument.

* @param {Function} callback - (optional) The callback that will be called
* when this value has been processed (i.e., when it has been passed to the
* next stage of the pipeline).
* @api public
*
* var xs = _();
Expand All @@ -1233,26 +1282,81 @@ Stream.prototype.pull = function (f) {
* // ys will be [1, 2]
* });
*
* var xs2 = _();
* xs.write(1, null, function () {
* console.log('written');
* });
* xs.end();
* xs.toArray(_.log); // 'written' will be printed here.
*
* // Do *not* do this.
* var xs2 = _().toArray(_.log);
* xs2.write(1); // This call is illegal.
* var xs3 = _().toArray(_.log);
* xs3.write(1); // This call is illegal.
*/

Stream.prototype.write = function (x) {
Stream.prototype.write = function (x, encoding, cb) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you could make encoding optional:

if (typeof encoding === 'function') {
    cb = encoding;
    encoding = null;
}

if (this.paused) {
this._incoming.push(x);
if (cb) {
this._incoming.push(new CallbackToken(x, cb));
}
else {
this._incoming.push(x);
}
}
else {
if (_._isCallbackToken(x)) {
cb = x.cb;
x = x.token;
}

if (_._isStreamError(x)) {
this._send(x.error);
this._send(x.error, null, cb);
}
else {
this._send(null, x);
this._send(null, x, cb);
}
}
return !this.paused;
};

/**
* Returns a [Node Writable Stream](http://nodejs.org/api/stream.html#stream_class_stream_writable)
* that is linked to this Highland stream. Data written to the Writable will
* be written to this stream. This stream can be used when using an API that
* expects a Writable. Multiple calls to this method will
* return the same Writable.
*
* This method can only be used on *writable* Highland streams (i.e., streams created
* with no source). This method will throw when called on non-writable stream.
*
* @id asNodeWritable
* @section Stream Objects
* @name Stream.asNodeWritable()
* @api public
*
* var xs = _();
*
* xs.toArray(_.log);
*
* var writable = xs.asNodeWritable();
* writable.write(1);
* writable.write(2);
* writable.end(3);
* // => [1, 2, 3]
*/

Stream.prototype.asNodeWritable = function asNodeWritable() {
if (_.isUndefined(this._node_writable)) {
throw new Error('This stream is not writable');
}

if (!this._node_writable) {
this._node_writable = new HighlandNodeWritable(this);
}

return this._node_writable;
};

/**
* Forks a stream, allowing you to add additional consumers with shared
* back-pressure. A stream forked to multiple consumers will only pull values
Expand Down
21 changes: 21 additions & 0 deletions lib/node-writable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
var Writable = require('stream').Writable,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't be compatible with pre-0.10 Nodes, do we support those?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're not running any of our tests on Node 0.8 so I guess by virtue of that we don't.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once upon a time we did (back in 1.x according to git history). I think we dropped it because one of our devDependencies is no longer compatible with the version of npm that comes with 0.8 on travis.

If you're concerned we could make asNodeWritable return this if require('stream').Writable doesn't exist. Thoughts?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, there's always readable-stream, but I'm not too concerned. I think we're fine ignoring old Nodes, as long as we make it clear what we support.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. We'll keep using Writable then.

inherits = require('util').inherits;

function HighlandNodeWritable(stream) {
Writable.call(this, {
objectMode: true,
decodeStrings: false,
highWaterMark: 0
});

this._stream = stream;
this.on('finish', stream.end.bind(stream));
}

inherits(HighlandNodeWritable, Writable);

HighlandNodeWritable.prototype._write = function _write(chunk, encoding, cb) {
this._stream.write(chunk, encoding, cb);
};

module.exports = HighlandNodeWritable;
73 changes: 73 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,23 @@ exports['write when not paused sends to consumer'] = function (test) {
test.done();
};

exports['write - callbacks called when value emitted'] = function (test) {
test.expect(4);
var s = _();
var cbFired = false;

s.write(1, null, function () {
cbFired = true;
});
s.end();

test.ok(!cbFired, 'The callback fired before the value was consumed.');
s.pull(valueEquals(test, 1));
test.ok(cbFired, 'The callback was never fired.');
s.pull(valueEquals(test, _.nil));
test.done();
};

exports['buffered incoming data released on resume'] = function (test) {
var vals = [];
var s1 = _();
Expand Down Expand Up @@ -767,6 +784,62 @@ exports['consume throws error if next called after nil'] = function (test) {
test.done();
};

exports['asNodeWritable'] = {
'writable forwards data': function (test) {
test.expect(1);
var s = _();
s.toArray(function (xs) {
test.same(xs, [1, 2, 3]);
test.done();
});

var writable = s.asNodeWritable();
writable.write(1);
writable.write(2);
writable.end(3);
},
'same writable returned every time': function (test) {
test.expect(1);
var s = _();
test.strictEqual(s.asNodeWritable(), s.asNodeWritable());
test.done();
},
'throws if not writable': function (test) {
test.expect(1);
var s = _().map(_.add(1));
test.throws(s.asNodeWritable.bind(s));
test.done();
},
'pipes to writable works': function (test) {
test.expect(2);
var s = _();
streamify([1, 2, 3]).pipe(s.asNodeWritable());
test.same(s._incoming.length, 0);

s.toArray(function (xs) {
test.same(xs, [1, 2, 3]);
test.done();
});
},
'callbacks fired when value emitted': function (test) {
test.expect(4);
var cbFired = false;
var s = _();
var writable = s.asNodeWritable();

writable.write(1, null, function () {
cbFired = true;
});
writable.end();

test.ok(!cbFired, 'The callback fired before the value was consumed.');
s.pull(valueEquals(test, 1));
test.ok(cbFired, 'The callback was never fired.');
s.pull(valueEquals(test, _.nil));
test.done();
}
};

exports['errors'] = function (test) {
var errs = [];
var err1 = new Error('one');
Expand Down