Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add createListener with the subscribe message #9

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,107 @@ class JanusAdapter {
};
}

async createListener(occupantId, maxRetries = 5) {
if (this.availableOccupants.indexOf(occupantId) === -1) {
console.warn(occupantId + ": cancelled occupant connection, occupant left before subscription negotation.");
return null;
}

var handle = new mj.JanusPluginHandle(this.session);
var conn = new RTCPeerConnection(this.peerConnectionConfig || DEFAULT_PEER_CONNECTION_CONFIG);

debug(occupantId + ": sub waiting for sfu");
await handle.attach("janus.plugin.sfu");

this.associate(conn, handle);

debug(occupantId + ": sub waiting for join");

if (this.availableOccupants.indexOf(occupantId) === -1) {
conn.close();
console.warn(occupantId + ": cancelled occupant connection, occupant left after attach");
return null;
}

let webrtcFailed = false;

const webrtcup = new Promise(resolve => {
const leftInterval = setInterval(() => {
if (this.availableOccupants.indexOf(occupantId) === -1) {
clearInterval(leftInterval);
resolve();
}
}, 1000);

const timeout = setTimeout(() => {
clearInterval(leftInterval);
webrtcFailed = true;
resolve();
}, SUBSCRIBE_TIMEOUT_MS);

handle.on("webrtcup", () => {
clearTimeout(timeout);
clearInterval(leftInterval);
resolve();
});
});

// Send subscribe message to janus. Don't listen for join/leave messages. Subscribe to the occupant's media.
// Janus should send us an offer for this occupant's media in response to this.
await this.sendSubscribe(handle, { media: occupantId });

if (this.availableOccupants.indexOf(occupantId) === -1) {
conn.close();
console.warn(occupantId + ": cancelled occupant connection, occupant left after join");
return null;
}

debug(occupantId + ": sub waiting for webrtcup");
await webrtcup;

if (this.availableOccupants.indexOf(occupantId) === -1) {
conn.close();
console.warn(occupantId + ": cancel occupant connection, occupant left during or after webrtcup");
return null;
}

if (webrtcFailed) {
conn.close();
if (maxRetries > 0) {
console.warn(occupantId + ": webrtc up timed out, retrying");
return this.createSubscriber(occupantId, maxRetries - 1);
} else {
console.warn(occupantId + ": webrtc up timed out");
return null;
}
}

if (isSafari && !this._iOSHackDelayedInitialPeer) {
// HACK: the first peer on Safari during page load can fail to work if we don't
// wait some time before continuing here. See: https://github.com/mozilla/hubs/pull/1692
await (new Promise((resolve) => setTimeout(resolve, 3000)));
this._iOSHackDelayedInitialPeer = true;
}

var mediaStream = new MediaStream();
var receivers = conn.getReceivers();
receivers.forEach(receiver => {
if (receiver.track) {
mediaStream.addTrack(receiver.track);
}
});
if (mediaStream.getTracks().length === 0) {
mediaStream = null;
}

debug(occupantId + ": subscriber ready");
return {
handle,
mediaStream,
conn
};
}

sendJoin(handle, subscribe) {
return handle.sendMessage({
kind: "join",
Expand All @@ -730,6 +831,13 @@ class JanusAdapter {
});
}

sendSubscribe(handle, subscribe) {
return handle.sendMessage({
kind: "subscribe",
what: subscribe,
});
}

toggleFreeze() {
if (this.frozen) {
this.unfreeze();
Expand Down