Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream url list #263

Merged
merged 16 commits into from
Feb 28, 2024
Merged
2 changes: 1 addition & 1 deletion src/FrameClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,9 @@ class FrameClient {
"StartABRMezzanineJobs",
"StreamConfig",
"StreamCreate",
"StreamDeactivate",
"StreamInitialize",
"StreamInsertion",
"StreamListUrls",
"StreamStatus",
"StreamStartOrStopOrReset",
"StreamStopSession",
Expand Down
248 changes: 131 additions & 117 deletions src/client/LiveStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -767,8 +767,7 @@ exports.StreamStartOrStopOrReset = async function({name, op}) {
};

/**
* Stop the live stream session and close the edge write token.
* Not implemented fully
* close the edge write token for the live stream session.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can say 'close the edge write token ...' and make the stream object inactive.

*
* @methodGroup Live Stream
* @namedParams
Expand All @@ -778,8 +777,7 @@ exports.StreamStartOrStopOrReset = async function({name, op}) {
*/
exports.StreamStopSession = async function({name}) {
try {
console.log("TERMINATE: ", name);

this.Log(`Terminating stream session for: ${name}`);
let conf = await this.LoadConf({name});

let {objectId} = conf;
Expand Down Expand Up @@ -807,51 +805,49 @@ exports.StreamStopSession = async function({name}) {
error: "no active streams - must create a stream first"
elv-zenia marked this conversation as resolved.
Show resolved Hide resolved
};
}
let edgeMeta = await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken: edgeWriteToken
});

elv-zenia marked this conversation as resolved.
Show resolved Hide resolved
// Stop the LRO if running
let status = await this.StreamStatus({name});
if(status.state != "terminated") {
console.log("STOPPING");
try {
await this.CallBitcodeMethod({
libraryId: status.library_id,
objectId: status.object_id,
writeToken: status.edge_write_token,
method: "/live/stop/" + status.tlro,
constant: false
});
} catch(error) {
// The /call/lro/stop API returns empty response
// console.log("LRO Stop (failed): ", error);
}
let edgeMeta, status;
try {
edgeMeta = await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken: edgeWriteToken
});

// Wait until LRO is terminated
let tries = 10;
while(status.state != "stopped" && tries-- > 0) {
console.log("Wait to terminate - ", status.state);
await Sleep(1000);
status = await this.StreamStatus({name});
status = await this.StreamStatus({name});

if(status.state !== "stopped") {
return {
state: status.state,
error: "Stream must be stopped before terminating"
}
}
console.log("Status after stop - ", status.state);
} catch(error) {
console.warn(`Unable to retrieve metadata for edge write token ${edgeWriteToken}`);

if(tries <= 0) {
console.log("Failed to stop");
return status;
// If token doesn't return metadata, generate new token
if(!edgeMeta) {
let response = await this.EditContentObject({
libraryId: libraryId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should create the new write token in all cases.
This is because the edge write token has a ton of metadata in it - as created by the live recorder (the list of all video and audio parts and metadata for each part, ...).

I think we should do nothing when edge_meta doesn't work.
And if it does work then we can check the status, like you already to, and then discard/delete the edge write token (though I am not sure if we have a client method to discard a write token)

Then just open the new write token in all cases and write the status fields, then finalize and publish.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look like there's a client method to discard a write token. Is there a contract method for this?

objectId: objectId
});

this.Log(`Unable to retrieve metadata for token ${edgeWriteToken}. Generating new token ${response.write_token}`);

edgeWriteToken = response.write_token;
edgeMeta = await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken: edgeWriteToken
});
}
}

// Set stop time
const newState = "inactive";
edgeMeta.recording_stop_time = Math.floor(new Date().getTime() / 1000);
console.log("recording_start_time: ", edgeMeta.recording_start_time);
console.log("recording_stop_time: ", edgeMeta.recording_stop_time);

// Set stop time and inactive state
edgeMeta.live_recording.status = {
state: "terminated",
state: newState,
recording_stop_time: edgeMeta.recording_stop_time
};

Expand All @@ -868,17 +864,15 @@ exports.StreamStopSession = async function({name}) {
libraryId,
objectId,
writeToken: edgeWriteToken,
commitMessage: "Finalize live stream - stop time " + edgeMeta.recording_stop_time,
publish: false // Don't publish this version because it is not currently useful
commitMessage: `Finalize live stream - stop time ${edgeMeta.recording_stop_time}`
elv-zenia marked this conversation as resolved.
Show resolved Hide resolved
});

return {
fin,
name,
edge_write_token: edgeWriteToken,
state: "terminated"
state: newState
};

} catch(error) {
console.error(error);
}
Expand Down Expand Up @@ -1419,97 +1413,117 @@ exports.StreamConfig = async function({name, customSettings={}}) {
};

/**
* Deactivate the stream
* List the pre-allocated URLs for a site
*
* @methodGroup Live Stream
* @namedParams
* @param {string} name - Object ID or name of the live stream object
* @param {string=} - ID of the live stream site object
*
* @return {Promise<Object>} - The status response for the stream
* @return {Promise<Object>} - The list of stream URLs
*/
exports.StreamDeactivate = async function({name}) {
exports.StreamListUrls = async function({siteId}={}) {
try {
let conf = await this.LoadConf({name});

let {objectId} = conf;
let libraryId = await this.ContentObjectLibraryId({objectId});

let mainMeta = await this.ContentObjectMetadata({
libraryId,
objectId
});
let status = await this.StreamStatus({name});

if(!mainMeta.live_recording) {
return {
state: status.state,
error: "Stream must be configured before deactivating"
};
}

// Return error if the LRO is running
if(status.state !== "stopped") {
return {
state: status.state,
error: "Stream must be stopped before deactivating"
};
}
const STATUS_MAP = {
UNCONFIGURED: "unconfigured",
UNINITIALIZED: "uninitialized",
INACTIVE: "inactive",
STOPPED: "stopped",
STARTING: "starting",
RUNNING: "running",
STALLED: "stalled",
};

let fabURI = mainMeta.live_recording.fabric_config.ingress_node_api;
// Support both hostname and URL ingress_node_api
if(!fabURI.startsWith("http")) {
// Assume https
fabURI = "https://" + fabURI;
if(!siteId) {
const tenantContractId = await this.userProfileClient.TenantContractId();
siteId = await this.ContentObjectMetadata({
libraryId: tenantContractId.replace("iten", "ilib"),
objectId: tenantContractId.replace("iten", "iq__"),
metadataSubtree: "public/sites/live_streams",
});
}

this.SetNodes({fabricURIs: [fabURI]});

let edgeWriteToken = mainMeta.live_recording.fabric_config.edge_write_token;

if(edgeWriteToken === undefined || edgeWriteToken === "") {
return {
state: "inactive",
error: "stream is already inactive"
};
}
let edgeMeta = await this.ContentObjectMetadata({
libraryId,
objectId,
writeToken: edgeWriteToken
const streamMetadata = await this.ContentObjectMetadata({
libraryId: await this.ContentObjectLibraryId({objectId: siteId}),
objectId: siteId,
metadataSubtree: "public/asset_metadata/live_streams",
resolveLinks: true,
resolveIgnoreErrors: true
});

// Set stop time
edgeMeta.recording_stop_time = Math.floor(new Date().getTime() / 1000);
const newState = "inactive";
const activeUrlMap = {};
await this.utils.LimitedMap(
10,
Object.keys(streamMetadata || {}),
async slug => {
const stream = streamMetadata[slug];
let versionHash;

if(
stream &&
stream.sources &&
stream.sources.default &&
stream.sources.default["."] &&
stream.sources.default["."].container ||
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably cleaner to use optional chaining for this

stream?.sources?.default?.["."]?.container

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm hesitant to use optional chaining in client-js because I don't think all apps that use it support it. I think there are a few Webpack 4 projects

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, right this is the client. Fair point, no optional chaining

((stream["/"] || "").match(/^\/?qfab\/([\w]+)\/?.+/) || [])[1]
) {
versionHash = (
stream.sources.default["."].container ||
((stream["/"] || "").match(/^\/?qfab\/([\w]+)\/?.+/) || [])[1]
);
}

edgeMeta.live_recording.status = {
state: newState,
recording_stop_time: edgeMeta.recording_stop_time
};
if(versionHash) {
const objectId = this.utils.DecodeVersionHash(versionHash).objectId;
const libraryId = await this.ContentObjectLibraryId({objectId});

const status = await this.StreamStatus({
name: objectId
});

const streamMeta = await this.ContentObjectMetadata({
objectId,
libraryId,
select: [
"live_recording_config/reference_url",
// live_recording_config/url is the old path
"live_recording_config/url"
]
});

const url = streamMeta.live_recording_config.reference_url || streamMeta.live_recording_config.url;
const isActive = [STATUS_MAP.STARTING, STATUS_MAP.RUNNING, STATUS_MAP.STALLED, STATUS_MAP.STOPPED].includes(status.state);

if(url && isActive) {
activeUrlMap[url] = true;
}
}
}
);

edgeMeta.live_recording.fabric_config.edge_write_token = "";
const streamUrlStatus = {};

await this.ReplaceMetadata({
libraryId,
objectId,
writeToken: edgeWriteToken,
metadata: edgeMeta
const streamUrls = await this.ContentObjectMetadata({
libraryId: await this.ContentObjectLibraryId({objectId: siteId}),
objectId: siteId,
metadataSubtree: "/live_stream_urls",
resolveLinks: true,
resolveIgnoreErrors: true
});

let fin = await this.FinalizeContentObject({
libraryId,
objectId,
writeToken: edgeWriteToken,
commitMessage: "Deactivate stream"
if(!streamUrls) {
throw Error("No pre-allocated URLs configured");
}

Object.keys(streamUrls || {}).forEach(protocol => {
streamUrlStatus[protocol] = streamUrls[protocol].map(url => {
return {
url,
active: activeUrlMap[url] || false
};
})
});

return {
reference_url: status.reference_url,
fin,
name,
edge_write_token: edgeWriteToken,
state: newState
};
return streamUrlStatus;
} catch(error) {
console.error(error);
}
Expand Down