Skip to content

Commit

Permalink
Merge branch 'master' into fix/issue-6803
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafaboleken committed Nov 28, 2024
2 parents bc07bc7 + 1561586 commit 4c9385e
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 63 deletions.
136 changes: 80 additions & 56 deletions src/main/js/webrtc_adaptor.js
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,11 @@ export class WebRTCAdaptor {
* This is the time info for the last reconnection attempt
*/
this.lastReconnectiontionTrialTime = 0;

/**
* TimerId for the pending try again call
*/
this.pendingTryAgainTimerId = -1;

/**
* All media management works for teh local stream are made by @MediaManager class.
Expand Down Expand Up @@ -516,15 +521,8 @@ export class WebRTCAdaptor {
}
//init peer connection for reconnectIfRequired
this.initPeerConnection(streamId, "publish");
setTimeout(() => {
//check if it is connected or not
//this resolves if the server responds with some error message
if (this.iceConnectionState(this.publishStreamId) != "checking" && this.iceConnectionState(this.publishStreamId) != "connected" && this.iceConnectionState(this.publishStreamId) != "completed") {
//if it is not connected, try to reconnect
this.reconnectIfRequired(0);
}
}, 3000);


this.reconnectIfRequired(3000, false);
}

sendPublishCommand(streamId, token, subscriberId, subscriberCode, streamName, mainTrack, metaData, role, videoEnabled, audioEnabled) {
Expand Down Expand Up @@ -605,43 +603,44 @@ export class WebRTCAdaptor {

//init peer connection for reconnectIfRequired
this.initPeerConnection(streamId, "play");

setTimeout(() => {
//check if it is connected or not
//this resolves if the server responds with some error message
if (this.iceConnectionState(streamId) != "checking" &&
this.iceConnectionState(streamId) != "connected" &&
this.iceConnectionState(streamId) != "completed") {
//if it is not connected, try to reconnect
this.reconnectIfRequired(0);
}
}, 3000);
this.reconnectIfRequired(3000, false);
}

/**
* Reconnects to the stream if it is not stopped on purpose
* @param {number} [delayMs]
* @returns
*/
reconnectIfRequired(delayMs = 3000) {
reconnectIfRequired(delayMs = 3000, forceReconnect = false) {
if (this.reconnectIfRequiredFlag) {
//It's important to run the following methods after 3000 ms because the stream may be stopped by the user in the meantime
if (delayMs > 0) {
setTimeout(() => {
this.tryAgain();
}, delayMs);
if (delayMs <= 0) {
delayMs = 500;
//clear the timer because there is a demand to reconnect without delay
clearTimeout(this.pendingTryAgainTimerId);
this.pendingTryAgainTimerId = -1;
}
else {
this.tryAgain()

if (this.pendingTryAgainTimerId == -1)
{
this.pendingTryAgainTimerId = setTimeout(() =>
{
this.pendingTryAgainTimerId = -1;
this.tryAgain(forceReconnect);
},
delayMs);
}
}
}

tryAgain() {
tryAgain(forceReconnect) {

const now = Date.now();
//to prevent too many trial from different paths
if (now - this.lastReconnectiontionTrialTime < 3000) {
const timeDiff = now - this.lastReconnectiontionTrialTime;;
if (timeDiff < 3000 && forceReconnect == false) {
//check again 1 seconds later if it is not stopped on purpose
Logger.debug("Reconnection request received after "+ timeDiff+" ms. It should be at least 3000ms. It will try again after 1000ms");
this.reconnectIfRequired(1000, forceReconnect);
return;
}
this.lastReconnectiontionTrialTime = now;
Expand All @@ -650,10 +649,10 @@ export class WebRTCAdaptor {
//if remotePeerConnection has a peer connection for the stream id, it means that it is not stopped on purpose

if (this.remotePeerConnection[this.publishStreamId] != null &&
(forceReconnect ||
//check connection status to not stop streaming an active stream
this.iceConnectionState(this.publishStreamId) != "checking" &&
this.iceConnectionState(this.publishStreamId) != "connected" &&
this.iceConnectionState(this.publishStreamId) != "completed") {
["checking", "connected", "completed"].indexOf(this.iceConnectionState(this.publishStreamId)) === -1)
) {
// notify that reconnection process started for publish
this.notifyEventListeners("reconnection_attempt_for_publisher", this.publishStreamId);

Expand All @@ -669,11 +668,12 @@ export class WebRTCAdaptor {
//reconnect play
for (var index in this.playStreamId) {
let streamId = this.playStreamId[index];
if (this.remotePeerConnection[streamId] != "null" &&
//check connection status to not stop streaming an active stream
this.iceConnectionState(streamId) != "checking" &&
this.iceConnectionState(streamId) != "connected" &&
this.iceConnectionState(streamId) != "completed") {
if (this.remotePeerConnection[streamId] != null &&
(forceReconnect ||
//check connection status to not stop streaming an active stream
["checking", "connected", "completed"].indexOf(this.iceConnectionState(streamId)) === -1
)
) {
// notify that reconnection process started for play
this.notifyEventListeners("reconnection_attempt_for_player", streamId);

Expand Down Expand Up @@ -1136,27 +1136,38 @@ export class WebRTCAdaptor {

this.remotePeerConnection[streamId].oniceconnectionstatechange = event => {
var obj = { state: this.remotePeerConnection[streamId].iceConnectionState, streamId: streamId };
if (obj.state == "failed" || obj.state == "disconnected" || obj.state == "closed") {
this.reconnectIfRequired(3000);
}
this.notifyEventListeners("ice_connection_state_changed", obj);

//
if (!this.isPlayMode && !this.playStreamId.includes(streamId)) {
if (this.remotePeerConnection[streamId].iceConnectionState == "connected") {

this.mediaManager.changeBandwidth(this.mediaManager.bandwidth, streamId).then(() => {
Logger.debug("Bandwidth is changed to " + this.mediaManager.bandwidth);
})
.catch(e => Logger.warn(e));
}
}
this.oniceconnectionstatechangeCallback(obj);
}

}

return this.remotePeerConnection[streamId];
}

oniceconnectionstatechangeCallback(obj)
{
Logger.debug("ice connection state is " +obj.state + " for streamId: " + obj.streamId);
if (obj.state == "failed" || obj.state == "disconnected" || obj.state == "closed") {
//try immediately
Logger.debug("ice connection state is failed, disconnected or closed for streamId: " + obj.streamId + " it will try to reconnect immediately");
this.reconnectIfRequired(0, false);
}
this.notifyEventListeners("ice_connection_state_changed", obj);

//
if (!this.isPlayMode && !this.playStreamId.includes(obj.streamId)) {
if (this.remotePeerConnection[obj.streamId] != null && this.remotePeerConnection[obj.streamId].iceConnectionState == "connected") {

this.mediaManager.changeBandwidth(this.mediaManager.bandwidth, obj.streamId).then(() => {
Logger.debug("Bandwidth is changed to " + this.mediaManager.bandwidth);
})
.catch(e => Logger.warn(e));
}
}
}



/**
* Called internally to close PeerConnection.
Expand Down Expand Up @@ -1745,10 +1756,7 @@ export class WebRTCAdaptor {
websocket_url: this.websocketURL,
webrtcadaptor: this,
callback: (info, obj) => {
if (info == "closed") {
this.reconnectIfRequired();
}
this.notifyEventListeners(info, obj);
this.websocketCallback(info, obj)
},
callbackError: (error, message) => {
this.notifyErrorEventListeners(error, message)
Expand All @@ -1757,6 +1765,22 @@ export class WebRTCAdaptor {
});
}
}

websocketCallback(info, obj) {

if (info == "closed" || info == "server_will_stop") {
Logger.info("Critical response from server:"+ info +". It will reconnect immediately if there is an active connection");

//close websocket reconnect again
if (info == "server_will_stop") {
this.webSocketAdaptor.close();
}
//try with forcing reconnect because webrtc will be closed as well
this.reconnectIfRequired(0, true);
}

this.notifyEventListeners(info, obj);
}

/**
* Called to stop Web Socket connection
Expand Down
13 changes: 7 additions & 6 deletions src/main/webapp/conference.html
Original file line number Diff line number Diff line change
Expand Up @@ -687,12 +687,12 @@ <h3 class="col text-muted">WebRTC Multitrack Conference</h3>
var state = webRTCAdaptor
.signallingState(publishStreamId);
if (state != null
&& state != "closed") {
&& state != "closed")
{
var iceState = webRTCAdaptor
.iceConnectionState(publishStreamId);
if (iceState != null
&& iceState != "failed"
&& iceState != "disconnected") {
if (iceState != null && iceState != "new" && iceState != "closed" && iceState != "failed" && iceState != "disconnected")
{
startAnimation();
}
}
Expand Down Expand Up @@ -864,10 +864,11 @@ <h3 class="col text-muted">WebRTC Multitrack Conference</h3>
}
else {
//errorHandler(error, message);
$('video').notify("Warning: " + errorHandler(error, message), {

$('#roomName').notify("Warning: " + errorHandler(error, message), {
autoHideDelay: 5000,
className: 'error',
position: 'top right'
position: 'top center'
});
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/webapp/samples/publish_webrtc.html
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@
var state = webRTCAdaptor.signallingState(streamId);
if (state != null && state != "closed") {
var iceState = webRTCAdaptor.iceConnectionState(streamId);
if (iceState != null && iceState != "failed" && iceState != "disconnected") {
if (iceState != null && iceState != "new" && iceState != "closed" && iceState != "failed" && iceState != "disconnected") {
startAnimation();
}
else {
Expand Down
103 changes: 103 additions & 0 deletions src/test/js/webrtc_adaptor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,116 @@ describe("WebRTCAdaptor", function() {
expect(webSocketAdaptor.connecting).to.be.false;

});


it("reconnectIfRequired", async function() {
var adaptor = new WebRTCAdaptor({
websocketURL: "ws://example.com",
isPlayMode: true
});

let tryAgain = sinon.replace(adaptor, "tryAgain", sinon.fake());


adaptor.reconnectIfRequired(100);
adaptor.reconnectIfRequired(200);
clock.tick(300);

expect(tryAgain.calledOnce).to.be.true;



});

it("oniceconnectionstatechangeCallback", async function() {
var adaptor = new WebRTCAdaptor({
websocketURL: "ws://example.com",
isPlayMode: true
});

let reconnectIfRequired = sinon.replace(adaptor, "reconnectIfRequired", sinon.fake());
var obj = { state: "failed", streamId: "streamId" };

var stopFake = sinon.replace(adaptor, "stop", sinon.fake());
adaptor.oniceconnectionstatechangeCallback(obj);
expect(reconnectIfRequired.calledOnce).to.be.true;
expect(reconnectIfRequired.calledWithExactly(0, false)).to.be.true;

obj = { state: "closed", streamId: "streamId" };

adaptor.oniceconnectionstatechangeCallback(obj);
expect(reconnectIfRequired.calledTwice).to.be.true;
expect(reconnectIfRequired.calledWithExactly(0, false)).to.be.true;

obj = { state: "disconnected", streamId: "streamId" };
adaptor.oniceconnectionstatechangeCallback(obj);
expect(reconnectIfRequired.callCount).to.be.equal(3);


obj = { state: "connected", streamId: "streamId" };
adaptor.oniceconnectionstatechangeCallback(obj);
expect(reconnectIfRequired.callCount).to.be.equal(3);


});

it("websocketCallback", async function() {
var adaptor = new WebRTCAdaptor({
websocketURL: "ws://example.com",
isPlayMode: true
});

let reconnectIfRequired = sinon.replace(adaptor, "reconnectIfRequired", sinon.fake());

var stopFake = sinon.replace(adaptor, "stop", sinon.fake());
adaptor.websocketCallback("closed");

expect(reconnectIfRequired.calledOnce).to.be.true;
expect(reconnectIfRequired.calledWithExactly(0, true)).to.be.true;


adaptor.websocketCallback("anyOtherThing");

//it should be still once
expect(reconnectIfRequired.calledOnce).to.be.true;


});

it("tryAgainForceReconnect", async function() {

var adaptor = new WebRTCAdaptor({
websocketURL: "ws://example.com",
isPlayMode: true
});
var streamId = "streamId";
adaptor.publishStreamId = streamId;

let stop = sinon.replace(adaptor, "stop", sinon.fake());

var mockPC = sinon.mock(RTCPeerConnection);
adaptor.remotePeerConnection[streamId] = mockPC
mockPC.iceConnectionState = "connected";

adaptor.tryAgain(false);

expect(stop.calledOnce).to.be.false;

adaptor.tryAgain(true);
expect(stop.calledOnce).to.be.true;


});



it("Frequent try again call", async function() {
var adaptor = new WebRTCAdaptor({
websocketURL: "ws://example.com",
isPlayMode: true
});

expect(adaptor.pendingTryAgainTimerId).to.be.equal(-1);
let webSocketAdaptor = sinon.mock(adaptor.webSocketAdaptor);
let closeExpectation = webSocketAdaptor.expects("close");

Expand Down

0 comments on commit 4c9385e

Please sign in to comment.