Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accumulate chunks until min size (1Mb) is reached #332

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,21 @@

## [Unreleased][unreleased]

- Add metacom binary streams
- Event emitter based
- Websocket bidirectional streaming
- Multiple simultaneous streams
- Interaction with nodejs streams
- File streaming
- Implement metacom stream protocol
- Stream initialization
- Stream closing
- Stream termination
- Chunk identification with metadata header
- Api interfaces
- Stream consumers
- Stream producers
- Add stream types
- Fix browser client WebsocketTransport open
- Add method for a possibility to delete session token from the application routes

Expand Down
39 changes: 35 additions & 4 deletions dist/events.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,44 @@
export class EventEmitter {
const warnAboutMemoryLeak = (eventName, count) =>
console.warn(
`Possible EventEmitter memory leak detected.
${count} listeners added.
You have to decrease the number of listeners for '${eventName}' event.
Hint: avoid adding listeners in loops.`,
);

export default class EventEmitter {
constructor() {
this.events = new Map();
this.maxListenersCount = 10;
}

getMaxListeners() {
return this.maxListenersCount;
}

listenerCount(name) {
const event = this.events.get(name);
if (event) return event.size;
return 0;
}

on(name, fn) {
const event = this.events.get(name);
if (event) event.add(fn);
else this.events.set(name, new Set([fn]));
if (event) {
event.add(fn);
const tooManyListeners = event.size > this.maxListenersCount;
if (tooManyListeners) warnAboutMemoryLeak(name, event.size);
} else {
this.events.set(name, new Set([fn]));
}
}

once(name, fn) {
const dispose = (...args) => {
this.remove(name, dispose);
return fn(...args);
};
this.on(name, dispose);
}

emit(name, ...args) {
Expand All @@ -22,7 +54,6 @@ export class EventEmitter {
if (!event) return;
if (event.has(fn)) {
event.delete(fn);
return;
}
}

Expand Down
92 changes: 67 additions & 25 deletions dist/metacom.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { EventEmitter } from './events.js';
import EventEmitter from './events.js';
import { MetacomChunk, MetacomReadable, MetacomWritable } from './streams.js';

const CALL_TIMEOUT = 7 * 1000;
const PING_INTERVAL = 60 * 1000;
Expand Down Expand Up @@ -34,6 +35,7 @@ export class Metacom extends EventEmitter {
this.callId = 0;
this.calls = new Map();
this.streams = new Map();
this.streamId = 0;
this.active = false;
this.connected = false;
this.opening = null;
Expand All @@ -50,7 +52,37 @@ export class Metacom extends EventEmitter {
return new Transport(url, options);
}

message(data) {
getStream(streamId) {
const stream = this.streams.get(streamId);
if (stream) return stream;
throw new Error(`Stream ${streamId} is not initialized`);
}

createStream(name, size) {
const streamId = ++this.streamId;
const initData = { streamId, name, size };
const transport = this;
return new MetacomWritable(transport, initData);
}

createBlobUploader(blob) {
const name = blob.name || 'blob';
const size = blob.size;
const consumer = this.createStream(name, size);
return {
streamId: consumer.streamId,
upload: async () => {
const reader = blob.stream().getReader();
let chunk;
while (!(chunk = await reader.read()).done) {
consumer.write(chunk.value);
}
consumer.end();
},
};
}

async message(data) {
if (data === '{}') return;
this.lastActivity = new Date().getTime();
let packet;
Expand All @@ -73,33 +105,45 @@ export class Metacom extends EventEmitter {
return;
}
resolve(args);
return;
}
if (callType === 'event') {
} else if (callType === 'event') {
const [interfaceName, eventName] = target.split('/');
const metacomInterface = this.api[interfaceName];
metacomInterface.emit(eventName, args);
}
if (callType === 'stream') {
const { name, size, status } = packet;
if (name) {
const stream = { name, size, chunks: [], received: 0 };
this.streams.set(callId, stream);
return;
}
const stream = this.streams.get(callId);
if (status) {
this.streams.delete(callId);
const blob = new Blob(stream.chunks);
blob.text().then((text) => {
console.log({ text });
});
return;
} else if (callType === 'stream') {
const { stream: streamId, name, size, status } = packet;
const stream = this.streams.get(streamId);
if (name && typeof name === 'string' && Number.isSafeInteger(size)) {
if (stream) {
console.error(new Error(`Stream ${name} is already initialized`));
} else {
const streamData = { streamId, name, size };
const stream = new MetacomReadable(streamData);
this.streams.set(streamId, stream);
}
} else if (!stream) {
console.error(new Error(`Stream ${streamId} is not initialized`));
} else if (status === 'end') {
await stream.close();
this.streams.delete(streamId);
} else if (status === 'terminate') {
await stream.terminate();
this.streams.delete(streamId);
} else {
console.error(new Error('Stream packet structure error'));
}
}
}
}

async binary(blob) {
const buffer = await blob.arrayBuffer();
const byteView = new Uint8Array(buffer);
const { streamId, payload } = MetacomChunk.decode(byteView);
const stream = this.streams.get(streamId);
if (stream) await stream.push(payload);
else console.warn(`Stream ${streamId} is not initialized`);
}

async load(...interfaces) {
const introspect = this.scaffold('system')('introspect');
const introspection = await introspect(interfaces);
Expand Down Expand Up @@ -150,10 +194,8 @@ class WebsocketTransport extends Metacom {
connections.add(this);

socket.addEventListener('message', ({ data }) => {
if (typeof data === 'string') {
this.message(data);
return;
}
if (typeof data === 'string') this.message(data);
else this.binary(data);
});

socket.addEventListener('close', () => {
Expand Down
Loading