diff --git a/dist/streams.js b/dist/streams.js index c9d6ef9b..1b031b76 100644 --- a/dist/streams.js +++ b/dist/streams.js @@ -8,10 +8,7 @@ const finisherByte = 59; // ; class MetacomChunk { static encode(streamId, payload) { const metadata = encoder.encode(`mc:${streamId};`); - const byteView = new Uint8Array(metadata.length + payload.length); - byteView.set(metadata); - byteView.set(payload, metadata.length); - return byteView; + return this.merge(metadata, payload); } static decode(byteView) { @@ -32,12 +29,20 @@ class MetacomChunk { } throw new Error('Invalid chunk metadata: ' + metadata); } + + static merge(ch1, ch2) { + const result = new Uint8Array(ch1.length + ch2.length); + result.set(ch1); + result.set(ch2, ch1.length); + return result; + } } const PUSH_EVENT = Symbol(); const PULL_EVENT = Symbol(); const DEFAULT_HIGH_WATER_MARK = 32; const MAX_HIGH_WATER_MARK = 1000; +const MIN_CHUNK_SIZE = 1024 * 1024; class MetacomReadable extends EventEmitter { constructor(initData, options = {}) { @@ -155,6 +160,7 @@ class MetacomWritable extends EventEmitter { this.streamId = initData.streamId; this.name = initData.name; this.size = initData.size; + this.chunk = null; this.init(); } @@ -168,11 +174,18 @@ class MetacomWritable extends EventEmitter { } write(data) { - const chunk = MetacomChunk.encode(this.streamId, data); + this.chunk = this.chunk ? MetacomChunk.merge(this.chunk, data) : data; + if (this.chunk.length >= MIN_CHUNK_SIZE) this.drain(); + } + + drain() { + const chunk = MetacomChunk.encode(this.streamId, this.chunk); this.transport.send(chunk); + this.chunk = null; } end() { + if (this.chunk) this.drain(); const packet = { stream: this.streamId, status: 'end' }; this.transport.send(JSON.stringify(packet)); } diff --git a/lib/streams.js b/lib/streams.js index 69168558..e69b4edd 100644 --- a/lib/streams.js +++ b/lib/streams.js @@ -16,10 +16,7 @@ const finisherByte = 59; // ; class MetacomChunk { static encode(streamId, payload) { const metadata = encoder.encode(`mc:${streamId};`); - const byteView = new Uint8Array(metadata.length + payload.length); - byteView.set(metadata); - byteView.set(payload, metadata.length); - return byteView; + return this.merge(metadata, payload); } static decode(byteView) { @@ -40,12 +37,20 @@ class MetacomChunk { } throw new Error('Invalid chunk metadata: ' + metadata); } + + static merge(ch1, ch2) { + const result = new Uint8Array(ch1.length + ch2.length); + result.set(ch1); + result.set(ch2, ch1.length); + return result; + } } const PUSH_EVENT = Symbol(); const PULL_EVENT = Symbol(); const DEFAULT_HIGH_WATER_MARK = 32; const MAX_HIGH_WATER_MARK = 1000; +const MIN_CHUNK_SIZE = 1024 * 1024; class MetacomReadable extends EventEmitter { constructor(initData, options = {}) { @@ -164,6 +169,7 @@ class MetacomWritable extends EventEmitter { this.streamId = initData.streamId; this.name = initData.name; this.size = initData.size; + this.chunk = null; this.init(); } @@ -178,13 +184,20 @@ class MetacomWritable extends EventEmitter { // implements nodejs writable write method write(data) { - const chunk = MetacomChunk.encode(this.streamId, data); - this.transport.send(chunk); + this.chunk = this.chunk ? MetacomChunk.merge(this.chunk, data) : data; + if (this.chunk.length >= MIN_CHUNK_SIZE) this.drain(); return true; } + drain() { + const chunk = MetacomChunk.encode(this.streamId, this.chunk); + this.transport.send(chunk); + this.chunk = null; + } + // implements nodejs writable end method end() { + if (this.chunk) this.drain(); const packet = { stream: this.streamId, status: 'end' }; this.transport.send(JSON.stringify(packet)); }