Skip to content

Commit

Permalink
Implemented writable streams interface
Browse files Browse the repository at this point in the history
  • Loading branch information
birme committed Jun 22, 2020
1 parent ec9854e commit ef94263
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 6 deletions.
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ if (fd) {

```
class SRT {
createSocket(): socket:Number
createSocket(sender?:Boolean): socket:Number
bind(socket:Number, address:String, port:Number): result:Number
listen(socket:Number, backlog:Number): result:Number
connect(socket:Number, host:String, port:Number): result:Number
Expand Down Expand Up @@ -66,6 +66,21 @@ srt.connect(readStream => {
});
```

### Writable Stream

Example of a writable stream

```
const fs = require('fs');
const source = fs.createReadStream('./input');
const { SRTWriteStream } = require('@eyevinn/srt');
const srt = new SRTWriteStream('127.0.0.1', 1234);
srt.connect(writeStream => {
source.pipe(writeStream);
});
```

## [Contributing](CONTRIBUTING.md)

In addition to contributing code, you can help to triage issues. This can include reproducing bug reports, or asking for vital information such as version numbers or reproduction instructions.
Expand Down
8 changes: 8 additions & 0 deletions examples/writable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
const fs = require('fs');
const source = fs.createReadStream(process.argv[2], { highWaterMark: 1316 });
const { SRTWriteStream } = require('../index.js');

const srt = new SRTWriteStream('127.0.0.1', 1234);
srt.connect(writeStream => {
source.pipe(writeStream);
});
5 changes: 3 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
const LIB = require('./build/Release/node_srt.node');
const Server = require('./src/server.js');
const { SRTReadStream } = require('./src/stream.js');
const { SRTReadStream, SRTWriteStream } = require('./src/stream.js');

module.exports = {
SRT: LIB.SRT,
Server: Server,
SRTReadStream
SRTReadStream,
SRTWriteStream
}
9 changes: 9 additions & 0 deletions src/node-srt.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,20 @@ Napi::Value NodeSRT::CreateSocket(const Napi::CallbackInfo& info) {
Napi::Env env = info.Env();
Napi::HandleScope scope(env);

Napi::Boolean isSender = Napi::Boolean::New(env, false);
if (info.Length() > 0) {
isSender = info[0].As<Napi::Boolean>();
}

SRTSOCKET socket = srt_socket(AF_INET, SOCK_DGRAM, 0);
if (socket == SRT_ERROR) {
Napi::Error::New(env, srt_getlasterror_str()).ThrowAsJavaScriptException();
return Napi::Number::New(env, SRT_ERROR);
}
if (isSender) {
int yes = 1;
srt_setsockflag(socket, SRTO_SENDER, &yes, sizeof(yes));
}
return Napi::Number::New(env, socket);
}

Expand Down
2 changes: 1 addition & 1 deletion src/node-srt.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ class NodeSRT : public Napi::ObjectWrap<NodeSRT> {
Napi::Value Close(const Napi::CallbackInfo& info);
Napi::Value Read(const Napi::CallbackInfo& info);
Napi::Value Write(const Napi::CallbackInfo& info);
};
};
28 changes: 26 additions & 2 deletions src/stream.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { Readable } = require('stream');
const { Readable, Writable } = require('stream');
const LIB = require('../build/Release/node_srt.node');
const debug = require('debug')('srt-stream');

Expand Down Expand Up @@ -56,6 +56,30 @@ class SRTReadStream extends Readable {
}
}

class SRTWriteStream extends Writable {
constructor(address, port, opts) {
super();
this.srt = new LIB.SRT();
this.socket = this.srt.createSocket();
this.address = address;
this.port = port;
}

connect(cb) {
this.srt.connect(this.socket, this.address, this.port);
this.fd = this.socket;
if (this.fd) {
cb(this);
}
}

_write(chunk, encoding, callback) {
this.srt.write(this.fd, chunk);
callback();
}
}

module.exports = {
SRTReadStream
SRTReadStream,
SRTWriteStream
}

0 comments on commit ef94263

Please sign in to comment.