From dd976f7a7ae075577a20a9d6a01ec0eed671324c Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 8 Feb 2018 12:35:05 +0100 Subject: [PATCH 1/3] Added basic websocket support --- CMakeLists.txt | 2 + lib/websocket.cpp | 166 ++++++++++++++++++++++++++++++++++++++++++++++ lib/websocket.h | 22 ++++++ 3 files changed, 190 insertions(+) create mode 100644 lib/websocket.cpp create mode 100644 lib/websocket.h diff --git a/CMakeLists.txt b/CMakeLists.txt index d1457feb..c8c90ed9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -136,6 +136,7 @@ set(libHeaders lib/riff.h lib/ebml.h lib/ebml_socketglue.h + lib/websocket.h ) ######################################## @@ -178,6 +179,7 @@ add_library (mist lib/riff.cpp lib/ebml.cpp lib/ebml_socketglue.cpp + lib/websocket.cpp ) if (NOT APPLE) set (LIBRT -lrt) diff --git a/lib/websocket.cpp b/lib/websocket.cpp new file mode 100644 index 00000000..6b8ef2a4 --- /dev/null +++ b/lib/websocket.cpp @@ -0,0 +1,166 @@ +#include "websocket.h" +#include "defines.h" +#include "encode.h" +#include "bitfields.h" +#include "timing.h" +#ifdef SSL +#include "mbedtls/sha1.h" +#endif + +namespace HTTP{ + + Websocket::Websocket(Socket::Connection &c, HTTP::Parser &h) : C(c), H(h){ + frameType = 0; + if (H.GetHeader("Connection").find("Upgrade") == std::string::npos){ + FAIL_MSG("Could not negotiate websocket, connection header incorrect (%s).", + H.GetHeader("Connection").c_str()); + C.close(); + return; + } + if (H.GetHeader("Upgrade") != "websocket"){ + FAIL_MSG("Could not negotiate websocket, upgrade header incorrect (%s).", + H.GetHeader("Upgrade").c_str()); + C.close(); + return; + } + if (H.GetHeader("Sec-WebSocket-Version") != "13"){ + FAIL_MSG("Could not negotiate websocket, version incorrect (%s).", + H.GetHeader("Sec-WebSocket-Version").c_str()); + C.close(); + return; + } + std::string client_key = H.GetHeader("Sec-WebSocket-Key"); + if (!client_key.size()){ + FAIL_MSG("Could not negotiate websocket, missing key!"); + C.close(); + return; + } + client_key += "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + + H.Clean(); + H.setCORSHeaders(); + H.SetHeader("Upgrade", "websocket"); + H.SetHeader("Connection", "Upgrade"); +#ifdef SSL + mbedtls_sha1_context ctx; + unsigned char outdata[20]; + mbedtls_sha1_starts(&ctx); + mbedtls_sha1_update(&ctx, (const unsigned char*)client_key.data(), client_key.size()); + mbedtls_sha1_finish(&ctx, outdata); + H.SetHeader("Sec-WebSocket-Accept", Encodings::Base64::encode(std::string((const char*)outdata, 20))); +#endif + //H.SetHeader("Sec-WebSocket-Protocol", "json"); + H.SendResponse("101", "Websocket away!", C); + } + + /// Loops calling readFrame until the connection is closed, sleeping in between reads if needed. + bool Websocket::readLoop(){ + while (C){ + if (readFrame()){ + return true; + } + Util::sleep(500); + } + return false; + } + + /// Loops reading from the socket until either there is no more data ready or a whole frame was read. + bool Websocket::readFrame(){ + while(true){ + //Check if we can receive the minimum frame size (2 header bytes, 0 payload) + if (!C.Received().available(2)){ + if (C.spool()){continue;} + return false; + } + std::string head = C.Received().copy(2); + //Read masked bit and payload length + bool masked = head[1] & 0x80; + uint64_t payLen = head[1] & 0x7F; + uint32_t headSize = 2 + (masked?4:0) + (payLen==126?2:0) + (payLen==127?8:0); + if (headSize > 2){ + //Check if we can receive the whole header + if (!C.Received().available(headSize)){ + if (C.spool()){continue;} + return false; + } + //Read entire header, re-read real payload length + head = C.Received().copy(headSize); + if (payLen == 126){ + payLen = Bit::btohs(head.data()+2); + }else if (payLen == 127){ + payLen = Bit::btohll(head.data()+2); + } + } + //Check if we can receive the whole frame (header + payload) + if (!C.Received().available(headSize + payLen)){ + if (C.spool()){continue;} + return false; + } + C.Received().remove(headSize);//delete the header + std::string pl = C.Received().remove(payLen); + if (masked){ + //If masked, apply the mask to the payload + const char * mask = head.data() + headSize - 4;//mask is last 4 bytes of header + for (uint32_t i = 0; i < payLen; ++i){ + pl[i] ^= mask[i % 4]; + } + } + if ((head[0] & 0xF)){ + //Non-continuation + frameType = (head[0] & 0xF); + data.assign(pl.data(), pl.size()); + }else{ + //Continuation + data.append(pl.data(), pl.size()); + } + if (head[0] & 0x80){ + //FIN + switch (frameType){ + case 0x0://Continuation, should not happen + WARN_MSG("Received unknown websocket frame - ignoring"); + break; + case 0x8://Connection close + HIGH_MSG("Websocket close received"); + C.close(); + break; + case 0x9://Ping + HIGH_MSG("Websocket ping received"); + sendFrame(data, data.size(), 0xA);//send pong + break; + case 0xA://Pong + HIGH_MSG("Websocket pong received"); + break; + } + return true; + } + } + } + + void Websocket::sendFrame(const char * data, unsigned int len, unsigned int frameType){ + char header[10]; + header[0] = 0x80 + frameType;//FIN + frameType + if (len < 126){ + header[1] = len; + C.SendNow(header, 2); + }else{ + if (len <= 0xFFFF){ + header[1] = 126; + Bit::htobs(header+2, len); + C.SendNow(header, 4); + }else{ + header[1] = 127; + Bit::htobll(header+2, len); + C.SendNow(header, 10); + } + } + C.SendNow(data, len); + } + + void Websocket::sendFrame(const std::string & data){ + sendFrame(data.data(), data.size()); + } + + Websocket::operator bool() const{return C;} + +}// namespace HTTP + diff --git a/lib/websocket.h b/lib/websocket.h new file mode 100644 index 00000000..2c386b1e --- /dev/null +++ b/lib/websocket.h @@ -0,0 +1,22 @@ +#pragma once +#include "http_parser.h" +#include "socket.h" +#include "util.h" + +namespace HTTP{ + class Websocket{ + public: + Websocket(Socket::Connection &c, HTTP::Parser &h); + operator bool() const; + bool readFrame(); + bool readLoop(); + void sendFrame(const char * data, unsigned int len, unsigned int frameType = 1); + void sendFrame(const std::string & data); + Util::ResizeablePointer data; + uint8_t frameType; + private: + Socket::Connection &C; + HTTP::Parser &H; + }; +}// namespace HTTP + From 798f099638643c7fd05a3568cd14fe0502d2fa17 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 8 Feb 2018 14:50:48 +0100 Subject: [PATCH 2/3] JSON output websocket support --- src/output/output_json.cpp | 33 +++++++++++++++++++++++++++++++-- src/output/output_json.h | 3 ++- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/src/output/output_json.cpp b/src/output/output_json.cpp index 6c7096f9..b2fa08f6 100644 --- a/src/output/output_json.cpp +++ b/src/output/output_json.cpp @@ -2,8 +2,16 @@ #include namespace Mist { - OutJSON::OutJSON(Socket::Connection & conn) : HTTPOutput(conn){realTime = 0;} - OutJSON::~OutJSON() {} + OutJSON::OutJSON(Socket::Connection & conn) : HTTPOutput(conn){ + ws = 0; + realTime = 0; + } + OutJSON::~OutJSON() { + if (ws){ + delete ws; + ws = 0; + } + } void OutJSON::init(Util::Config * cfg){ HTTPOutput::init(cfg); @@ -16,9 +24,17 @@ namespace Mist { capa["methods"][0u]["type"] = "html5/text/javascript"; capa["methods"][0u]["priority"] = 0ll; capa["methods"][0u]["url_rel"] = "/$.json"; + capa["methods"][1u]["handler"] = "ws"; + capa["methods"][1u]["type"] = "html5/text/javascript"; + capa["methods"][1u]["priority"] = 0ll; + capa["methods"][1u]["url_rel"] = "/$.json"; } void OutJSON::sendNext(){ + if (ws){ + ws->sendFrame(thisPacket.toJSON().toString()); + return; + } if (!jsonp.size()){ if(!first) { myConn.SendNow(", ", 2); @@ -63,6 +79,19 @@ namespace Mist { selectedTracks.insert(JSON::Value(H.GetVar("track")).asInt()); } + if (H.GetHeader("Upgrade") == "websocket"){ + ws = new HTTP::Websocket(myConn, H); + if (!(*ws)){ + delete ws; + ws = 0; + return; + } + sentHeader = true; + parseData = true; + wantRequest = false; + return; + } + H.Clean(); H.setCORSHeaders(); if(method == "OPTIONS" || method == "HEAD"){ diff --git a/src/output/output_json.h b/src/output/output_json.h index 29ad92c0..3e8c7023 100644 --- a/src/output/output_json.h +++ b/src/output/output_json.h @@ -1,5 +1,5 @@ #include "output_http.h" - +#include namespace Mist { class OutJSON : public HTTPOutput { @@ -14,6 +14,7 @@ namespace Mist { protected: std::string jsonp; bool first; + HTTP::Websocket * ws; }; } From b0bf1d14ec50b360ea45a640f77810fc434e6e6a Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 20 Mar 2018 15:06:37 +0100 Subject: [PATCH 3/3] Added HTTP info.js websocket mode --- src/output/output_http.cpp | 40 +--- src/output/output_http.h | 1 + src/output/output_http_internal.cpp | 294 ++++++++++++++++++---------- src/output/output_http_internal.h | 8 + 4 files changed, 205 insertions(+), 138 deletions(-) diff --git a/src/output/output_http.cpp b/src/output/output_http.cpp index 08f74ed3..16eeb264 100644 --- a/src/output/output_http.cpp +++ b/src/output/output_http.cpp @@ -224,45 +224,19 @@ namespace Mist { } INFO_MSG("Received request %s", H.getUrl().c_str()); - initialize(); - if (H.GetVar("audio") != "" || H.GetVar("video") != ""){ - selectedTracks.clear(); - if (H.GetVar("audio") != ""){ - selectedTracks.insert(JSON::Value(H.GetVar("audio")).asInt()); - } - if (H.GetVar("video") != ""){ - selectedTracks.insert(JSON::Value(H.GetVar("video")).asInt()); - } - selectDefaultTracks(); - std::set toRemove; - if (H.GetVar("video") == "0"){ - for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ - if (myMeta.tracks.at(*it).type=="video"){ - toRemove.insert(*it); - } - } - } - if (H.GetVar("audio") == "0"){ - for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ - if (myMeta.tracks.at(*it).type=="audio"){ - toRemove.insert(*it); - } - } - } - //remove those from selectedtracks - for (std::set::iterator it = toRemove.begin(); it != toRemove.end(); it++){ - selectedTracks.erase(*it); - } - }else{ - selectDefaultTracks(); - } - + preHTTP(); onHTTP(); if (!H.bufferChunks){ H.Clean(); } } } + + /// Default implementation of preHTTP simply calls initialize and selectDefaultTracks. + void HTTPOutput::preHTTP(){ + initialize(); + selectDefaultTracks(); + } static inline void builPipedPart(JSON::Value & p, char * argarr[], int & argnum, JSON::Value & argset){ jsonForEach(argset, it) { diff --git a/src/output/output_http.h b/src/output/output_http.h index bdf00151..bad8ae6a 100644 --- a/src/output/output_http.h +++ b/src/output/output_http.h @@ -14,6 +14,7 @@ namespace Mist { virtual void onFail(); virtual void onHTTP(){}; virtual void requestHandler(); + virtual void preHTTP(); static bool listenMode(){return false;} void reConnector(std::string & connector); std::string getHandler(); diff --git a/src/output/output_http_internal.cpp b/src/output/output_http_internal.cpp index 94026a9a..9dcdb120 100644 --- a/src/output/output_http_internal.cpp +++ b/src/output/output_http_internal.cpp @@ -5,9 +5,11 @@ #include #include "flashPlayer.h" #include "oldFlashPlayer.h" +#include namespace Mist { OutHTTP::OutHTTP(Socket::Connection & conn) : HTTPOutput(conn){ + stayConnected = false; if (myConn.getPureSocket() >= 0){ std::string host = getConnectedHost(); dup2(myConn.getSocket(), STDIN_FILENO); @@ -42,6 +44,7 @@ namespace Mist { return; } if (H.url.size() >= 3 && H.url.substr(H.url.size() - 3) == ".js"){ + if (websocketHandler()){return;} JSON::Value json_resp; json_resp["error"] = "Could not retrieve stream. Sorry."; if (H.url.size() >= 5 && H.url.substr(0, 5) == "/json"){ @@ -233,6 +236,146 @@ namespace Mist { H.SendResponse("200", "OK", myConn); } + JSON::Value OutHTTP::getStatusJSON(std::string & reqHost){ + JSON::Value json_resp; + uint8_t streamStatus = Util::getStreamStatus(streamName); + if (streamStatus != STRMSTAT_READY){ + switch (streamStatus){ + case STRMSTAT_OFF: + json_resp["error"] = "Stream is offline"; + break; + case STRMSTAT_INIT: + json_resp["error"] = "Stream is initializing"; + break; + case STRMSTAT_BOOT: + json_resp["error"] = "Stream is booting"; + break; + case STRMSTAT_WAIT: + json_resp["error"] = "Stream is waiting for data"; + break; + case STRMSTAT_SHUTDOWN: + json_resp["error"] = "Stream is shutting down"; + break; + case STRMSTAT_INVALID: + json_resp["error"] = "Stream status is invalid?!"; + break; + default: + json_resp["error"] = "Stream status is unknown?!"; + break; + } + return json_resp; + } + initialize(); + if (!myConn){ + return json_resp; + } + IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); + configLock.wait(); + IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); + DTSC::Scan prots = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols"); + if (!prots){ + json_resp["error"] = "The specified stream is not available on this server."; + configLock.post(); + configLock.close(); + return json_resp; + } + + bool hasVideo = false; + for (std::map::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ + if (trit->second.type == "video"){ + hasVideo = true; + if (trit->second.width > json_resp["width"].asInt()){ + json_resp["width"] = trit->second.width; + } + if (trit->second.height > json_resp["height"].asInt()){ + json_resp["height"] = trit->second.height; + } + } + } + if (json_resp["width"].asInt() < 1 || json_resp["height"].asInt() < 1){ + json_resp["width"] = 640ll; + json_resp["height"] = 480ll; + if (!hasVideo){json_resp["height"] = 20ll;} + } + if (myMeta.vod){ + json_resp["type"] = "vod"; + } + if (myMeta.live){ + json_resp["type"] = "live"; + } + + // show ALL the meta datas! + json_resp["meta"] = myMeta.toJSON(); + jsonForEach(json_resp["meta"]["tracks"], it) { + if (it->isMember("lang")){ + (*it)["language"] = Encodings::ISO639::decode((*it)["lang"].asStringRef()); + } + it->removeMember("fragments"); + it->removeMember("keys"); + it->removeMember("keysizes"); + it->removeMember("parts"); + } + + //create a set for storing source information + std::set sources; + + //find out which connectors are enabled + std::set conns; + unsigned int prots_ctr = prots.getSize(); + for (unsigned int i = 0; i < prots_ctr; ++i){ + conns.insert(prots.getIndice(i).getMember("connector").asString()); + } + //loop over the connectors. + for (unsigned int i = 0; i < prots_ctr; ++i){ + std::string cName = prots.getIndice(i).getMember("connector").asString(); + DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(cName); + //if the connector has a port, + if (capa.getMember("optional").getMember("port")){ + HTTP::URL outURL(reqHost); + //get the default port if none is set + outURL.port = prots.getIndice(i).getMember("port").asString(); + if (!outURL.port.size()){ + outURL.port = capa.getMember("optional").getMember("port").getMember("default").asString(); + } + outURL.protocol = capa.getMember("protocol").asString(); + if (outURL.protocol.find(':') != std::string::npos){ + outURL.protocol.erase(outURL.protocol.find(':')); + } + //and a URL - then list the URL + JSON::Value capa_json = capa.asJSON(); + if (capa.getMember("url_rel") || capa.getMember("methods")){ + addSources(streamName, sources, outURL, capa_json, json_resp["meta"]); + } + //Make note if this connector can be depended upon by other connectors + if (capa.getMember("provides")){ + std::string cProv = capa.getMember("provides").asString(); + //if this connector can be depended upon by other connectors, loop over the rest + //check each enabled protocol separately to see if it depends on this connector + DTSC::Scan capa_lst = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors"); + unsigned int capa_lst_ctr = capa_lst.getSize(); + for (unsigned int j = 0; j < capa_lst_ctr; ++j){ + //if it depends on this connector and has a URL, list it + if (conns.count(capa_lst.getIndiceName(j)) && capa_lst.getIndice(j).getMember("deps").asString() == cProv && capa_lst.getIndice(j).getMember("methods")){ + JSON::Value subcapa_json = capa_lst.getIndice(j).asJSON(); + addSources(streamName, sources, outURL, subcapa_json, json_resp["meta"]); + } + } + } + } + } + + //loop over the added sources, add them to json_resp["sources"] + for (std::set::iterator it = sources.begin(); it != sources.end(); it++){ + if ((*it)["simul_tracks"].asInt() > 0){ + json_resp["source"].append(*it); + } + } + configLock.post(); + configLock.close(); + return json_resp; + } + + void OutHTTP::onHTTP(){ std::string method = H.method; @@ -352,6 +495,7 @@ namespace Mist { } if ((H.url.length() > 9 && H.url.substr(0, 6) == "/info_" && H.url.substr(H.url.length() - 3, 3) == ".js") || (H.url.length() > 10 && H.url.substr(0, 7) == "/embed_" && H.url.substr(H.url.length() - 3, 3) == ".js") || (H.url.length() > 9 && H.url.substr(0, 6) == "/json_" && H.url.substr(H.url.length() - 3, 3) == ".js")){ + if (websocketHandler()){return;} std::string reqHost = HTTP::URL(H.GetHeader("Host")).host; std::string response; std::string rURL = H.url; @@ -368,112 +512,9 @@ namespace Mist { H.Clean(); return; } - response = "// Generating info code for stream " + streamName + "\n\nif (!mistvideo){var mistvideo = {};}\n"; - JSON::Value json_resp; initialize(); - if (!myConn){ - return; - } - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); - DTSC::Scan prots = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols"); - if (prots){ - bool hasVideo = false; - for (std::map::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ - if (trit->second.type == "video"){ - hasVideo = true; - if (trit->second.width > json_resp["width"].asInt()){ - json_resp["width"] = trit->second.width; - } - if (trit->second.height > json_resp["height"].asInt()){ - json_resp["height"] = trit->second.height; - } - } - } - if (json_resp["width"].asInt() < 1 || json_resp["height"].asInt() < 1){ - json_resp["width"] = 640ll; - json_resp["height"] = 480ll; - if (!hasVideo){json_resp["height"] = 20ll;} - } - if (myMeta.vod){ - json_resp["type"] = "vod"; - } - if (myMeta.live){ - json_resp["type"] = "live"; - } - - // show ALL the meta datas! - json_resp["meta"] = myMeta.toJSON(); - jsonForEach(json_resp["meta"]["tracks"], it) { - if (it->isMember("lang")){ - (*it)["language"] = Encodings::ISO639::decode((*it)["lang"].asStringRef()); - } - it->removeMember("fragments"); - it->removeMember("keys"); - it->removeMember("keysizes"); - it->removeMember("parts"); - } - - //create a set for storing source information - std::set sources; - - //find out which connectors are enabled - std::set conns; - unsigned int prots_ctr = prots.getSize(); - for (unsigned int i = 0; i < prots_ctr; ++i){ - conns.insert(prots.getIndice(i).getMember("connector").asString()); - } - //loop over the connectors. - for (unsigned int i = 0; i < prots_ctr; ++i){ - std::string cName = prots.getIndice(i).getMember("connector").asString(); - DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(cName); - //if the connector has a port, - if (capa.getMember("optional").getMember("port")){ - HTTP::URL outURL(reqHost); - //get the default port if none is set - outURL.port = prots.getIndice(i).getMember("port").asString(); - if (!outURL.port.size()){ - outURL.port = capa.getMember("optional").getMember("port").getMember("default").asString(); - } - outURL.protocol = capa.getMember("protocol").asString(); - if (outURL.protocol.find(':') != std::string::npos){ - outURL.protocol.erase(outURL.protocol.find(':')); - } - //and a URL - then list the URL - JSON::Value capa_json = capa.asJSON(); - if (capa.getMember("url_rel") || capa.getMember("methods")){ - addSources(streamName, sources, outURL, capa_json, json_resp["meta"]); - } - //Make note if this connector can be depended upon by other connectors - if (capa.getMember("provides")){ - std::string cProv = capa.getMember("provides").asString(); - //if this connector can be depended upon by other connectors, loop over the rest - //check each enabled protocol separately to see if it depends on this connector - DTSC::Scan capa_lst = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors"); - unsigned int capa_lst_ctr = capa_lst.getSize(); - for (unsigned int j = 0; j < capa_lst_ctr; ++j){ - //if it depends on this connector and has a URL, list it - if (conns.count(capa_lst.getIndiceName(j)) && capa_lst.getIndice(j).getMember("deps").asString() == cProv && capa_lst.getIndice(j).getMember("methods")){ - JSON::Value subcapa_json = capa_lst.getIndice(j).asJSON(); - addSources(streamName, sources, outURL, subcapa_json, json_resp["meta"]); - } - } - } - } - } - - //loop over the added sources, add them to json_resp["sources"] - for (std::set::iterator it = sources.begin(); it != sources.end(); it++){ - if ((*it)["simul_tracks"].asInt() > 0){ - json_resp["source"].append(*it); - } - } - }else{ - json_resp["error"] = "The specified stream is not available on this server."; - } - configLock.post(); - configLock.close(); + response = "// Generating info code for stream " + streamName + "\n\nif (!mistvideo){var mistvideo = {};}\n"; + JSON::Value json_resp = getStatusJSON(reqHost); if (rURL.substr(0, 6) != "/json_"){ response += "mistvideo['" + streamName + "'] = " + json_resp.toString() + ";\n"; }else{ @@ -656,5 +697,48 @@ namespace Mist { H.Clean(); } + bool OutHTTP::websocketHandler(){ + stayConnected = true; + std::string reqHost = HTTP::URL(H.GetHeader("Host")).host; + if (H.GetHeader("Upgrade") != "websocket"){return false;} + HTTP::Websocket ws(myConn, H); + if (!ws){return false;} + //start the stream, if needed + Util::startInput(streamName, "", true, false); + + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); + IPC::sharedPage streamStatus(pageName, 1, false, false); + uint8_t prevState, newState, metaCounter; + uint64_t prevTracks; + prevState = newState = STRMSTAT_INVALID; + while (keepGoing()){ + if (!streamStatus || !streamStatus.exists()){streamStatus.init(pageName, 1, false, false);} + if (!streamStatus){newState = STRMSTAT_OFF;}else{newState = streamStatus.mapped[0];} + + if (newState != prevState || (newState == STRMSTAT_READY && myMeta.tracks.size() != prevTracks)){ + if (newState == STRMSTAT_READY){ + reconnect(); + updateMeta(); + prevTracks = myMeta.tracks.size(); + }else{ + disconnect(); + } + JSON::Value resp = getStatusJSON(reqHost); + ws.sendFrame(resp.toString()); + prevState = newState; + }else{ + if (newState == STRMSTAT_READY){ + stats(); + } + Util::sleep(250); + if (newState == STRMSTAT_READY && (++metaCounter % 4) == 0){ + updateMeta(); + } + } + } + return true; + } + } diff --git a/src/output/output_http_internal.h b/src/output/output_http_internal.h index 5badb0d9..f2f9a0ba 100644 --- a/src/output/output_http_internal.h +++ b/src/output/output_http_internal.h @@ -9,9 +9,17 @@ namespace Mist { static void init(Util::Config * cfg); static bool listenMode(); virtual void onFail(); + ///preHTTP is disabled in the internal HTTP output, since most don't need the stream alive to work + virtual void preHTTP(){}; void HTMLResponse(); void onHTTP(); void sendIcon(); + bool websocketHandler(); + JSON::Value getStatusJSON(std::string & reqHost); + bool stayConnected; + virtual bool onFinish(){ + return stayConnected; + } }; }