From 1451b64e000df5b6b46adf415bad62c75899bf7d Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 4 Jul 2018 11:19:02 +0200 Subject: [PATCH 1/8] Improved track selection logic for JSON output --- src/output/output.cpp | 56 +++++++++++++++++++++----------------- src/output/output_json.cpp | 16 +---------- src/output/output_raw.cpp | 2 +- 3 files changed, 33 insertions(+), 41 deletions(-) diff --git a/src/output/output.cpp b/src/output/output.cpp index b988cd78..87bdd79b 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -277,28 +277,22 @@ namespace Mist{ unsigned int bestSoFarCount = 0; unsigned int index = 0; jsonForEach(capa["codecs"], it){ - unsigned int genCounter = 0; unsigned int selCounter = 0; if ((*it).size() > 0){ jsonForEach((*it), itb){ if ((*itb).size() > 0){ - bool found = false; jsonForEach(*itb, itc){ + const std::string & strRef = (*itc).asStringRef(); + bool byType = false; + bool multiSel = false; + uint8_t shift = 0; + if (strRef[shift] == '@'){byType = true; ++shift;} + if (strRef[shift] == '+'){multiSel = true; ++shift;} for (std::set::iterator itd = selectedTracks.begin(); itd != selectedTracks.end(); itd++){ - if (myMeta.tracks[*itd].codec == (*itc).asStringRef()){ + if ((!byType && myMeta.tracks[*itd].codec == strRef.substr(shift)) || (byType && myMeta.tracks[*itd].type == strRef.substr(shift)) || strRef.substr(shift) == "*"){ selCounter++; - found = true; - break; - } - } - } - if (!found){ - jsonForEach(*itb, itc){ - for (std::map::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ - if (trit->second.codec == (*itc).asStringRef() || (*itc).asStringRef() == "*"){ - genCounter++; - found = true; - if ((*itc).asStringRef() != "*"){break;} + if (!multiSel){ + break; } } } @@ -306,10 +300,10 @@ namespace Mist{ } } if (selCounter == selectedTracks.size()){ - if (selCounter + genCounter > bestSoFarCount){ - bestSoFarCount = selCounter + genCounter; + if (selCounter > bestSoFarCount){ + bestSoFarCount = selCounter; bestSoFar = index; - HIGH_MSG("Match (%u/%u): %s", selCounter, selCounter + genCounter, (*it).toString().c_str()); + HIGH_MSG("Matched %u: %s", selCounter, (*it).toString().c_str()); } }else{ VERYHIGH_MSG("Not a match for currently selected tracks: %s", (*it).toString().c_str()); @@ -324,31 +318,43 @@ namespace Mist{ jsonForEach(capa["codecs"][bestSoFar], itb){ if ((*itb).size() && myMeta.tracks.size()){ bool found = false; + bool multiFind = false; jsonForEach((*itb), itc){ + const std::string & strRef = (*itc).asStringRef(); + bool byType = false; + uint8_t shift = 0; + if (strRef[shift] == '@'){byType = true; ++shift;} + if (strRef[shift] == '+'){multiFind = true; ++shift;} for (std::set::iterator itd = selectedTracks.begin(); itd != selectedTracks.end(); itd++){ - if (myMeta.tracks[*itd].codec == (*itc).asStringRef()){ + if ((!byType && myMeta.tracks[*itd].codec == strRef.substr(shift)) || (byType && myMeta.tracks[*itd].type == strRef.substr(shift)) || strRef.substr(shift) == "*"){ found = true; break; } } } - if (!found){ + if (!found || multiFind){ jsonForEach((*itb), itc){ - if (found){break;} + const std::string & strRef = (*itc).asStringRef(); + bool byType = false; + bool multiSel = false; + uint8_t shift = 0; + if (strRef[shift] == '@'){byType = true; ++shift;} + if (strRef[shift] == '+'){multiSel = true; ++shift;} + if (found && !multiSel){continue;} if (myMeta.live){ for (std::map::reverse_iterator trit = myMeta.tracks.rbegin(); trit != myMeta.tracks.rend(); trit++){ - if (trit->second.codec == (*itc).asStringRef() || (*itc).asStringRef() == "*"){ + if ((!byType && trit->second.codec == strRef.substr(shift)) || (byType && trit->second.type == strRef.substr(shift)) || strRef.substr(shift) == "*"){ selectedTracks.insert(trit->first); found = true; - if ((*itc).asStringRef() != "*"){break;} + if (!multiSel){break;} } } }else{ for (std::map::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ - if (trit->second.codec == (*itc).asStringRef() || (*itc).asStringRef() == "*"){ + if ((!byType && trit->second.codec == strRef.substr(shift)) || (byType && trit->second.type == strRef.substr(shift)) || strRef.substr(shift) == "*"){ selectedTracks.insert(trit->first); found = true; - if ((*itc).asStringRef() != "*"){break;} + if (!multiSel){break;} } } } diff --git a/src/output/output_json.cpp b/src/output/output_json.cpp index b2fa08f6..1add0a8c 100644 --- a/src/output/output_json.cpp +++ b/src/output/output_json.cpp @@ -18,8 +18,7 @@ namespace Mist { capa["name"] = "JSON"; capa["desc"] = "Enables HTTP protocol JSON streaming."; capa["url_match"] = "/$.json"; - capa["codecs"][0u][0u].append("srt"); - capa["codecs"][0u][0u].append("TTXT"); + capa["codecs"][0u][0u].append("@+meta"); capa["methods"][0u]["handler"] = "http"; capa["methods"][0u]["type"] = "html5/text/javascript"; capa["methods"][0u]["priority"] = 0ll; @@ -74,10 +73,6 @@ namespace Mist { jsonp = ""; if (H.GetVar("callback") != ""){jsonp = H.GetVar("callback");} if (H.GetVar("jsonp") != ""){jsonp = H.GetVar("jsonp");} - if (H.GetVar("track") != ""){ - selectedTracks.clear(); - selectedTracks.insert(JSON::Value(H.GetVar("track")).asInt()); - } if (H.GetHeader("Upgrade") == "websocket"){ ws = new HTTP::Websocket(myConn, H); @@ -101,16 +96,7 @@ namespace Mist { H.Clean(); return; } - first = true; - initialize(); - if (!selectedTracks.size()){ - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ - if (it->second.type == "meta" ){ - selectedTracks.insert(it->first); - } - } - } parseData = true; wantRequest = false; } diff --git a/src/output/output_raw.cpp b/src/output/output_raw.cpp index 571cf29b..2d98b3b3 100644 --- a/src/output/output_raw.cpp +++ b/src/output/output_raw.cpp @@ -46,7 +46,7 @@ namespace Mist { capa["optional"]["seek"]["help"] = "The time in milliseconds to seek to, 0 by default."; capa["optional"]["seek"]["type"] = "int"; capa["optional"]["seek"]["option"] = "--seek"; - capa["codecs"][0u][0u].append("*"); + capa["codecs"][0u][0u].append("+*"); cfg->addOption("streamname", JSON::fromString("{\"arg\":\"string\",\"short\":\"s\",\"long\":\"stream\",\"help\":\"The name of the stream that this connector will transmit.\"}")); cfg->addOption("tracks", From 6adfc2c970dbbcb7754abaae8b2d2cbb4588df0a Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 4 Jul 2018 11:24:29 +0200 Subject: [PATCH 2/8] Allow downloading of MP4 output as file through ?dl=1 and/or ?dl=filename.ext GET parameter --- src/output/output_progressive_mp4.cpp | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/output/output_progressive_mp4.cpp b/src/output/output_progressive_mp4.cpp index e96de3a5..ed690583 100644 --- a/src/output/output_progressive_mp4.cpp +++ b/src/output/output_progressive_mp4.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include "output_progressive_mp4.h" #include @@ -435,9 +436,19 @@ namespace Mist { } void OutProgressiveMP4::onHTTP() { + std::string dl; + if (H.GetVar("dl").size()){ + dl = H.GetVar("dl"); + if (dl.find('.') == std::string::npos){ + dl = streamName + ".mp4"; + } + } if(H.method == "OPTIONS" || H.method == "HEAD"){ H.Clean(); H.setCORSHeaders(); + if (dl.size()){ + H.SetHeader("Content-Disposition" ,"attachment; filename="+Encodings::URL::encode(dl)+";"); + } H.SetHeader("Content-Type", "video/MP4"); H.SetHeader("Accept-Ranges", "bytes, parsec"); H.SendResponse("200", "OK", myConn); @@ -473,6 +484,10 @@ namespace Mist { } H.Clean(); //make sure no parts of old requests are left in any buffers H.setCORSHeaders(); + if (dl.size()){ + H.SetHeader("Content-Disposition" ,"attachment; filename="+Encodings::URL::encode(dl)+";"); + realTime = 0;//force max download speed when downloading + } H.SetHeader("Content-Type", "video/MP4"); //Send the correct content-type for MP4 files H.SetHeader("Accept-Ranges", "bytes, parsec"); if (rangeType != ' '){ From bd0b820577d7541faaadfbd75d3ff8dc90c30554 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 6 Jun 2018 23:21:39 +0200 Subject: [PATCH 3/8] Added JSON output duplicate checking and persistent connections --- src/output/output_json.cpp | 59 ++++++++++++++++++++++++++++++++++++-- src/output/output_json.h | 5 ++++ 2 files changed, 62 insertions(+), 2 deletions(-) diff --git a/src/output/output_json.cpp b/src/output/output_json.cpp index 1add0a8c..b1750ef2 100644 --- a/src/output/output_json.cpp +++ b/src/output/output_json.cpp @@ -1,10 +1,13 @@ #include "output_json.h" +#include #include namespace Mist { OutJSON::OutJSON(Socket::Connection & conn) : HTTPOutput(conn){ ws = 0; realTime = 0; + keepReselecting = false; + dupcheck = false; } OutJSON::~OutJSON() { if (ws){ @@ -30,8 +33,15 @@ namespace Mist { } void OutJSON::sendNext(){ + JSON::Value jPack = thisPacket.toJSON(); + if (dupcheck){ + if (jPack.compareExcept(lastVal, nodup)){ + return;//skip duplicates + } + lastVal = jPack; + } if (ws){ - ws->sendFrame(thisPacket.toJSON().toString()); + ws->sendFrame(jPack.toString()); return; } if (!jsonp.size()){ @@ -44,7 +54,7 @@ namespace Mist { }else{ myConn.SendNow(jsonp + "("); } - myConn.SendNow(thisPacket.toJSON().toString()); + myConn.SendNow(jPack.toString()); if (jsonp.size()){ myConn.SendNow(");\n", 3); } @@ -60,7 +70,37 @@ namespace Mist { sentHeader = true; } + void OutJSON::onFail(){ + //Only run failure handle if we're not being persistent + if (!keepReselecting){ + HTTPOutput::onFail(); + }else{ + onFinish(); + } + } + bool OutJSON::onFinish(){ + static bool recursive = false; + if (recursive){return true;} + recursive = true; + if (keepReselecting){ + uint64_t maxTimer = 7200; + while (--maxTimer && nProxy.userClient.isAlive() && keepGoing()){ + Util::wait(500); + stats(); + if (Util::getStreamStatus(streamName) != STRMSTAT_READY){ + disconnect(); + }else{ + updateMeta(); + if (isReadyForPlay()){ + recursive = false; + return true; + } + } + } + recursive = false; + return false; + } if (!jsonp.size() && !first){ myConn.SendNow("]);\n\n", 5); } @@ -71,6 +111,21 @@ namespace Mist { void OutJSON::onHTTP(){ std::string method = H.method; jsonp = ""; + if (H.GetVar("persist") != ""){keepReselecting = true;} + if (H.GetVar("dedupe") != ""){ + dupcheck = true; + size_t index; + std::string dupes = H.GetVar("dedupe"); + while (dupes != "") { + index = dupes.find(','); + nodup.insert(dupes.substr(0, index)); + if (index != std::string::npos) { + dupes.erase(0, index + 1); + } else { + dupes = ""; + } + } + } if (H.GetVar("callback") != ""){jsonp = H.GetVar("callback");} if (H.GetVar("jsonp") != ""){jsonp = H.GetVar("jsonp");} diff --git a/src/output/output_json.h b/src/output/output_json.h index 3e8c7023..7959b4a2 100644 --- a/src/output/output_json.h +++ b/src/output/output_json.h @@ -9,10 +9,15 @@ namespace Mist { static void init(Util::Config * cfg); void onHTTP(); bool onFinish(); + void onFail(); void sendNext(); void sendHeader(); protected: + JSON::Value lastVal; + bool keepReselecting; std::string jsonp; + bool dupcheck; + std::set nodup; bool first; HTTP::Websocket * ws; }; From 43934cf69c17d9b2d74e5e1253def3d7be243fe5 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 4 Jul 2018 11:30:38 +0200 Subject: [PATCH 4/8] Added bootMsOffset to DTSC::Meta, RTMP now syncs on it. --- lib/dtsc.h | 3 ++- lib/dtscmeta.cpp | 35 +++++++++++++++++++++++++++++++++-- src/output/output.cpp | 3 +++ src/output/output_rtmp.cpp | 12 +++++++++++- src/output/output_rtmp.h | 4 +++- 5 files changed, 52 insertions(+), 5 deletions(-) diff --git a/lib/dtsc.h b/lib/dtsc.h index 428fa3c2..472a34c4 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -118,7 +118,7 @@ namespace DTSC { packType getVersion() const; void reInit(Socket::Connection & src); void reInit(const char * data_, unsigned int len, bool noCopy = false); - void genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe); + void genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe, int64_t bootMsOffset = 0); void getString(const char * identifier, char *& result, unsigned int & len) const; void getString(const char * identifier, std::string & result) const; void getInt(const char * identifier, uint64_t & result) const; @@ -348,6 +348,7 @@ namespace DTSC { uint16_t version; long long int moreheader; long long int bufferWindow; + int64_t bootMsOffset;///< Millis to add to packet timestamps to get millis since system boot. std::string sourceURI; }; diff --git a/lib/dtscmeta.cpp b/lib/dtscmeta.cpp index a1833849..d6be464d 100644 --- a/lib/dtscmeta.cpp +++ b/lib/dtscmeta.cpp @@ -209,7 +209,7 @@ namespace DTSC { /// Re-initializes this Packet to contain a generic DTSC packet with the given data fields. /// When given a NULL pointer, the data is reserved and memset to 0 /// If given a NULL pointer and a zero size, an empty packet is created. - void Packet::genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe){ + void Packet::genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe, int64_t bootMsOffset){ null(); master = true; //time and trackID are part of the 20-byte header. @@ -217,12 +217,13 @@ namespace DTSC { //offset, if non-zero, adds 9 bytes (integer type) and 8 bytes (2+namelen) //bpos, if >= 0, adds 9 bytes (integer type) and 6 bytes (2+namelen) //keyframe, if true, adds 9 bytes (integer type) and 10 bytes (2+namelen) + //bootmsoffset, if != 0, adds 9 bytes (integer type) and 5 bytes (2+namelen) //data adds packDataSize+5 bytes (string type) and 6 bytes (2+namelen) if (packData && packDataSize < 1){ FAIL_MSG("Attempted to fill a packet with %lli bytes!", packDataSize); return; } - unsigned int sendLen = 24 + (packOffset?17:0) + (packBytePos?15:0) + (isKeyframe?19:0) + packDataSize+11; + unsigned int sendLen = 24 + (packOffset?17:0) + (packBytePos?15:0) + (isKeyframe?19:0) + (bootMsOffset?14:0) + packDataSize+11; resize(sendLen); //set internal variables version = DTSC_V2; @@ -259,6 +260,14 @@ namespace DTSC { memcpy(data+offset, "\000\010keyframe\001\000\000\000\000\000\000\000\001", 19); offset += 19; } + if (bootMsOffset){ + memcpy(data+offset, "\000\003bmo\001", 6); + tmpLong = htonl((int)(bootMsOffset >> 32)); + memcpy(data+offset+6, (char *)&tmpLong, 4); + tmpLong = htonl((int)(bootMsOffset & 0xFFFFFFFF)); + memcpy(data+offset+10, (char *)&tmpLong, 4); + offset += 14; + } memcpy(data+offset, "\000\004data\002", 7); tmpLong = htonl(packDataSize); memcpy(data+offset+7, (char *)&tmpLong, 4); @@ -1411,6 +1420,7 @@ namespace DTSC { moreheader = 0; merged = false; bufferWindow = 0; + bootMsOffset = 0; } Meta::Meta(const DTSC::Packet & source) { @@ -1425,6 +1435,7 @@ namespace DTSC { merged = source.getFlag("merged"); bufferWindow = source.getInt("buffer_window"); moreheader = source.getInt("moreheader"); + bootMsOffset = source.getInt("bootoffset"); source.getString("source", sourceURI); Scan tmpTracks = source.getScan().getMember("tracks"); unsigned int num = 0; @@ -1447,6 +1458,7 @@ namespace DTSC { live = meta.isMember("live") && meta["live"]; sourceURI = meta.isMember("source") ? meta["source"].asStringRef() : ""; version = meta.isMember("version") ? meta["version"].asInt() : 0; + bootMsOffset = meta.isMember("bootoffset") ? meta["bootoffset"].asInt() : 0; merged = meta.isMember("merged") && meta["merged"]; bufferWindow = 0; if (meta.isMember("buffer_window")) { @@ -1476,6 +1488,9 @@ namespace DTSC { unsigned int dataLen; pack.getString("data", data, dataLen); update(pack.getTime(), pack.hasMember("offset")?pack.getInt("offset"):0, pack.getTrackId(), dataLen, pack.hasMember("bpos")?pack.getInt("bpos"):0, pack.hasMember("keyframe"), pack.getDataLen(), segment_size); + if (!bootMsOffset && pack.hasMember("bmo")){ + bootMsOffset = pack.getInt("bmo"); + } } ///\brief Updates a meta object given a DTSC::Packet with byte position override. @@ -1874,6 +1889,7 @@ namespace DTSC { } } if (version){dataLen += 18;} + if (bootMsOffset){dataLen += 21;} if (sourceURI.size()){dataLen += 13+sourceURI.size();} return dataLen + 8; //add 8 bytes header } @@ -1904,6 +1920,10 @@ namespace DTSC { writePointer(p, "\000\007version\001", 10); writePointer(p, convertLongLong(version), 8); } + if (bootMsOffset) { + writePointer(p, "\000\012bootoffset\001", 13); + writePointer(p, convertLongLong(bootMsOffset), 8); + } if (sourceURI.size()) { writePointer(p, "\000\006source\002", 9); writePointer(p, convertInt(sourceURI.size()), 4); @@ -1946,6 +1966,10 @@ namespace DTSC { conn.SendNow("\000\007version\001", 10); conn.SendNow(convertLongLong(version), 8); } + if (bootMsOffset) { + conn.SendNow("\000\012bootoffset\001", 10); + conn.SendNow(convertLongLong(bootMsOffset), 8); + } if (sourceURI.size()) { conn.SendNow("\000\006source\002", 9); conn.SendNow(convertInt(sourceURI.size()), 4); @@ -2044,6 +2068,9 @@ namespace DTSC { if (version) { result["version"] = (long long)version; } + if (bootMsOffset){ + result["bootoffset"] = (long long)bootMsOffset; + } if (sourceURI.size()){ result["source"] = sourceURI; } @@ -2083,6 +2110,9 @@ namespace DTSC { if (sourceURI.size()){ str << std::string(indent, ' ') << "Source: " << sourceURI << std::endl; } + if (bootMsOffset){ + str << std::string(indent, ' ') << "Boot MS offset: " << bootMsOffset << std::endl; + } str << std::string(indent, ' ') << "More Header: " << moreheader << std::endl; } @@ -2091,6 +2121,7 @@ namespace DTSC { for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { it->second.reset(); } + bootMsOffset = 0; } diff --git a/src/output/output.cpp b/src/output/output.cpp index 87bdd79b..20d94182 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -1187,6 +1187,7 @@ namespace Mist{ return false; } } + initialize(); return true; } @@ -1225,6 +1226,7 @@ namespace Mist{ DTSC::Meta reMeta; reMeta.reinit(tmpMeta); myMeta.sourceURI = reMeta.sourceURI; + myMeta.bootMsOffset = reMeta.bootMsOffset; } if (liveSem){ liveSem->post(); @@ -1232,6 +1234,7 @@ namespace Mist{ liveSem = 0; } } + nProxy.metaPages.clear(); } } } diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index fc46a3c3..b0115571 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -925,6 +925,16 @@ namespace Mist { F.toMeta(myMeta, *amf_storage, reTrack); if (F.getDataLen() && !(F.needsInitData() && F.isInitData())){ uint64_t tagTime = next.timestamp; + if (!bootMsOffset){ + if (myMeta.bootMsOffset){ + bootMsOffset = myMeta.bootMsOffset; + rtmpOffset = (Util::bootMS() - tagTime) - bootMsOffset; + }else{ + bootMsOffset = Util::bootMS() - tagTime; + rtmpOffset = 0; + } + } + tagTime += rtmpOffset; uint64_t & ltt = lastTagTime[reTrack]; //Check for decreasing timestamps - this is a connection error. //We allow wrapping around the 32 bits maximum value if the most significant 8 bits are set. @@ -949,7 +959,7 @@ namespace Mist { ptr[i+1] = tmpchar; } } - thisPacket.genericFill(tagTime, F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe); + thisPacket.genericFill(tagTime, F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe, F.isKeyframe?bootMsOffset:0); ltt = tagTime; if (!nProxy.userClient.getData()){ char userPageName[NAME_BUFFER_SIZE]; diff --git a/src/output/output_rtmp.h b/src/output/output_rtmp.h index 3256c6b5..591ccad3 100644 --- a/src/output/output_rtmp.h +++ b/src/output/output_rtmp.h @@ -15,8 +15,10 @@ namespace Mist { void sendHeader(); bool onFinish(); protected: - uint64_t rtmpOffset; void parseVars(std::string data); + int64_t rtmpOffset; + uint64_t lastOutTime; + int64_t bootMsOffset; std::string app_name; void parseChunk(Socket::Buffer & inputBuffer); void parseAMFCommand(AMF::Object & amfData, int messageType, int streamId); From 9104d68a5d3d2a07a75841734cb55f0cf2d19b2e Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 4 Jul 2018 11:36:57 +0200 Subject: [PATCH 5/8] Updated generic HTTP output to support websockets. Added basic websocket JSON push support. --- src/output/output_http.cpp | 46 +++++++++++++- src/output/output_http.h | 11 +++- src/output/output_json.cpp | 122 ++++++++++++++++++++++++++++--------- src/output/output_json.h | 12 +++- 4 files changed, 156 insertions(+), 35 deletions(-) diff --git a/src/output/output_http.cpp b/src/output/output_http.cpp index 74e12f1a..b50d8cac 100644 --- a/src/output/output_http.cpp +++ b/src/output/output_http.cpp @@ -6,6 +6,9 @@ namespace Mist { HTTPOutput::HTTPOutput(Socket::Connection & conn) : Output(conn) { + webSock = 0; + idleInterval = 0; + idleLast = 0; if (config->getString("ip").size()){ myConn.setHost(config->getString("ip")); } @@ -14,6 +17,13 @@ namespace Mist { } config->activate(); } + + HTTPOutput::~HTTPOutput() { + if (webSock){ + delete webSock; + webSock = 0; + } + } void HTTPOutput::init(Util::Config * cfg){ Output::init(cfg); @@ -153,6 +163,21 @@ namespace Mist { } void HTTPOutput::requestHandler(){ + if (idleInterval && (Util::bootMS() > idleLast + idleInterval)){ + onIdle(); + idleLast = Util::bootMS(); + } + if (webSock){ + if (webSock->readFrame()){ + onWebsocketFrame(); + idleLast = Util::bootMS(); + }else{ + if (!isBlocking && !parseData){ + Util::sleep(100); + } + } + return; + } if (myConn.Received().size() && myConn.spool()){ DEBUG_MSG(DLVL_DONTEVEN, "onRequest"); onRequest(); @@ -197,12 +222,14 @@ namespace Mist { onRequest(); } if (!myConn.Received().size()){ - Util::sleep(500); + if (!isBlocking && !parseData){ + Util::sleep(100); + } } } }else{ if (!isBlocking && !parseData){ - Util::sleep(500); + Util::sleep(100); } } } @@ -224,8 +251,23 @@ namespace Mist { } INFO_MSG("Received request %s", H.getUrl().c_str()); + //Handle upgrade to websocket if the output supports it + if (doesWebsockets() && H.GetHeader("Upgrade") == "websocket"){ + INFO_MSG("Switching to Websocket mode"); + preWebsocketConnect(); + webSock = new HTTP::Websocket(myConn, H); + if (!(*webSock)){ + delete webSock; + webSock = 0; + return; + } + onWebsocketConnect(); + H.Clean(); + return; + } preHTTP(); onHTTP(); + idleLast = Util::bootMS(); if (!H.bufferChunks){ H.Clean(); } diff --git a/src/output/output_http.h b/src/output/output_http.h index bad8ae6a..9cf37718 100644 --- a/src/output/output_http.h +++ b/src/output/output_http.h @@ -1,6 +1,7 @@ #pragma once #include #include +#include #include "output.h" namespace Mist { @@ -8,18 +9,26 @@ namespace Mist { class HTTPOutput : public Output { public: HTTPOutput(Socket::Connection & conn); - virtual ~HTTPOutput(){}; + virtual ~HTTPOutput(); static void init(Util::Config * cfg); void onRequest(); virtual void onFail(); virtual void onHTTP(){}; + virtual void onIdle(){}; + virtual void onWebsocketFrame(){}; + virtual void onWebsocketConnect(){}; + virtual void preWebsocketConnect(){}; virtual void requestHandler(); virtual void preHTTP(); static bool listenMode(){return false;} + virtual bool doesWebsockets(){return false;} void reConnector(std::string & connector); std::string getHandler(); bool parseRange(uint64_t & byteStart, uint64_t & byteEnd); protected: HTTP::Parser H; + HTTP::Websocket * webSock; + uint32_t idleInterval; + uint64_t idleLast; }; } diff --git a/src/output/output_json.cpp b/src/output/output_json.cpp index b1750ef2..ca7b682c 100644 --- a/src/output/output_json.cpp +++ b/src/output/output_json.cpp @@ -4,18 +4,13 @@ namespace Mist { OutJSON::OutJSON(Socket::Connection & conn) : HTTPOutput(conn){ - ws = 0; realTime = 0; + bootMsOffset = 0; keepReselecting = false; dupcheck = false; + noReceive = false; } - OutJSON::~OutJSON() { - if (ws){ - delete ws; - ws = 0; - } - } - + void OutJSON::init(Util::Config * cfg){ HTTPOutput::init(cfg); capa["name"] = "JSON"; @@ -31,7 +26,7 @@ namespace Mist { capa["methods"][1u]["priority"] = 0ll; capa["methods"][1u]["url_rel"] = "/$.json"; } - + void OutJSON::sendNext(){ JSON::Value jPack = thisPacket.toJSON(); if (dupcheck){ @@ -40,8 +35,8 @@ namespace Mist { } lastVal = jPack; } - if (ws){ - ws->sendFrame(jPack.toString()); + if (webSock){ + webSock->sendFrame(jPack.toString()); return; } if (!jsonp.size()){ @@ -83,7 +78,7 @@ namespace Mist { static bool recursive = false; if (recursive){return true;} recursive = true; - if (keepReselecting){ + if (keepReselecting && !isPushing()){ uint64_t maxTimer = 7200; while (--maxTimer && nProxy.userClient.isAlive() && keepGoing()){ Util::wait(500); @@ -99,18 +94,23 @@ namespace Mist { } } recursive = false; - return false; } if (!jsonp.size() && !first){ - myConn.SendNow("]);\n\n", 5); + myConn.SendNow("]\n", 2); } myConn.close(); return false; } - void OutJSON::onHTTP(){ - std::string method = H.method; - jsonp = ""; + void OutJSON::onWebsocketConnect(){ + sentHeader = true; + parseData = !noReceive; + } + + void OutJSON::preWebsocketConnect(){ + if (H.GetVar("password") != ""){pushPass = H.GetVar("password");} + if (H.GetVar("password").size() || H.GetVar("push").size()){noReceive = true;} + if (H.GetVar("persist") != ""){keepReselecting = true;} if (H.GetVar("dedupe") != ""){ dupcheck = true; @@ -126,22 +126,84 @@ namespace Mist { } } } + } + + void OutJSON::onWebsocketFrame(){ + if (!isPushing()){ + if (!allowPush(pushPass)){ + onFinish(); + return; + } + } + if (!bootMsOffset){ + if (myMeta.bootMsOffset){ + bootMsOffset = myMeta.bootMsOffset; + }else{ + bootMsOffset = Util::bootMS(); + } + } + //We now know we're allowed to push. Read a JSON object. + JSON::Value inJSON = JSON::fromString(webSock->data, webSock->data.size()); + if (!inJSON || !inJSON.isObject()){ + //Ignore empty and/or non-parsable JSON packets + MEDIUM_MSG("Ignoring non-JSON object: %s", webSock->data); + return; + } + //Let's create a new track for pushing purposes, if needed + if (!pushTrack){ + pushTrack = 1; + while (myMeta.tracks.count(pushTrack)){ + ++pushTrack; + } + } + myMeta.tracks[pushTrack].type = "meta"; + myMeta.tracks[pushTrack].codec = "JSON"; + //We have a track set correctly. Let's attempt to buffer a frame. + inJSON["trackid"] = (long long)pushTrack; + inJSON["datatype"] = "meta"; + lastSendTime = Util::bootMS(); + if (!inJSON.isMember("unix")){ + //Base timestamp on arrival time + inJSON["time"] = (long long)(lastSendTime - bootMsOffset); + }else{ + //Base timestamp on unix time + inJSON["time"] = (long long)((lastSendTime - bootMsOffset) + (Util::epoch() - Util::bootSecs()) * 1000); + } + inJSON["bmo"] = (long long)bootMsOffset; + lastVal = inJSON; + std::string packedJson = inJSON.toNetPacked(); + DTSC::Packet newPack(packedJson.data(), packedJson.size(), true); + bufferLivePacket(newPack); + if (!idleInterval){idleInterval = 100;} + if (isBlocking){setBlocking(false);} + } + + /// Repeats last JSON packet every 5 seconds to keep stream alive. + void OutJSON::onIdle(){ + if (nProxy.trackState[pushTrack] != FILL_ACC){ + continueNegotiate(pushTrack); + if (nProxy.trackState[pushTrack] == FILL_ACC){ + idleInterval = 5000; + } + return; + } + lastVal["time"] = (long long)(lastVal["time"].asInt() + (Util::bootMS() - lastSendTime)); + lastSendTime = Util::bootMS(); + lastVal.netPrepare(); + std::string packedJson = lastVal.toNetPacked(); + DTSC::Packet newPack(packedJson.data(), packedJson.size(), true); + myMeta.tracks[pushTrack].type = "meta"; + myMeta.tracks[pushTrack].codec = "JSON"; + bufferLivePacket(newPack); + } + + void OutJSON::onHTTP(){ + std::string method = H.method; + preWebsocketConnect();//Not actually a websocket, but we need to do the same checks + jsonp = ""; if (H.GetVar("callback") != ""){jsonp = H.GetVar("callback");} if (H.GetVar("jsonp") != ""){jsonp = H.GetVar("jsonp");} - if (H.GetHeader("Upgrade") == "websocket"){ - ws = new HTTP::Websocket(myConn, H); - if (!(*ws)){ - delete ws; - ws = 0; - return; - } - sentHeader = true; - parseData = true; - wantRequest = false; - return; - } - H.Clean(); H.setCORSHeaders(); if(method == "OPTIONS" || method == "HEAD"){ diff --git a/src/output/output_json.h b/src/output/output_json.h index 7959b4a2..3a290fc6 100644 --- a/src/output/output_json.h +++ b/src/output/output_json.h @@ -5,21 +5,29 @@ namespace Mist { class OutJSON : public HTTPOutput { public: OutJSON(Socket::Connection & conn); - ~OutJSON(); static void init(Util::Config * cfg); void onHTTP(); + void onIdle(); + virtual void onWebsocketFrame(); + virtual void onWebsocketConnect(); + virtual void preWebsocketConnect(); bool onFinish(); void onFail(); void sendNext(); void sendHeader(); + bool doesWebsockets(){return true;} protected: JSON::Value lastVal; + uint64_t lastSendTime; bool keepReselecting; std::string jsonp; + std::string pushPass; + uint64_t pushTrack; + int64_t bootMsOffset; bool dupcheck; std::set nodup; bool first; - HTTP::Websocket * ws; + bool noReceive; }; } From a984243d56b9d405ca53a8367f2d1ca4ecd220be Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 4 Jul 2018 11:38:17 +0200 Subject: [PATCH 6/8] Implemented basic version of DTSC::RetimedPacket --- lib/dtsc.h | 19 ++++++++++++++++++- src/io.cpp | 4 ++-- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/lib/dtsc.h b/lib/dtsc.h index 472a34c4..cf832493 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -129,7 +129,7 @@ namespace DTSC { void appendNal(const char * appendData, uint32_t appendLen); void upgradeNal(const char * appendData, uint32_t appendLen); void setKeyFrame(bool kf); - long long unsigned int getTime() const; + virtual long long unsigned int getTime() const; long int getTrackId() const; char * getData() const; int getDataLen() const; @@ -150,6 +150,23 @@ namespace DTSC { uint64_t prevNalSize; }; + /// A child class of DTSC::Packet, which allows overriding the packet time efficiently. + class RetimedPacket : public Packet { + public: + RetimedPacket(uint64_t reTime){ + timeOverride = reTime; + } + RetimedPacket(uint64_t reTime, const Packet & rhs) : Packet(rhs){ + timeOverride = reTime; + } + RetimedPacket(uint64_t reTime, const char * data_, unsigned int len, bool noCopy = false) : Packet(data_, len, noCopy){ + timeOverride = reTime; + } + virtual long long unsigned int getTime() const{return timeOverride;} + protected: + uint64_t timeOverride; + }; + /// A simple structure used for ordering byte seek positions. struct livePos { livePos() { diff --git a/src/io.cpp b/src/io.cpp index ebf48ff6..d5687db4 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -303,9 +303,9 @@ namespace Mist { //First memcpy only the payload to the destination //Leaves the 20 bytes inbetween empty to ensure the data is not accidentally read before it is complete memcpy(myPage.mapped + curOffset + 20, pack.getData() + 20, pack.getDataLen() - 20); - //Copy the remaing values in reverse order: + //Copy the remaining values in reverse order: //8 byte timestamp - memcpy(myPage.mapped + curOffset + 12, pack.getData() + 12, 8); + Bit::htobll(myPage.mapped + curOffset + 12, pack.getTime()); //The mapped track id ((int *)(myPage.mapped + curOffset + 8))[0] = htonl(mapTid); int size = Bit::btohl(pack.getData() + 4); From 972315959248b4b4501cd142a790634367e5cb8b Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 4 Jul 2018 11:43:29 +0200 Subject: [PATCH 7/8] Improved selectDefaultTracks function to give feedback on selection changes. --- src/output/output.cpp | 29 ++++++++++++++++++++++++++--- src/output/output.h | 2 +- src/output/output_json.cpp | 6 ++++++ src/output/output_rtmp.cpp | 7 ++----- 4 files changed, 35 insertions(+), 9 deletions(-) diff --git a/src/output/output.cpp b/src/output/output.cpp index 20d94182..649679cf 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -255,16 +255,31 @@ namespace Mist{ } } - void Output::selectDefaultTracks(){ + /// Automatically selects the tracks that are possible and/or wanted. + /// Returns true if the track selection changed in any way. + bool Output::selectDefaultTracks(){ if (!isInitialized){ initialize(); - if (!isInitialized){return;} + if (!isInitialized){return false;} } + + //First, back up and wipe the existing selections, if any. + std::set oldSel = selectedTracks; + selectedTracks.clear(); + + bool autoSeek = buffer.size(); + uint64_t seekTarget = currentTime(); + //check which tracks don't actually exist std::set toRemove; for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ if (!myMeta.tracks.count(*it)){ toRemove.insert(*it); + continue; + } + //autoSeeking and target not in bounds? Drop it too. + if (autoSeek && myMeta.tracks[*it].lastms < seekTarget - 6000){ + toRemove.insert(*it); } } //remove those from selectedtracks @@ -344,6 +359,7 @@ namespace Mist{ if (myMeta.live){ for (std::map::reverse_iterator trit = myMeta.tracks.rbegin(); trit != myMeta.tracks.rend(); trit++){ if ((!byType && trit->second.codec == strRef.substr(shift)) || (byType && trit->second.type == strRef.substr(shift)) || strRef.substr(shift) == "*"){ + if (autoSeek && trit->second.lastms < seekTarget - 6000){continue;} selectedTracks.insert(trit->first); found = true; if (!multiSel){break;} @@ -352,6 +368,7 @@ namespace Mist{ }else{ for (std::map::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ if ((!byType && trit->second.codec == strRef.substr(shift)) || (byType && trit->second.type == strRef.substr(shift)) || strRef.substr(shift) == "*"){ + if (autoSeek && trit->second.lastms < seekTarget - 6000){continue;} selectedTracks.insert(trit->first); found = true; if (!multiSel){break;} @@ -381,6 +398,12 @@ namespace Mist{ if (!selectedTracks.size() && myMeta.tracks.size() && capa["codecs"][bestSoFar].size()){ WARN_MSG("No tracks selected (%u total) for stream %s!", myMeta.tracks.size(), streamName.c_str()); } + bool madeChange = (oldSel != selectedTracks); + if (autoSeek && madeChange){ + INFO_MSG("Automatically seeking to position %llu to resume playback", seekTarget); + seek(seekTarget); + } + return madeChange; } /// Clears the buffer, sets parseData to false, and generally makes not very much happen at all. @@ -825,7 +848,7 @@ namespace Mist{ if (buffer.size() != selectedTracks.size()){ std::set dropTracks; if (buffer.size() < selectedTracks.size()){ - //prepare to drop any selectedTrack without buffe entry + //prepare to drop any selectedTrack without buffer entry for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); ++it){ bool found = false; for (std::set::iterator bi = buffer.begin(); bi != buffer.end(); ++bi){ diff --git a/src/output/output.h b/src/output/output.h index d58cdfae..4db104be 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -51,7 +51,7 @@ namespace Mist { uint64_t endTime(); void setBlocking(bool blocking); void updateMeta(); - void selectDefaultTracks(); + bool selectDefaultTracks(); bool connectToFile(std::string file); static bool listenMode(){return true;} uint32_t currTrackCount() const; diff --git a/src/output/output_json.cpp b/src/output/output_json.cpp index ca7b682c..610619b1 100644 --- a/src/output/output_json.cpp +++ b/src/output/output_json.cpp @@ -28,6 +28,12 @@ namespace Mist { } void OutJSON::sendNext(){ + if (keepReselecting){ + //If we can select more tracks, do it and continue. + if (selectDefaultTracks()){ + return;//After a seek, the current packet is invalid. Do nothing and return here. + } + } JSON::Value jPack = thisPacket.toJSON(); if (dupcheck){ if (jPack.compareExcept(lastVal, nodup)){ diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index b0115571..27405122 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -160,12 +160,9 @@ namespace Mist { lastMeta = Util::epoch(); updateMeta(); if (myMeta.tracks.size() > 1){ - size_t prevTrackCount = selectedTracks.size(); - selectDefaultTracks(); - if (selectedTracks.size() > prevTrackCount){ - INFO_MSG("Picked up new track - selecting it and resetting state."); + if (selectDefaultTracks()){ + INFO_MSG("Track selection changed - resending headers and continuing"); sentHeader = false; - initialSeek(); return; } } From 67cba61ed72cc9583eb7557584ee9d503a8785b9 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sat, 30 Jun 2018 18:51:54 +0200 Subject: [PATCH 8/8] Edited JSON codec format --- src/output/output_json.cpp | 34 +++++++++++++++++++--------------- src/output/output_json.h | 2 ++ 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/src/output/output_json.cpp b/src/output/output_json.cpp index 610619b1..2a8d800f 100644 --- a/src/output/output_json.cpp +++ b/src/output/output_json.cpp @@ -34,7 +34,17 @@ namespace Mist { return;//After a seek, the current packet is invalid. Do nothing and return here. } } - JSON::Value jPack = thisPacket.toJSON(); + JSON::Value jPack; + if (myMeta.tracks[thisPacket.getTrackId()].codec == "JSON"){ + char * dPtr; + unsigned int dLen; + thisPacket.getString("data", dPtr, dLen); + jPack["data"] = JSON::fromString(dPtr, dLen); + jPack["time"] = (long long)thisPacket.getTime(); + jPack["track"] = (long long)thisPacket.getTrackId(); + }else{ + jPack = thisPacket.toJSON(); + } if (dupcheck){ if (jPack.compareExcept(lastVal, nodup)){ return;//skip duplicates @@ -165,20 +175,17 @@ namespace Mist { myMeta.tracks[pushTrack].type = "meta"; myMeta.tracks[pushTrack].codec = "JSON"; //We have a track set correctly. Let's attempt to buffer a frame. - inJSON["trackid"] = (long long)pushTrack; - inJSON["datatype"] = "meta"; lastSendTime = Util::bootMS(); if (!inJSON.isMember("unix")){ //Base timestamp on arrival time - inJSON["time"] = (long long)(lastSendTime - bootMsOffset); + lastOutTime = (lastSendTime - bootMsOffset); }else{ //Base timestamp on unix time - inJSON["time"] = (long long)((lastSendTime - bootMsOffset) + (Util::epoch() - Util::bootSecs()) * 1000); + lastOutTime = (lastSendTime - bootMsOffset) + (inJSON["unix"].asInt() - Util::epoch()) * 1000; } - inJSON["bmo"] = (long long)bootMsOffset; - lastVal = inJSON; - std::string packedJson = inJSON.toNetPacked(); - DTSC::Packet newPack(packedJson.data(), packedJson.size(), true); + lastOutData = inJSON.toString(); + static DTSC::Packet newPack; + newPack.genericFill(lastOutTime, 0, pushTrack, lastOutData.data(), lastOutData.size(), 0, true, bootMsOffset); bufferLivePacket(newPack); if (!idleInterval){idleInterval = 100;} if (isBlocking){setBlocking(false);} @@ -193,13 +200,10 @@ namespace Mist { } return; } - lastVal["time"] = (long long)(lastVal["time"].asInt() + (Util::bootMS() - lastSendTime)); + lastOutTime += (Util::bootMS() - lastSendTime); lastSendTime = Util::bootMS(); - lastVal.netPrepare(); - std::string packedJson = lastVal.toNetPacked(); - DTSC::Packet newPack(packedJson.data(), packedJson.size(), true); - myMeta.tracks[pushTrack].type = "meta"; - myMeta.tracks[pushTrack].codec = "JSON"; + static DTSC::Packet newPack; + newPack.genericFill(lastOutTime, 0, pushTrack, lastOutData.data(), lastOutData.size(), 0, true, bootMsOffset); bufferLivePacket(newPack); } diff --git a/src/output/output_json.h b/src/output/output_json.h index 3a290fc6..6010d2c5 100644 --- a/src/output/output_json.h +++ b/src/output/output_json.h @@ -18,6 +18,8 @@ namespace Mist { bool doesWebsockets(){return true;} protected: JSON::Value lastVal; + std::string lastOutData; + uint64_t lastOutTime; uint64_t lastSendTime; bool keepReselecting; std::string jsonp;