Added HTTP info.js websocket mode

This commit is contained in:
Thulinma 2018-03-20 15:06:37 +01:00
parent 798f099638
commit b0bf1d14ec
4 changed files with 205 additions and 138 deletions

View file

@ -224,45 +224,19 @@ namespace Mist {
}
INFO_MSG("Received request %s", H.getUrl().c_str());
initialize();
if (H.GetVar("audio") != "" || H.GetVar("video") != ""){
selectedTracks.clear();
if (H.GetVar("audio") != ""){
selectedTracks.insert(JSON::Value(H.GetVar("audio")).asInt());
}
if (H.GetVar("video") != ""){
selectedTracks.insert(JSON::Value(H.GetVar("video")).asInt());
}
selectDefaultTracks();
std::set<unsigned long> toRemove;
if (H.GetVar("video") == "0"){
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
if (myMeta.tracks.at(*it).type=="video"){
toRemove.insert(*it);
}
}
}
if (H.GetVar("audio") == "0"){
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
if (myMeta.tracks.at(*it).type=="audio"){
toRemove.insert(*it);
}
}
}
//remove those from selectedtracks
for (std::set<unsigned long>::iterator it = toRemove.begin(); it != toRemove.end(); it++){
selectedTracks.erase(*it);
}
}else{
selectDefaultTracks();
}
preHTTP();
onHTTP();
if (!H.bufferChunks){
H.Clean();
}
}
}
/// Default implementation of preHTTP simply calls initialize and selectDefaultTracks.
void HTTPOutput::preHTTP(){
initialize();
selectDefaultTracks();
}
static inline void builPipedPart(JSON::Value & p, char * argarr[], int & argnum, JSON::Value & argset){
jsonForEach(argset, it) {

View file

@ -14,6 +14,7 @@ namespace Mist {
virtual void onFail();
virtual void onHTTP(){};
virtual void requestHandler();
virtual void preHTTP();
static bool listenMode(){return false;}
void reConnector(std::string & connector);
std::string getHandler();

View file

@ -5,9 +5,11 @@
#include <mist/langcodes.h>
#include "flashPlayer.h"
#include "oldFlashPlayer.h"
#include <mist/websocket.h>
namespace Mist {
OutHTTP::OutHTTP(Socket::Connection & conn) : HTTPOutput(conn){
stayConnected = false;
if (myConn.getPureSocket() >= 0){
std::string host = getConnectedHost();
dup2(myConn.getSocket(), STDIN_FILENO);
@ -42,6 +44,7 @@ namespace Mist {
return;
}
if (H.url.size() >= 3 && H.url.substr(H.url.size() - 3) == ".js"){
if (websocketHandler()){return;}
JSON::Value json_resp;
json_resp["error"] = "Could not retrieve stream. Sorry.";
if (H.url.size() >= 5 && H.url.substr(0, 5) == "/json"){
@ -233,6 +236,146 @@ namespace Mist {
H.SendResponse("200", "OK", myConn);
}
JSON::Value OutHTTP::getStatusJSON(std::string & reqHost){
JSON::Value json_resp;
uint8_t streamStatus = Util::getStreamStatus(streamName);
if (streamStatus != STRMSTAT_READY){
switch (streamStatus){
case STRMSTAT_OFF:
json_resp["error"] = "Stream is offline";
break;
case STRMSTAT_INIT:
json_resp["error"] = "Stream is initializing";
break;
case STRMSTAT_BOOT:
json_resp["error"] = "Stream is booting";
break;
case STRMSTAT_WAIT:
json_resp["error"] = "Stream is waiting for data";
break;
case STRMSTAT_SHUTDOWN:
json_resp["error"] = "Stream is shutting down";
break;
case STRMSTAT_INVALID:
json_resp["error"] = "Stream status is invalid?!";
break;
default:
json_resp["error"] = "Stream status is unknown?!";
break;
}
return json_resp;
}
initialize();
if (!myConn){
return json_resp;
}
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE);
DTSC::Scan prots = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols");
if (!prots){
json_resp["error"] = "The specified stream is not available on this server.";
configLock.post();
configLock.close();
return json_resp;
}
bool hasVideo = false;
for (std::map<unsigned int, DTSC::Track>::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){
if (trit->second.type == "video"){
hasVideo = true;
if (trit->second.width > json_resp["width"].asInt()){
json_resp["width"] = trit->second.width;
}
if (trit->second.height > json_resp["height"].asInt()){
json_resp["height"] = trit->second.height;
}
}
}
if (json_resp["width"].asInt() < 1 || json_resp["height"].asInt() < 1){
json_resp["width"] = 640ll;
json_resp["height"] = 480ll;
if (!hasVideo){json_resp["height"] = 20ll;}
}
if (myMeta.vod){
json_resp["type"] = "vod";
}
if (myMeta.live){
json_resp["type"] = "live";
}
// show ALL the meta datas!
json_resp["meta"] = myMeta.toJSON();
jsonForEach(json_resp["meta"]["tracks"], it) {
if (it->isMember("lang")){
(*it)["language"] = Encodings::ISO639::decode((*it)["lang"].asStringRef());
}
it->removeMember("fragments");
it->removeMember("keys");
it->removeMember("keysizes");
it->removeMember("parts");
}
//create a set for storing source information
std::set<JSON::Value, sourceCompare> sources;
//find out which connectors are enabled
std::set<std::string> conns;
unsigned int prots_ctr = prots.getSize();
for (unsigned int i = 0; i < prots_ctr; ++i){
conns.insert(prots.getIndice(i).getMember("connector").asString());
}
//loop over the connectors.
for (unsigned int i = 0; i < prots_ctr; ++i){
std::string cName = prots.getIndice(i).getMember("connector").asString();
DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(cName);
//if the connector has a port,
if (capa.getMember("optional").getMember("port")){
HTTP::URL outURL(reqHost);
//get the default port if none is set
outURL.port = prots.getIndice(i).getMember("port").asString();
if (!outURL.port.size()){
outURL.port = capa.getMember("optional").getMember("port").getMember("default").asString();
}
outURL.protocol = capa.getMember("protocol").asString();
if (outURL.protocol.find(':') != std::string::npos){
outURL.protocol.erase(outURL.protocol.find(':'));
}
//and a URL - then list the URL
JSON::Value capa_json = capa.asJSON();
if (capa.getMember("url_rel") || capa.getMember("methods")){
addSources(streamName, sources, outURL, capa_json, json_resp["meta"]);
}
//Make note if this connector can be depended upon by other connectors
if (capa.getMember("provides")){
std::string cProv = capa.getMember("provides").asString();
//if this connector can be depended upon by other connectors, loop over the rest
//check each enabled protocol separately to see if it depends on this connector
DTSC::Scan capa_lst = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors");
unsigned int capa_lst_ctr = capa_lst.getSize();
for (unsigned int j = 0; j < capa_lst_ctr; ++j){
//if it depends on this connector and has a URL, list it
if (conns.count(capa_lst.getIndiceName(j)) && capa_lst.getIndice(j).getMember("deps").asString() == cProv && capa_lst.getIndice(j).getMember("methods")){
JSON::Value subcapa_json = capa_lst.getIndice(j).asJSON();
addSources(streamName, sources, outURL, subcapa_json, json_resp["meta"]);
}
}
}
}
}
//loop over the added sources, add them to json_resp["sources"]
for (std::set<JSON::Value, sourceCompare>::iterator it = sources.begin(); it != sources.end(); it++){
if ((*it)["simul_tracks"].asInt() > 0){
json_resp["source"].append(*it);
}
}
configLock.post();
configLock.close();
return json_resp;
}
void OutHTTP::onHTTP(){
std::string method = H.method;
@ -352,6 +495,7 @@ namespace Mist {
}
if ((H.url.length() > 9 && H.url.substr(0, 6) == "/info_" && H.url.substr(H.url.length() - 3, 3) == ".js") || (H.url.length() > 10 && H.url.substr(0, 7) == "/embed_" && H.url.substr(H.url.length() - 3, 3) == ".js") || (H.url.length() > 9 && H.url.substr(0, 6) == "/json_" && H.url.substr(H.url.length() - 3, 3) == ".js")){
if (websocketHandler()){return;}
std::string reqHost = HTTP::URL(H.GetHeader("Host")).host;
std::string response;
std::string rURL = H.url;
@ -368,112 +512,9 @@ namespace Mist {
H.Clean();
return;
}
response = "// Generating info code for stream " + streamName + "\n\nif (!mistvideo){var mistvideo = {};}\n";
JSON::Value json_resp;
initialize();
if (!myConn){
return;
}
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE);
DTSC::Scan prots = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols");
if (prots){
bool hasVideo = false;
for (std::map<unsigned int, DTSC::Track>::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){
if (trit->second.type == "video"){
hasVideo = true;
if (trit->second.width > json_resp["width"].asInt()){
json_resp["width"] = trit->second.width;
}
if (trit->second.height > json_resp["height"].asInt()){
json_resp["height"] = trit->second.height;
}
}
}
if (json_resp["width"].asInt() < 1 || json_resp["height"].asInt() < 1){
json_resp["width"] = 640ll;
json_resp["height"] = 480ll;
if (!hasVideo){json_resp["height"] = 20ll;}
}
if (myMeta.vod){
json_resp["type"] = "vod";
}
if (myMeta.live){
json_resp["type"] = "live";
}
// show ALL the meta datas!
json_resp["meta"] = myMeta.toJSON();
jsonForEach(json_resp["meta"]["tracks"], it) {
if (it->isMember("lang")){
(*it)["language"] = Encodings::ISO639::decode((*it)["lang"].asStringRef());
}
it->removeMember("fragments");
it->removeMember("keys");
it->removeMember("keysizes");
it->removeMember("parts");
}
//create a set for storing source information
std::set<JSON::Value, sourceCompare> sources;
//find out which connectors are enabled
std::set<std::string> conns;
unsigned int prots_ctr = prots.getSize();
for (unsigned int i = 0; i < prots_ctr; ++i){
conns.insert(prots.getIndice(i).getMember("connector").asString());
}
//loop over the connectors.
for (unsigned int i = 0; i < prots_ctr; ++i){
std::string cName = prots.getIndice(i).getMember("connector").asString();
DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(cName);
//if the connector has a port,
if (capa.getMember("optional").getMember("port")){
HTTP::URL outURL(reqHost);
//get the default port if none is set
outURL.port = prots.getIndice(i).getMember("port").asString();
if (!outURL.port.size()){
outURL.port = capa.getMember("optional").getMember("port").getMember("default").asString();
}
outURL.protocol = capa.getMember("protocol").asString();
if (outURL.protocol.find(':') != std::string::npos){
outURL.protocol.erase(outURL.protocol.find(':'));
}
//and a URL - then list the URL
JSON::Value capa_json = capa.asJSON();
if (capa.getMember("url_rel") || capa.getMember("methods")){
addSources(streamName, sources, outURL, capa_json, json_resp["meta"]);
}
//Make note if this connector can be depended upon by other connectors
if (capa.getMember("provides")){
std::string cProv = capa.getMember("provides").asString();
//if this connector can be depended upon by other connectors, loop over the rest
//check each enabled protocol separately to see if it depends on this connector
DTSC::Scan capa_lst = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors");
unsigned int capa_lst_ctr = capa_lst.getSize();
for (unsigned int j = 0; j < capa_lst_ctr; ++j){
//if it depends on this connector and has a URL, list it
if (conns.count(capa_lst.getIndiceName(j)) && capa_lst.getIndice(j).getMember("deps").asString() == cProv && capa_lst.getIndice(j).getMember("methods")){
JSON::Value subcapa_json = capa_lst.getIndice(j).asJSON();
addSources(streamName, sources, outURL, subcapa_json, json_resp["meta"]);
}
}
}
}
}
//loop over the added sources, add them to json_resp["sources"]
for (std::set<JSON::Value, sourceCompare>::iterator it = sources.begin(); it != sources.end(); it++){
if ((*it)["simul_tracks"].asInt() > 0){
json_resp["source"].append(*it);
}
}
}else{
json_resp["error"] = "The specified stream is not available on this server.";
}
configLock.post();
configLock.close();
response = "// Generating info code for stream " + streamName + "\n\nif (!mistvideo){var mistvideo = {};}\n";
JSON::Value json_resp = getStatusJSON(reqHost);
if (rURL.substr(0, 6) != "/json_"){
response += "mistvideo['" + streamName + "'] = " + json_resp.toString() + ";\n";
}else{
@ -656,5 +697,48 @@ namespace Mist {
H.Clean();
}
bool OutHTTP::websocketHandler(){
stayConnected = true;
std::string reqHost = HTTP::URL(H.GetHeader("Host")).host;
if (H.GetHeader("Upgrade") != "websocket"){return false;}
HTTP::Websocket ws(myConn, H);
if (!ws){return false;}
//start the stream, if needed
Util::startInput(streamName, "", true, false);
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str());
IPC::sharedPage streamStatus(pageName, 1, false, false);
uint8_t prevState, newState, metaCounter;
uint64_t prevTracks;
prevState = newState = STRMSTAT_INVALID;
while (keepGoing()){
if (!streamStatus || !streamStatus.exists()){streamStatus.init(pageName, 1, false, false);}
if (!streamStatus){newState = STRMSTAT_OFF;}else{newState = streamStatus.mapped[0];}
if (newState != prevState || (newState == STRMSTAT_READY && myMeta.tracks.size() != prevTracks)){
if (newState == STRMSTAT_READY){
reconnect();
updateMeta();
prevTracks = myMeta.tracks.size();
}else{
disconnect();
}
JSON::Value resp = getStatusJSON(reqHost);
ws.sendFrame(resp.toString());
prevState = newState;
}else{
if (newState == STRMSTAT_READY){
stats();
}
Util::sleep(250);
if (newState == STRMSTAT_READY && (++metaCounter % 4) == 0){
updateMeta();
}
}
}
return true;
}
}

View file

@ -9,9 +9,17 @@ namespace Mist {
static void init(Util::Config * cfg);
static bool listenMode();
virtual void onFail();
///preHTTP is disabled in the internal HTTP output, since most don't need the stream alive to work
virtual void preHTTP(){};
void HTMLResponse();
void onHTTP();
void sendIcon();
bool websocketHandler();
JSON::Value getStatusJSON(std::string & reqHost);
bool stayConnected;
virtual bool onFinish(){
return stayConnected;
}
};
}