Skip to content

Commit

Permalink
accumulate chunks until min size (1Mb) is reached
Browse files Browse the repository at this point in the history
Closes: metarhia#326
  • Loading branch information
kramarmm committed Jul 18, 2022
1 parent 739dcda commit baf2d22
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 11 deletions.
23 changes: 18 additions & 5 deletions dist/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ const finisherByte = 59; // ;
class MetacomChunk {
static encode(streamId, payload) {
const metadata = encoder.encode(`mc:${streamId};`);
const byteView = new Uint8Array(metadata.length + payload.length);
byteView.set(metadata);
byteView.set(payload, metadata.length);
return byteView;
return this.merge(metadata, payload);
}

static decode(byteView) {
Expand All @@ -32,12 +29,20 @@ class MetacomChunk {
}
throw new Error('Invalid chunk metadata: ' + metadata);
}

static merge(ch1, ch2) {
const result = new Uint8Array(ch1.length + ch2.length);
result.set(ch1);
result.set(ch2, ch1.length);
return result;
}
}

const PUSH_EVENT = Symbol();
const PULL_EVENT = Symbol();
const DEFAULT_HIGH_WATER_MARK = 32;
const MAX_HIGH_WATER_MARK = 1000;
const MIN_CHUNK_SIZE = 1024 * 1024;

class MetacomReadable extends EventEmitter {
constructor(initData, options = {}) {
Expand Down Expand Up @@ -155,6 +160,7 @@ class MetacomWritable extends EventEmitter {
this.streamId = initData.streamId;
this.name = initData.name;
this.size = initData.size;
this.chunk = null;
this.init();
}

Expand All @@ -168,11 +174,18 @@ class MetacomWritable extends EventEmitter {
}

write(data) {
const chunk = MetacomChunk.encode(this.streamId, data);
this.chunk = this.chunk ? MetacomChunk.merge(this.chunk, data) : data;
if (this.chunk.length >= MIN_CHUNK_SIZE) this.drain();
}

drain() {
const chunk = MetacomChunk.encode(this.streamId, this.chunk);
this.transport.send(chunk);
this.chunk = null;
}

end() {
if (this.chunk) this.drain();
const packet = { stream: this.streamId, status: 'end' };
this.transport.send(JSON.stringify(packet));
}
Expand Down
25 changes: 19 additions & 6 deletions lib/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ const finisherByte = 59; // ;
class MetacomChunk {
static encode(streamId, payload) {
const metadata = encoder.encode(`mc:${streamId};`);
const byteView = new Uint8Array(metadata.length + payload.length);
byteView.set(metadata);
byteView.set(payload, metadata.length);
return byteView;
return this.merge(metadata, payload);
}

static decode(byteView) {
Expand All @@ -40,12 +37,20 @@ class MetacomChunk {
}
throw new Error('Invalid chunk metadata: ' + metadata);
}

static merge(ch1, ch2) {
const result = new Uint8Array(ch1.length + ch2.length);
result.set(ch1);
result.set(ch2, ch1.length);
return result;
}
}

const PUSH_EVENT = Symbol();
const PULL_EVENT = Symbol();
const DEFAULT_HIGH_WATER_MARK = 32;
const MAX_HIGH_WATER_MARK = 1000;
const MIN_CHUNK_SIZE = 1024 * 1024;

class MetacomReadable extends EventEmitter {
constructor(initData, options = {}) {
Expand Down Expand Up @@ -164,6 +169,7 @@ class MetacomWritable extends EventEmitter {
this.streamId = initData.streamId;
this.name = initData.name;
this.size = initData.size;
this.chunk = null;
this.init();
}

Expand All @@ -178,13 +184,20 @@ class MetacomWritable extends EventEmitter {

// implements nodejs writable write method
write(data) {
const chunk = MetacomChunk.encode(this.streamId, data);
this.transport.send(chunk);
this.chunk = this.chunk ? MetacomChunk.merge(this.chunk, data) : data;
if (this.chunk.length >= MIN_CHUNK_SIZE) this.drain();
return true;
}

drain() {
const chunk = MetacomChunk.encode(this.streamId, this.chunk);
this.transport.send(chunk);
this.chunk = null;
}

// implements nodejs writable end method
end() {
if (this.chunk) this.drain();
const packet = { stream: this.streamId, status: 'end' };
this.transport.send(JSON.stringify(packet));
}
Expand Down

0 comments on commit baf2d22

Please sign in to comment.