diff --git a/lib/index.js b/lib/index.js index 71d9265..eafdf1e 100755 --- a/lib/index.js +++ b/lib/index.js @@ -11,12 +11,25 @@ var inherits = require('util').inherits; var EventEmitter = require('events').EventEmitter; var Decoder = require('string_decoder').StringDecoder; +var HighlandNodeWritable = require('./node-writable.js'); /** * The Stream constructor, accepts an array of values or a generator function * as an optional argument. This is typically the entry point to the Highland * APIs, providing a convenient way of chaining calls together. * + * **null -** Not specifying a source creates a *writable* Stream. These + * Streams are useful for interop with libraries that expect a + * [Node Writable Stream](http://nodejs.org/api/stream.html#stream_class_stream_writable) + * (via [asNodeWritable](#asNodeWritable)). They can also be directly written + * to via the [write](#write) method. + * + * While these streams implement `write` and you can generally pipe a + * [Node Readable Stream](https://nodejs.org/api/stream.html#stream_class_stream_readable) + * through it, it is generally recommended to use the result of + * [asNodeWritable](#asNodeWritable) when you need a Writable, as Highland + * Streams do not implement the full Writable API. + * * **Arrays -** Streams created from Arrays will emit each value of the Array * and then emit a [nil](#nil) value to signal the end of the Stream. * @@ -513,7 +526,9 @@ function Stream(/*optional*/xs, /*optional*/ee, /*optional*/mappingHint) { }); if (_.isUndefined(xs)) { - // nothing else to do + // The existance of this property signifies a + // writable stream. + this._node_writable = null; return this; } else if (_.isArray(xs)) { @@ -628,6 +643,16 @@ function StreamError(err) { this.error = err; } +/** + * Used to hold callbacks when writing to a Stream's incoming buffer + */ + +function CallbackToken(token, cb) { + this.__HighlandCallbackToken__ = true; + this.token = token; + this.cb = cb; +} + /** * Used as a Redirect marker when writing to a Stream's incoming buffer */ @@ -659,6 +684,10 @@ _._isStreamError = function (x) { return _.isObject(x) && !!x.__HighlandStreamError__; }; +_._isCallbackToken = function (x) { + return _.isObject(x) && !!x.__HighlandCallbackToken__; +}; + _._isStreamRedirect = function (x) { return _.isObject(x) && !!x.__HighlandStreamRedirect__; }; @@ -667,7 +696,7 @@ _._isStreamRedirect = function (x) { * Sends errors / data to consumers, observers and event handlers */ -Stream.prototype._send = function (err, x) { +Stream.prototype._send = function (err, x, cb) { //console.log(['_send', this.id, err, x]); var token; @@ -697,6 +726,10 @@ Stream.prototype._send = function (err, x) { this.emit('data', x); } } + + if (cb) { + cb(); + } }; /** @@ -749,14 +782,21 @@ Stream.prototype._readFromBuffer = function () { var i = 0; while (i < len && !this.paused) { var x = this._incoming[i]; + var cb = null; + + if (_._isCallbackToken(x)) { + cb = x.cb; + x = x.token; + } + if (_._isStreamError(x)) { - this._send(x.error); + this._send(x.error, null, cb); } else if (_._isStreamRedirect(x)) { this._redirect(x.to); } else { - this._send(null, x); + this._send(null, x, cb); } i++; } @@ -867,7 +907,7 @@ Stream.prototype.end = function () { /** * Pipes a Highland Stream to a [Node Writable Stream](http://nodejs.org/api/stream.html#stream_class_stream_writable) - * (Highland Streams are also Node Writable Streams). This will pull all the + * (Highland Streams can also be used). This will pull all the * data from the source Highland Stream and write it to the destination, * automatically managing flow so that the destination is not overwhelmed * by a fast source. @@ -1113,6 +1153,10 @@ Stream.prototype.consume = function (f) { self = self._delegate; } var s = new Stream(); + + // This has a source. + s._node_writable = void 0; + var _send = s._send; var push = function (err, x) { //console.log(['push', err, x, s.paused]); @@ -1221,7 +1265,12 @@ Stream.prototype.pull = function (f) { * @id write * @section Stream Objects * @name Stream.write(x) - * @param x - the value to write to the Stream + * @param {any} x - the value to write to the Stream + * @param {String} encoding - (optional) Ignored. This parameter is only here to + * match the Writable signature. + * @param {Function} callback - (optional) The callback that will be called + * when this value has been processed (i.e., when it has been passed to the + * next stage of the pipeline). * @api public * * var xs = _(); @@ -1233,26 +1282,82 @@ Stream.prototype.pull = function (f) { * // ys will be [1, 2] * }); * + * var xs2 = _(); + * xs.write(1, null, function () { + * console.log('written'); + * }); + * xs.end(); + * xs.toArray(_.log); // 'written' will be printed here. + * * // Do *not* do this. - * var xs2 = _().toArray(_.log); - * xs2.write(1); // This call is illegal. + * var xs3 = _().toArray(_.log); + * xs3.write(1); // This call is illegal. */ -Stream.prototype.write = function (x) { +Stream.prototype.write = function (x, encoding, cb) { if (this.paused) { - this._incoming.push(x); + if (cb) { + this._incoming.push(new CallbackToken(x, cb)); + } + else { + this._incoming.push(x); + } } else { + if (_._isCallbackToken(x)) { + cb = x.cb; + x = x.token; + } + if (_._isStreamError(x)) { - this._send(x.error); + this._send(x.error, null, cb); } else { - this._send(null, x); + this._send(null, x, cb); } } return !this.paused; }; +/** + * Returns a `objectMode` + * [Node Writable Stream](http://nodejs.org/api/stream.html#stream_class_stream_writable) + * that is linked to this Highland stream. Data written to the Writable will be + * forwarded to this stream. The result can be used when using an API that + * expects a Writable. Multiple calls to this method will return the same + * Writable. + * + * This method can only be used on *writable* Highland streams (i.e., streams created + * with no source). It will throw when called on non-writable stream. + * + * @id asNodeWritable + * @section Stream Objects + * @name Stream.asNodeWritable() + * @api public + * + * var xs = _(); + * + * xs.toArray(_.log); + * + * var writable = xs.asNodeWritable(); + * writable.write(1); + * writable.write(2); + * writable.end(3); + * // => [1, 2, 3] + */ + +Stream.prototype.asNodeWritable = function asNodeWritable() { + if (_.isUndefined(this._node_writable)) { + throw new Error('This stream is not writable'); + } + + if (!this._node_writable) { + this._node_writable = new HighlandNodeWritable(this); + } + + return this._node_writable; +}; + /** * Forks a stream, allowing you to add additional consumers with shared * back-pressure. A stream forked to multiple consumers will only pull values diff --git a/lib/node-writable.js b/lib/node-writable.js new file mode 100644 index 0000000..746faa0 --- /dev/null +++ b/lib/node-writable.js @@ -0,0 +1,21 @@ +var Writable = require('stream').Writable, + inherits = require('util').inherits; + +function HighlandNodeWritable(stream) { + Writable.call(this, { + objectMode: true, + decodeStrings: false, + highWaterMark: 0 + }); + + this._stream = stream; + this.on('finish', stream.end.bind(stream)); +} + +inherits(HighlandNodeWritable, Writable); + +HighlandNodeWritable.prototype._write = function _write(chunk, encoding, cb) { + this._stream.write(chunk, encoding, cb); +}; + +module.exports = HighlandNodeWritable; diff --git a/test/test.js b/test/test.js index c7f0d59..5eab8af 100755 --- a/test/test.js +++ b/test/test.js @@ -610,6 +610,23 @@ exports['write when not paused sends to consumer'] = function (test) { test.done(); }; +exports['write - callbacks called when value emitted'] = function (test) { + test.expect(4); + var s = _(); + var cbFired = false; + + s.write(1, null, function () { + cbFired = true; + }); + s.end(); + + test.ok(!cbFired, 'The callback fired before the value was consumed.'); + s.pull(valueEquals(test, 1)); + test.ok(cbFired, 'The callback was never fired.'); + s.pull(valueEquals(test, _.nil)); + test.done(); +}; + exports['buffered incoming data released on resume'] = function (test) { var vals = []; var s1 = _(); @@ -767,6 +784,62 @@ exports['consume throws error if next called after nil'] = function (test) { test.done(); }; +exports['asNodeWritable'] = { + 'writable forwards data': function (test) { + test.expect(1); + var s = _(); + s.toArray(function (xs) { + test.same(xs, [1, 2, 3]); + test.done(); + }); + + var writable = s.asNodeWritable(); + writable.write(1); + writable.write(2); + writable.end(3); + }, + 'same writable returned every time': function (test) { + test.expect(1); + var s = _(); + test.strictEqual(s.asNodeWritable(), s.asNodeWritable()); + test.done(); + }, + 'throws if not writable': function (test) { + test.expect(1); + var s = _().map(_.add(1)); + test.throws(s.asNodeWritable.bind(s)); + test.done(); + }, + 'pipes to writable works': function (test) { + test.expect(2); + var s = _(); + streamify([1, 2, 3]).pipe(s.asNodeWritable()); + test.same(s._incoming.length, 0); + + s.toArray(function (xs) { + test.same(xs, [1, 2, 3]); + test.done(); + }); + }, + 'callbacks fired when value emitted': function (test) { + test.expect(4); + var cbFired = false; + var s = _(); + var writable = s.asNodeWritable(); + + writable.write(1, null, function () { + cbFired = true; + }); + writable.end(); + + test.ok(!cbFired, 'The callback fired before the value was consumed.'); + s.pull(valueEquals(test, 1)); + test.ok(cbFired, 'The callback was never fired.'); + s.pull(valueEquals(test, _.nil)); + test.done(); + } +}; + exports['errors'] = function (test) { var errs = []; var err1 = new Error('one');