Skip to content

Commit

Permalink
fix: the readable stream was blocking the entire event queue...
Browse files Browse the repository at this point in the history
  • Loading branch information
birme committed Jul 3, 2020
1 parent bee98af commit b4f6a98
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 11 deletions.
4 changes: 4 additions & 0 deletions examples/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ const fs = require('fs');
const dest = fs.createWriteStream('./output');
const { SRTReadStream } = require('../index.js');

setInterval(() => {
console.log("Hey, I am still alive!");
}, 2000);

if (process.argv[2] === "listener") {
const srt = new SRTReadStream('0.0.0.0', 1234);
srt.listen(readStream => {
Expand Down
27 changes: 16 additions & 11 deletions src/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class SRTReadStream extends Readable {
this.socket = this.srt.createSocket();
this.address = address;
this.port = port;
this.readTimer = null;
}

listen(cb) {
Expand All @@ -36,6 +37,7 @@ class SRTReadStream extends Readable {
if (status === LIB.SRT.SRTS_BROKEN || status === LIB.SRT.SRTS_NONEXIST || status === LIB.SRT.SRTS_CLOSED) {
debug("Client disconnected");
this.srt.close(event.socket);
this.push(null);
this.emit('end');
} else if (event.socket === this.socket) {
const fhandle = this.srt.accept(this.socket);
Expand All @@ -59,19 +61,22 @@ class SRTReadStream extends Readable {
}
}

_read(size) {
let chunk;
try {
chunk = this.srt.read(this.fd, size);
_readStart(fd, size) {
this.readTimer = setInterval(() => {
let chunk = this.srt.read(fd, size);
debug(`Read chunk ${chunk.length}`);
while (!this.push(chunk)) {
debug(`Read chunk ${chunk.length}`);
chunk = this.srt.read(this.fd, size);
if (!this.push(chunk)) {
this._readStop();
}
} catch (exc) {
debug(exc.message);
this.push(null);
}
}, 100);
}

_readStop() {
clearInterval(this.readTimer);
}

_read(size) {
this._readStart(this.fd, size);
}
}

Expand Down

0 comments on commit b4f6a98

Please sign in to comment.