diff --git a/src/output/output_http.cpp b/src/output/output_http.cpp index 08f74ed3..16eeb264 100644 --- a/src/output/output_http.cpp +++ b/src/output/output_http.cpp @@ -224,45 +224,19 @@ namespace Mist { } INFO_MSG("Received request %s", H.getUrl().c_str()); - initialize(); - if (H.GetVar("audio") != "" || H.GetVar("video") != ""){ - selectedTracks.clear(); - if (H.GetVar("audio") != ""){ - selectedTracks.insert(JSON::Value(H.GetVar("audio")).asInt()); - } - if (H.GetVar("video") != ""){ - selectedTracks.insert(JSON::Value(H.GetVar("video")).asInt()); - } - selectDefaultTracks(); - std::set toRemove; - if (H.GetVar("video") == "0"){ - for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ - if (myMeta.tracks.at(*it).type=="video"){ - toRemove.insert(*it); - } - } - } - if (H.GetVar("audio") == "0"){ - for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ - if (myMeta.tracks.at(*it).type=="audio"){ - toRemove.insert(*it); - } - } - } - //remove those from selectedtracks - for (std::set::iterator it = toRemove.begin(); it != toRemove.end(); it++){ - selectedTracks.erase(*it); - } - }else{ - selectDefaultTracks(); - } - + preHTTP(); onHTTP(); if (!H.bufferChunks){ H.Clean(); } } } + + /// Default implementation of preHTTP simply calls initialize and selectDefaultTracks. + void HTTPOutput::preHTTP(){ + initialize(); + selectDefaultTracks(); + } static inline void builPipedPart(JSON::Value & p, char * argarr[], int & argnum, JSON::Value & argset){ jsonForEach(argset, it) { diff --git a/src/output/output_http.h b/src/output/output_http.h index bdf00151..bad8ae6a 100644 --- a/src/output/output_http.h +++ b/src/output/output_http.h @@ -14,6 +14,7 @@ namespace Mist { virtual void onFail(); virtual void onHTTP(){}; virtual void requestHandler(); + virtual void preHTTP(); static bool listenMode(){return false;} void reConnector(std::string & connector); std::string getHandler(); diff --git a/src/output/output_http_internal.cpp b/src/output/output_http_internal.cpp index 94026a9a..9dcdb120 100644 --- a/src/output/output_http_internal.cpp +++ b/src/output/output_http_internal.cpp @@ -5,9 +5,11 @@ #include #include "flashPlayer.h" #include "oldFlashPlayer.h" +#include namespace Mist { OutHTTP::OutHTTP(Socket::Connection & conn) : HTTPOutput(conn){ + stayConnected = false; if (myConn.getPureSocket() >= 0){ std::string host = getConnectedHost(); dup2(myConn.getSocket(), STDIN_FILENO); @@ -42,6 +44,7 @@ namespace Mist { return; } if (H.url.size() >= 3 && H.url.substr(H.url.size() - 3) == ".js"){ + if (websocketHandler()){return;} JSON::Value json_resp; json_resp["error"] = "Could not retrieve stream. Sorry."; if (H.url.size() >= 5 && H.url.substr(0, 5) == "/json"){ @@ -233,6 +236,146 @@ namespace Mist { H.SendResponse("200", "OK", myConn); } + JSON::Value OutHTTP::getStatusJSON(std::string & reqHost){ + JSON::Value json_resp; + uint8_t streamStatus = Util::getStreamStatus(streamName); + if (streamStatus != STRMSTAT_READY){ + switch (streamStatus){ + case STRMSTAT_OFF: + json_resp["error"] = "Stream is offline"; + break; + case STRMSTAT_INIT: + json_resp["error"] = "Stream is initializing"; + break; + case STRMSTAT_BOOT: + json_resp["error"] = "Stream is booting"; + break; + case STRMSTAT_WAIT: + json_resp["error"] = "Stream is waiting for data"; + break; + case STRMSTAT_SHUTDOWN: + json_resp["error"] = "Stream is shutting down"; + break; + case STRMSTAT_INVALID: + json_resp["error"] = "Stream status is invalid?!"; + break; + default: + json_resp["error"] = "Stream status is unknown?!"; + break; + } + return json_resp; + } + initialize(); + if (!myConn){ + return json_resp; + } + IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); + configLock.wait(); + IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); + DTSC::Scan prots = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols"); + if (!prots){ + json_resp["error"] = "The specified stream is not available on this server."; + configLock.post(); + configLock.close(); + return json_resp; + } + + bool hasVideo = false; + for (std::map::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ + if (trit->second.type == "video"){ + hasVideo = true; + if (trit->second.width > json_resp["width"].asInt()){ + json_resp["width"] = trit->second.width; + } + if (trit->second.height > json_resp["height"].asInt()){ + json_resp["height"] = trit->second.height; + } + } + } + if (json_resp["width"].asInt() < 1 || json_resp["height"].asInt() < 1){ + json_resp["width"] = 640ll; + json_resp["height"] = 480ll; + if (!hasVideo){json_resp["height"] = 20ll;} + } + if (myMeta.vod){ + json_resp["type"] = "vod"; + } + if (myMeta.live){ + json_resp["type"] = "live"; + } + + // show ALL the meta datas! + json_resp["meta"] = myMeta.toJSON(); + jsonForEach(json_resp["meta"]["tracks"], it) { + if (it->isMember("lang")){ + (*it)["language"] = Encodings::ISO639::decode((*it)["lang"].asStringRef()); + } + it->removeMember("fragments"); + it->removeMember("keys"); + it->removeMember("keysizes"); + it->removeMember("parts"); + } + + //create a set for storing source information + std::set sources; + + //find out which connectors are enabled + std::set conns; + unsigned int prots_ctr = prots.getSize(); + for (unsigned int i = 0; i < prots_ctr; ++i){ + conns.insert(prots.getIndice(i).getMember("connector").asString()); + } + //loop over the connectors. + for (unsigned int i = 0; i < prots_ctr; ++i){ + std::string cName = prots.getIndice(i).getMember("connector").asString(); + DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(cName); + //if the connector has a port, + if (capa.getMember("optional").getMember("port")){ + HTTP::URL outURL(reqHost); + //get the default port if none is set + outURL.port = prots.getIndice(i).getMember("port").asString(); + if (!outURL.port.size()){ + outURL.port = capa.getMember("optional").getMember("port").getMember("default").asString(); + } + outURL.protocol = capa.getMember("protocol").asString(); + if (outURL.protocol.find(':') != std::string::npos){ + outURL.protocol.erase(outURL.protocol.find(':')); + } + //and a URL - then list the URL + JSON::Value capa_json = capa.asJSON(); + if (capa.getMember("url_rel") || capa.getMember("methods")){ + addSources(streamName, sources, outURL, capa_json, json_resp["meta"]); + } + //Make note if this connector can be depended upon by other connectors + if (capa.getMember("provides")){ + std::string cProv = capa.getMember("provides").asString(); + //if this connector can be depended upon by other connectors, loop over the rest + //check each enabled protocol separately to see if it depends on this connector + DTSC::Scan capa_lst = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors"); + unsigned int capa_lst_ctr = capa_lst.getSize(); + for (unsigned int j = 0; j < capa_lst_ctr; ++j){ + //if it depends on this connector and has a URL, list it + if (conns.count(capa_lst.getIndiceName(j)) && capa_lst.getIndice(j).getMember("deps").asString() == cProv && capa_lst.getIndice(j).getMember("methods")){ + JSON::Value subcapa_json = capa_lst.getIndice(j).asJSON(); + addSources(streamName, sources, outURL, subcapa_json, json_resp["meta"]); + } + } + } + } + } + + //loop over the added sources, add them to json_resp["sources"] + for (std::set::iterator it = sources.begin(); it != sources.end(); it++){ + if ((*it)["simul_tracks"].asInt() > 0){ + json_resp["source"].append(*it); + } + } + configLock.post(); + configLock.close(); + return json_resp; + } + + void OutHTTP::onHTTP(){ std::string method = H.method; @@ -352,6 +495,7 @@ namespace Mist { } if ((H.url.length() > 9 && H.url.substr(0, 6) == "/info_" && H.url.substr(H.url.length() - 3, 3) == ".js") || (H.url.length() > 10 && H.url.substr(0, 7) == "/embed_" && H.url.substr(H.url.length() - 3, 3) == ".js") || (H.url.length() > 9 && H.url.substr(0, 6) == "/json_" && H.url.substr(H.url.length() - 3, 3) == ".js")){ + if (websocketHandler()){return;} std::string reqHost = HTTP::URL(H.GetHeader("Host")).host; std::string response; std::string rURL = H.url; @@ -368,112 +512,9 @@ namespace Mist { H.Clean(); return; } - response = "// Generating info code for stream " + streamName + "\n\nif (!mistvideo){var mistvideo = {};}\n"; - JSON::Value json_resp; initialize(); - if (!myConn){ - return; - } - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); - DTSC::Scan prots = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols"); - if (prots){ - bool hasVideo = false; - for (std::map::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ - if (trit->second.type == "video"){ - hasVideo = true; - if (trit->second.width > json_resp["width"].asInt()){ - json_resp["width"] = trit->second.width; - } - if (trit->second.height > json_resp["height"].asInt()){ - json_resp["height"] = trit->second.height; - } - } - } - if (json_resp["width"].asInt() < 1 || json_resp["height"].asInt() < 1){ - json_resp["width"] = 640ll; - json_resp["height"] = 480ll; - if (!hasVideo){json_resp["height"] = 20ll;} - } - if (myMeta.vod){ - json_resp["type"] = "vod"; - } - if (myMeta.live){ - json_resp["type"] = "live"; - } - - // show ALL the meta datas! - json_resp["meta"] = myMeta.toJSON(); - jsonForEach(json_resp["meta"]["tracks"], it) { - if (it->isMember("lang")){ - (*it)["language"] = Encodings::ISO639::decode((*it)["lang"].asStringRef()); - } - it->removeMember("fragments"); - it->removeMember("keys"); - it->removeMember("keysizes"); - it->removeMember("parts"); - } - - //create a set for storing source information - std::set sources; - - //find out which connectors are enabled - std::set conns; - unsigned int prots_ctr = prots.getSize(); - for (unsigned int i = 0; i < prots_ctr; ++i){ - conns.insert(prots.getIndice(i).getMember("connector").asString()); - } - //loop over the connectors. - for (unsigned int i = 0; i < prots_ctr; ++i){ - std::string cName = prots.getIndice(i).getMember("connector").asString(); - DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(cName); - //if the connector has a port, - if (capa.getMember("optional").getMember("port")){ - HTTP::URL outURL(reqHost); - //get the default port if none is set - outURL.port = prots.getIndice(i).getMember("port").asString(); - if (!outURL.port.size()){ - outURL.port = capa.getMember("optional").getMember("port").getMember("default").asString(); - } - outURL.protocol = capa.getMember("protocol").asString(); - if (outURL.protocol.find(':') != std::string::npos){ - outURL.protocol.erase(outURL.protocol.find(':')); - } - //and a URL - then list the URL - JSON::Value capa_json = capa.asJSON(); - if (capa.getMember("url_rel") || capa.getMember("methods")){ - addSources(streamName, sources, outURL, capa_json, json_resp["meta"]); - } - //Make note if this connector can be depended upon by other connectors - if (capa.getMember("provides")){ - std::string cProv = capa.getMember("provides").asString(); - //if this connector can be depended upon by other connectors, loop over the rest - //check each enabled protocol separately to see if it depends on this connector - DTSC::Scan capa_lst = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors"); - unsigned int capa_lst_ctr = capa_lst.getSize(); - for (unsigned int j = 0; j < capa_lst_ctr; ++j){ - //if it depends on this connector and has a URL, list it - if (conns.count(capa_lst.getIndiceName(j)) && capa_lst.getIndice(j).getMember("deps").asString() == cProv && capa_lst.getIndice(j).getMember("methods")){ - JSON::Value subcapa_json = capa_lst.getIndice(j).asJSON(); - addSources(streamName, sources, outURL, subcapa_json, json_resp["meta"]); - } - } - } - } - } - - //loop over the added sources, add them to json_resp["sources"] - for (std::set::iterator it = sources.begin(); it != sources.end(); it++){ - if ((*it)["simul_tracks"].asInt() > 0){ - json_resp["source"].append(*it); - } - } - }else{ - json_resp["error"] = "The specified stream is not available on this server."; - } - configLock.post(); - configLock.close(); + response = "// Generating info code for stream " + streamName + "\n\nif (!mistvideo){var mistvideo = {};}\n"; + JSON::Value json_resp = getStatusJSON(reqHost); if (rURL.substr(0, 6) != "/json_"){ response += "mistvideo['" + streamName + "'] = " + json_resp.toString() + ";\n"; }else{ @@ -656,5 +697,48 @@ namespace Mist { H.Clean(); } + bool OutHTTP::websocketHandler(){ + stayConnected = true; + std::string reqHost = HTTP::URL(H.GetHeader("Host")).host; + if (H.GetHeader("Upgrade") != "websocket"){return false;} + HTTP::Websocket ws(myConn, H); + if (!ws){return false;} + //start the stream, if needed + Util::startInput(streamName, "", true, false); + + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); + IPC::sharedPage streamStatus(pageName, 1, false, false); + uint8_t prevState, newState, metaCounter; + uint64_t prevTracks; + prevState = newState = STRMSTAT_INVALID; + while (keepGoing()){ + if (!streamStatus || !streamStatus.exists()){streamStatus.init(pageName, 1, false, false);} + if (!streamStatus){newState = STRMSTAT_OFF;}else{newState = streamStatus.mapped[0];} + + if (newState != prevState || (newState == STRMSTAT_READY && myMeta.tracks.size() != prevTracks)){ + if (newState == STRMSTAT_READY){ + reconnect(); + updateMeta(); + prevTracks = myMeta.tracks.size(); + }else{ + disconnect(); + } + JSON::Value resp = getStatusJSON(reqHost); + ws.sendFrame(resp.toString()); + prevState = newState; + }else{ + if (newState == STRMSTAT_READY){ + stats(); + } + Util::sleep(250); + if (newState == STRMSTAT_READY && (++metaCounter % 4) == 0){ + updateMeta(); + } + } + } + return true; + } + } diff --git a/src/output/output_http_internal.h b/src/output/output_http_internal.h index 5badb0d9..f2f9a0ba 100644 --- a/src/output/output_http_internal.h +++ b/src/output/output_http_internal.h @@ -9,9 +9,17 @@ namespace Mist { static void init(Util::Config * cfg); static bool listenMode(); virtual void onFail(); + ///preHTTP is disabled in the internal HTTP output, since most don't need the stream alive to work + virtual void preHTTP(){}; void HTMLResponse(); void onHTTP(); void sendIcon(); + bool websocketHandler(); + JSON::Value getStatusJSON(std::string & reqHost); + bool stayConnected; + virtual bool onFinish(){ + return stayConnected; + } }; }