-
Notifications
You must be signed in to change notification settings - Fork 147
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement asNodeWritable. #359
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,12 +11,25 @@ | |
var inherits = require('util').inherits; | ||
var EventEmitter = require('events').EventEmitter; | ||
var Decoder = require('string_decoder').StringDecoder; | ||
var HighlandNodeWritable = require('./node-writable.js'); | ||
|
||
/** | ||
* The Stream constructor, accepts an array of values or a generator function | ||
* as an optional argument. This is typically the entry point to the Highland | ||
* APIs, providing a convenient way of chaining calls together. | ||
* | ||
* **null -** Not specifying a source creates a *writable* Stream. These | ||
* Streams are useful for interop with libraries that expect a | ||
* [Node Writable Stream](http://nodejs.org/api/stream.html#stream_class_stream_writable) | ||
* (via [asNodeWritable](#asNodeWritable)). They can also be directly written | ||
* to via the [write](#write) method. | ||
* | ||
* While these streams implement `write` and you can generally pipe a | ||
* [Node Readable Stream](https://nodejs.org/api/stream.html#stream_class_stream_readable) | ||
* through it, it is generally recommended to use the result of | ||
* [asNodeWritable](#asNodeWritable) when you need a Writable, as Highland | ||
* Streams do not implement the full Writable API. | ||
* | ||
* **Arrays -** Streams created from Arrays will emit each value of the Array | ||
* and then emit a [nil](#nil) value to signal the end of the Stream. | ||
* | ||
|
@@ -513,7 +526,9 @@ function Stream(/*optional*/xs, /*optional*/ee, /*optional*/mappingHint) { | |
}); | ||
|
||
if (_.isUndefined(xs)) { | ||
// nothing else to do | ||
// The existance of this property signifies a | ||
// writable stream. | ||
this._node_writable = null; | ||
return this; | ||
} | ||
else if (_.isArray(xs)) { | ||
|
@@ -628,6 +643,16 @@ function StreamError(err) { | |
this.error = err; | ||
} | ||
|
||
/** | ||
* Used to hold callbacks when writing to a Stream's incoming buffer | ||
*/ | ||
|
||
function CallbackToken(token, cb) { | ||
this.__HighlandCallbackToken__ = true; | ||
this.token = token; | ||
this.cb = cb; | ||
} | ||
|
||
/** | ||
* Used as a Redirect marker when writing to a Stream's incoming buffer | ||
*/ | ||
|
@@ -659,6 +684,10 @@ _._isStreamError = function (x) { | |
return _.isObject(x) && !!x.__HighlandStreamError__; | ||
}; | ||
|
||
_._isCallbackToken = function (x) { | ||
return _.isObject(x) && !!x.__HighlandCallbackToken__; | ||
}; | ||
|
||
_._isStreamRedirect = function (x) { | ||
return _.isObject(x) && !!x.__HighlandStreamRedirect__; | ||
}; | ||
|
@@ -667,7 +696,7 @@ _._isStreamRedirect = function (x) { | |
* Sends errors / data to consumers, observers and event handlers | ||
*/ | ||
|
||
Stream.prototype._send = function (err, x) { | ||
Stream.prototype._send = function (err, x, cb) { | ||
//console.log(['_send', this.id, err, x]); | ||
var token; | ||
|
||
|
@@ -697,6 +726,10 @@ Stream.prototype._send = function (err, x) { | |
this.emit('data', x); | ||
} | ||
} | ||
|
||
if (cb) { | ||
cb(); | ||
} | ||
}; | ||
|
||
/** | ||
|
@@ -749,14 +782,21 @@ Stream.prototype._readFromBuffer = function () { | |
var i = 0; | ||
while (i < len && !this.paused) { | ||
var x = this._incoming[i]; | ||
var cb = null; | ||
|
||
if (_._isCallbackToken(x)) { | ||
cb = x.cb; | ||
x = x.token; | ||
} | ||
|
||
if (_._isStreamError(x)) { | ||
this._send(x.error); | ||
this._send(x.error, null, cb); | ||
} | ||
else if (_._isStreamRedirect(x)) { | ||
this._redirect(x.to); | ||
} | ||
else { | ||
this._send(null, x); | ||
this._send(null, x, cb); | ||
} | ||
i++; | ||
} | ||
|
@@ -867,7 +907,7 @@ Stream.prototype.end = function () { | |
|
||
/** | ||
* Pipes a Highland Stream to a [Node Writable Stream](http://nodejs.org/api/stream.html#stream_class_stream_writable) | ||
* (Highland Streams are also Node Writable Streams). This will pull all the | ||
* (Highland Streams can also be used). This will pull all the | ||
* data from the source Highland Stream and write it to the destination, | ||
* automatically managing flow so that the destination is not overwhelmed | ||
* by a fast source. | ||
|
@@ -1113,6 +1153,10 @@ Stream.prototype.consume = function (f) { | |
self = self._delegate; | ||
} | ||
var s = new Stream(); | ||
|
||
// This has a source. | ||
s._node_writable = void 0; | ||
|
||
var _send = s._send; | ||
var push = function (err, x) { | ||
//console.log(['push', err, x, s.paused]); | ||
|
@@ -1221,7 +1265,12 @@ Stream.prototype.pull = function (f) { | |
* @id write | ||
* @section Stream Objects | ||
* @name Stream.write(x) | ||
* @param x - the value to write to the Stream | ||
* @param {any} x - the value to write to the Stream | ||
* @param {String} encoding - (optional) Ignored. This parameter is only here to | ||
* match the Writable signature. | ||
* @param {Function} callback - (optional) The callback that will be called | ||
* when this value has been processed (i.e., when it has been passed to the | ||
* next stage of the pipeline). | ||
* @api public | ||
* | ||
* var xs = _(); | ||
|
@@ -1233,26 +1282,82 @@ Stream.prototype.pull = function (f) { | |
* // ys will be [1, 2] | ||
* }); | ||
* | ||
* var xs2 = _(); | ||
* xs.write(1, null, function () { | ||
* console.log('written'); | ||
* }); | ||
* xs.end(); | ||
* xs.toArray(_.log); // 'written' will be printed here. | ||
* | ||
* // Do *not* do this. | ||
* var xs2 = _().toArray(_.log); | ||
* xs2.write(1); // This call is illegal. | ||
* var xs3 = _().toArray(_.log); | ||
* xs3.write(1); // This call is illegal. | ||
*/ | ||
|
||
Stream.prototype.write = function (x) { | ||
Stream.prototype.write = function (x, encoding, cb) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here you could make encoding optional:
|
||
if (this.paused) { | ||
this._incoming.push(x); | ||
if (cb) { | ||
this._incoming.push(new CallbackToken(x, cb)); | ||
} | ||
else { | ||
this._incoming.push(x); | ||
} | ||
} | ||
else { | ||
if (_._isCallbackToken(x)) { | ||
cb = x.cb; | ||
x = x.token; | ||
} | ||
|
||
if (_._isStreamError(x)) { | ||
this._send(x.error); | ||
this._send(x.error, null, cb); | ||
} | ||
else { | ||
this._send(null, x); | ||
this._send(null, x, cb); | ||
} | ||
} | ||
return !this.paused; | ||
}; | ||
|
||
/** | ||
* Returns a `objectMode` | ||
* [Node Writable Stream](http://nodejs.org/api/stream.html#stream_class_stream_writable) | ||
* that is linked to this Highland stream. Data written to the Writable will be | ||
* forwarded to this stream. The result can be used when using an API that | ||
* expects a Writable. Multiple calls to this method will return the same | ||
* Writable. | ||
* | ||
* This method can only be used on *writable* Highland streams (i.e., streams created | ||
* with no source). It will throw when called on non-writable stream. | ||
* | ||
* @id asNodeWritable | ||
* @section Stream Objects | ||
* @name Stream.asNodeWritable() | ||
* @api public | ||
* | ||
* var xs = _(); | ||
* | ||
* xs.toArray(_.log); | ||
* | ||
* var writable = xs.asNodeWritable(); | ||
* writable.write(1); | ||
* writable.write(2); | ||
* writable.end(3); | ||
* // => [1, 2, 3] | ||
*/ | ||
|
||
Stream.prototype.asNodeWritable = function asNodeWritable() { | ||
if (_.isUndefined(this._node_writable)) { | ||
throw new Error('This stream is not writable'); | ||
} | ||
|
||
if (!this._node_writable) { | ||
this._node_writable = new HighlandNodeWritable(this); | ||
} | ||
|
||
return this._node_writable; | ||
}; | ||
|
||
/** | ||
* Forks a stream, allowing you to add additional consumers with shared | ||
* back-pressure. A stream forked to multiple consumers will only pull values | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
var Writable = require('stream').Writable, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This won't be compatible with pre-0.10 Nodes, do we support those? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're not running any of our tests on Node 0.8 so I guess by virtue of that we don't. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Once upon a time we did (back in 1.x according to git history). I think we dropped it because one of our If you're concerned we could make There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, there's always readable-stream, but I'm not too concerned. I think we're fine ignoring old Nodes, as long as we make it clear what we support. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok. We'll keep using |
||
inherits = require('util').inherits; | ||
|
||
function HighlandNodeWritable(stream) { | ||
Writable.call(this, { | ||
objectMode: true, | ||
decodeStrings: false, | ||
highWaterMark: 0 | ||
}); | ||
|
||
this._stream = stream; | ||
this.on('finish', stream.end.bind(stream)); | ||
} | ||
|
||
inherits(HighlandNodeWritable, Writable); | ||
|
||
HighlandNodeWritable.prototype._write = function _write(chunk, encoding, cb) { | ||
this._stream.write(chunk, encoding, cb); | ||
}; | ||
|
||
module.exports = HighlandNodeWritable; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(suggested edit -- see below): If it is callable, it is taken as the callback argument.