diff --git a/src/index.js b/src/index.js index 1ac3423..937f0ba 100644 --- a/src/index.js +++ b/src/index.js @@ -720,6 +720,107 @@ class JanusAdapter { }; } + async createListener(occupantId, maxRetries = 5) { + if (this.availableOccupants.indexOf(occupantId) === -1) { + console.warn(occupantId + ": cancelled occupant connection, occupant left before subscription negotation."); + return null; + } + + var handle = new mj.JanusPluginHandle(this.session); + var conn = new RTCPeerConnection(this.peerConnectionConfig || DEFAULT_PEER_CONNECTION_CONFIG); + + debug(occupantId + ": sub waiting for sfu"); + await handle.attach("janus.plugin.sfu"); + + this.associate(conn, handle); + + debug(occupantId + ": sub waiting for join"); + + if (this.availableOccupants.indexOf(occupantId) === -1) { + conn.close(); + console.warn(occupantId + ": cancelled occupant connection, occupant left after attach"); + return null; + } + + let webrtcFailed = false; + + const webrtcup = new Promise(resolve => { + const leftInterval = setInterval(() => { + if (this.availableOccupants.indexOf(occupantId) === -1) { + clearInterval(leftInterval); + resolve(); + } + }, 1000); + + const timeout = setTimeout(() => { + clearInterval(leftInterval); + webrtcFailed = true; + resolve(); + }, SUBSCRIBE_TIMEOUT_MS); + + handle.on("webrtcup", () => { + clearTimeout(timeout); + clearInterval(leftInterval); + resolve(); + }); + }); + + // Send subscribe message to janus. Don't listen for join/leave messages. Subscribe to the occupant's media. + // Janus should send us an offer for this occupant's media in response to this. + await this.sendSubscribe(handle, { media: occupantId }); + + if (this.availableOccupants.indexOf(occupantId) === -1) { + conn.close(); + console.warn(occupantId + ": cancelled occupant connection, occupant left after join"); + return null; + } + + debug(occupantId + ": sub waiting for webrtcup"); + await webrtcup; + + if (this.availableOccupants.indexOf(occupantId) === -1) { + conn.close(); + console.warn(occupantId + ": cancel occupant connection, occupant left during or after webrtcup"); + return null; + } + + if (webrtcFailed) { + conn.close(); + if (maxRetries > 0) { + console.warn(occupantId + ": webrtc up timed out, retrying"); + return this.createSubscriber(occupantId, maxRetries - 1); + } else { + console.warn(occupantId + ": webrtc up timed out"); + return null; + } + } + + if (isSafari && !this._iOSHackDelayedInitialPeer) { + // HACK: the first peer on Safari during page load can fail to work if we don't + // wait some time before continuing here. See: https://github.com/mozilla/hubs/pull/1692 + await (new Promise((resolve) => setTimeout(resolve, 3000))); + this._iOSHackDelayedInitialPeer = true; + } + + var mediaStream = new MediaStream(); + var receivers = conn.getReceivers(); + receivers.forEach(receiver => { + if (receiver.track) { + mediaStream.addTrack(receiver.track); + } + }); + if (mediaStream.getTracks().length === 0) { + mediaStream = null; + } + + debug(occupantId + ": subscriber ready"); + return { + handle, + mediaStream, + conn + }; + } + sendJoin(handle, subscribe) { return handle.sendMessage({ kind: "join", @@ -730,6 +831,13 @@ class JanusAdapter { }); } + sendSubscribe(handle, subscribe) { + return handle.sendMessage({ + kind: "subscribe", + what: subscribe, + }); + } + toggleFreeze() { if (this.frozen) { this.unfreeze();