From f69fe8a4fb48b3a1ca9c97a9bab33e3e6c28dc0f Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 24 Oct 2016 17:35:23 +0200 Subject: [PATCH 1/8] Moved HLS-specific TS output handling from output_ts_base to output_hls --- src/output/output_hls.cpp | 14 ++++++++++++++ src/output/output_hls.h | 2 ++ src/output/output_ts_base.cpp | 10 +--------- src/output/output_ts_base.h | 3 +-- 4 files changed, 18 insertions(+), 11 deletions(-) diff --git a/src/output/output_hls.cpp b/src/output/output_hls.cpp index f25983f8..143e4c1c 100644 --- a/src/output/output_hls.cpp +++ b/src/output/output_hls.cpp @@ -108,6 +108,7 @@ namespace Mist { OutHLS::OutHLS(Socket::Connection & conn) : TSOutput(conn){ realTime = 0; + until=0xFFFFFFFFFFFFFFFFull; } OutHLS::~OutHLS() {} @@ -270,6 +271,19 @@ namespace Mist { } } + void OutHLS::sendNext(){ + //First check if we need to stop. + if (thisPacket.getTime() >= until){ + stop(); + wantRequest = true; + parseData = false; + //Ensure alignment of contCounters for selected tracks, to prevent discontinuities. + H.Chunkify("", 0, myConn); + return; + } + //Invoke the generic TS output sendNext handler + TSOutput::sendNext(); + } void OutHLS::sendTS(const char * tsData, unsigned int len){ H.Chunkify(tsData, len, myConn); diff --git a/src/output/output_hls.h b/src/output/output_hls.h index 7cbff9b9..903baf7d 100644 --- a/src/output/output_hls.h +++ b/src/output/output_hls.h @@ -8,6 +8,7 @@ namespace Mist { ~OutHLS(); static void init(Util::Config * cfg); void sendTS(const char * tsData, unsigned int len=188); + void sendNext(); void onHTTP(); bool isReadyForPlay(); protected: @@ -18,6 +19,7 @@ namespace Mist { int keysToSend; unsigned int vidTrack; unsigned int audTrack; + long long unsigned int until; }; } diff --git a/src/output/output_ts_base.cpp b/src/output/output_ts_base.cpp index a7bb5d00..78c9dce6 100644 --- a/src/output/output_ts_base.cpp +++ b/src/output/output_ts_base.cpp @@ -5,7 +5,6 @@ namespace Mist { packCounter=0; haveAvcc = false; ts_from = 0; - until=0xFFFFFFFFFFFFFFFFull; setBlocking(true); sendRepeatingHeaders = false; appleCompat=false; @@ -30,7 +29,7 @@ namespace Mist { if (!dataLen){return;} if (packData.getBytesFree() == 184){ - packData.clear(); + packData.clear(); packData.setPID(pkgPid); packData.setContinuityCounter(++contPkg); if (firstPack){ @@ -68,13 +67,6 @@ namespace Mist { char * dataPointer = 0; unsigned int dataLen = 0; thisPacket.getString("data", dataPointer, dataLen); //data - if (packTime >= until){ //this if should only trigger for HLS - stop(); - wantRequest = true; - parseData = false; - sendTS("",0); - return; - } //apple compatibility timestamp correction if (appleCompat){ packTime -= ts_from; diff --git a/src/output/output_ts_base.h b/src/output/output_ts_base.h index 6995bca5..90f17628 100644 --- a/src/output/output_ts_base.h +++ b/src/output/output_ts_base.h @@ -14,7 +14,7 @@ namespace Mist { public: TSOutput(Socket::Connection & conn); virtual ~TSOutput(){}; - void sendNext(); + virtual void sendNext(); virtual void sendTS(const char * tsData, unsigned int len=188){}; void fillPacket(char const * data, size_t dataLen, bool & firstPack, bool video, bool keyframe, uint32_t pkgPid, int & contPkg); protected: @@ -29,7 +29,6 @@ namespace Mist { bool appleCompat; bool sendRepeatingHeaders; long long unsigned int ts_from; - long long unsigned int until; long long unsigned int lastVid; }; } From 653f5cf0853afb66bd8df00304c8e03e896a0d7f Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 27 Oct 2016 11:22:56 +0200 Subject: [PATCH 2/8] Removed TS-level discontinuities from HLS --- src/output/output_hls.cpp | 18 ++++++++++++++++++ src/output/output_ts_base.cpp | 1 - 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/output/output_hls.cpp b/src/output/output_hls.cpp index 143e4c1c..cc06fb92 100644 --- a/src/output/output_hls.cpp +++ b/src/output/output_hls.cpp @@ -277,7 +277,25 @@ namespace Mist { stop(); wantRequest = true; parseData = false; + //Ensure alignment of contCounters for selected tracks, to prevent discontinuities. + for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); ++it){ + DTSC::Track & Trk = myMeta.tracks[*it]; + uint32_t pkgPid = 255 + *it; + int & contPkg = contCounters[pkgPid]; + if (contPkg % 16 != 0){ + packData.clear(); + packData.setPID(pkgPid); + packData.addStuffing(); + while (contPkg % 16 != 0){ + packData.setContinuityCounter(++contPkg); + sendTS(packData.checkAndGetBuffer()); + } + packData.clear(); + } + } + + //Signal end of data H.Chunkify("", 0, myConn); return; } diff --git a/src/output/output_ts_base.cpp b/src/output/output_ts_base.cpp index 78c9dce6..c1669963 100644 --- a/src/output/output_ts_base.cpp +++ b/src/output/output_ts_base.cpp @@ -34,7 +34,6 @@ namespace Mist { packData.setContinuityCounter(++contPkg); if (firstPack){ packData.setUnitStart(1); - packData.setDiscontinuity(true); if (video){ if (keyframe){ packData.setRandomAccess(true); From 56c1d1e3a12e0be18ccf387ed3d5202aedd4728a Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 12 Oct 2016 00:48:34 +0200 Subject: [PATCH 3/8] RTMP push security improvements --- src/output/output_rtmp.cpp | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index 66737e6e..3cb81b3e 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -927,6 +927,7 @@ namespace Mist { case 9: //video data case 18: {//meta data static std::map pushMeta; + static uint64_t lastTagTime = 0; if (!isInitialized) { MEDIUM_MSG("Received useless media data"); onFinish(); @@ -944,7 +945,23 @@ namespace Mist { unsigned int reTrack = next.cs_id*3 + (F.data[0] == 0x09 ? 1 : (F.data[0] == 0x08 ? 2 : 3)); F.toMeta(myMeta, *amf_storage, reTrack); if (F.getDataLen() && !(F.needsInitData() && F.isInitData())){ - thisPacket.genericFill(F.tagTime(), F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe); + uint64_t tagTime = next.timestamp; + //Check for decreasing timestamps - this is a connection error. + //We allow wrapping around the 32 bits maximum value if the most significant 8 bits are set. + /// \TODO Provide time continuity for wrap-around. + if (lastTagTime && tagTime < lastTagTime && lastTagTime < 0xFF000000ull){ + FAIL_MSG("Timestamps went from %llu to %llu (decreased): disconnecting!", lastTagTime, tagTime); + onFinish(); + break; + } + //Check if we went more than 10 minutes into the future + if (lastTagTime && tagTime > lastTagTime + 600000){ + FAIL_MSG("Timestamps went from %llu to %llu (> 10m in future): disconnecting!", lastTagTime, tagTime); + onFinish(); + break; + } + thisPacket.genericFill(tagTime, F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe); + lastTagTime = tagTime; if (!nProxy.userClient.getData()){ char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); From 0eef699b45343f316ac9ffc0f39b385388daeaa3 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 8 Nov 2016 15:08:49 +0100 Subject: [PATCH 4/8] Only retrieve stream source if no tracks available --- src/output/output.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/output/output.cpp b/src/output/output.cpp index 08d4cad5..b79977cc 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -323,7 +323,7 @@ namespace Mist { INSANE_MSG("Found track/codec: %s", trit->second.codec.c_str()); } static std::string source; - if (!source.size()){ + if (!myMeta.tracks.size() && !source.size()){ IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); configLock.wait(); From 0e649b7c727629186be52ff6e3990a05e3797a22 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 8 Nov 2016 15:13:55 +0100 Subject: [PATCH 5/8] Fixed creating accounts on wrong login attempt --- src/controller/controller_api.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index f0558421..aef78505 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -125,7 +125,7 @@ bool Controller::authorize(JSON::Value & Request, JSON::Value & Response, Socket return true; } } - if (Request["authorize"]["password"].asString() != "" && Secure::md5(Storage["account"][UserID]["password"].asString()) != Request["authorize"]["password"].asString()){ + if (Request["authorize"]["password"].asString() != ""){ Log("AUTH", "Failed login attempt " + UserID + " from " + conn.getHost()); } } From 19b67e4551335785bc30020d5f9afc62867eceff Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 9 Nov 2016 12:38:05 +0100 Subject: [PATCH 6/8] Updated default live metadata page size to 16MiB, to allow for bigger streams --- lib/defines.h | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/lib/defines.h b/lib/defines.h index b034336f..e9d5c40e 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -71,11 +71,8 @@ static const char * DBG_LVL_LIST[] = {"NONE", "FAIL", "ERROR", "WARN", "INFO", " #define INPUT_TIMEOUT STATS_DELAY #endif -/// The size used for stream header pages under Windows, where they cannot be size-detected. -#define DEFAULT_META_PAGE_SIZE 16 * 1024 * 1024 - -/// The size used for stream header pages under Windows, where they cannot be size-detected. -#define DEFAULT_STRM_PAGE_SIZE 4 * 1024 * 1024 +/// The size used for stream headers for live streams +#define DEFAULT_STRM_PAGE_SIZE 16 * 1024 * 1024 /// The size used for stream data pages under Windows, where they cannot be size-detected. #define DEFAULT_DATA_PAGE_SIZE SHM_DATASIZE * 1024 * 1024 From 94e39f83234c2ae2fe0713a18de5b46d39ac88b1 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 14 Nov 2016 10:55:58 +0100 Subject: [PATCH 7/8] Fixed JIT timestamps for live --- src/output/output.cpp | 56 +++++++++++++++++++++++++------------------ 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/src/output/output.cpp b/src/output/output.cpp index b79977cc..8e793cc9 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -17,7 +17,7 @@ #define MIN_DELAY 2500 #endif -namespace Mist { +namespace Mist{ JSON::Value Output::capa = JSON::Value(); int getDTSCLen(char * mapped, long long int offset){ @@ -35,7 +35,7 @@ namespace Mist { capa["optional"]["debug"]["type"] = "debug"; } - Output::Output(Socket::Connection & conn) : myConn(conn) { + Output::Output(Socket::Connection & conn) : myConn(conn){ firstTime = 0; crc = getpid(); parseData = false; @@ -135,7 +135,7 @@ namespace Mist { return myConn.getBinHost(); } - bool Output::isReadyForPlay() { + bool Output::isReadyForPlay(){ if (myMeta.tracks.size()){ if (!selectedTracks.size()){ selectDefaultTracks(); @@ -229,14 +229,14 @@ namespace Mist { unsigned int bestSoFar = 0; unsigned int bestSoFarCount = 0; unsigned int index = 0; - jsonForEach(capa["codecs"], it) { + jsonForEach(capa["codecs"], it){ unsigned int genCounter = 0; unsigned int selCounter = 0; if ((*it).size() > 0){ - jsonForEach((*it), itb) { + jsonForEach((*it), itb){ if ((*itb).size() > 0){ bool found = false; - jsonForEach(*itb, itc) { + jsonForEach(*itb, itc){ for (std::set::iterator itd = selectedTracks.begin(); itd != selectedTracks.end(); itd++){ if (myMeta.tracks[*itd].codec == (*itc).asStringRef()){ selCounter++; @@ -274,11 +274,11 @@ namespace Mist { MEDIUM_MSG("Trying to fill: %s", capa["codecs"][bestSoFar].toString().c_str()); //try to fill as many codecs simultaneously as possible if (capa["codecs"][bestSoFar].size() > 0){ - jsonForEach(capa["codecs"][bestSoFar], itb) { + jsonForEach(capa["codecs"][bestSoFar], itb){ if ((*itb).size() && myMeta.tracks.size()){ bool found = false; - jsonForEach((*itb), itc) { - if (found) { + jsonForEach((*itb), itc){ + if (found){ break; } for (std::set::iterator itd = selectedTracks.begin(); itd != selectedTracks.end(); itd++){ @@ -317,7 +317,7 @@ namespace Mist { DEBUG_MSG(DLVL_MEDIUM, "Selected tracks: %s (%lu)", selected.str().c_str(), selectedTracks.size()); } - if (selectedTracks.size() == 0) { + if (selectedTracks.size() == 0){ INSANE_MSG("We didn't find any tracks which that we can use. selectedTrack.size() is 0."); for (std::map::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ INSANE_MSG("Found track/codec: %s", trit->second.codec.c_str()); @@ -652,7 +652,7 @@ namespace Mist { } } - int Output::run() { + int Output::run(){ DONTEVEN_MSG("MistOut client handler started"); while (config->is_active && myConn && (wantRequest || parseData)){ if (wantRequest){ @@ -676,7 +676,7 @@ namespace Mist { //slow down processing, if real time speed is wanted if (realTime){ uint8_t i = 6; - while (--i && thisPacket.getTime() > (((Util::getMS() - firstTime)*1000)+maxSkipAhead)/realTime && config->is_active && myConn) { + while (--i && thisPacket.getTime() > (((Util::getMS() - firstTime)*1000)+maxSkipAhead)/realTime && config->is_active && myConn){ Util::sleep(std::min(thisPacket.getTime() - (((Util::getMS() - firstTime)*1000)+minSkipAhead)/realTime, 1000llu)); stats(); } @@ -917,16 +917,26 @@ namespace Mist { //if there's a timestamp mismatch, print this. //except for live, where we never know the time in advance - if (thisPacket.getTime() != nxt.time && nxt.time && !atLivePoint){ - static int warned = 0; - if (warned < 5){ - WARN_MSG("Loaded %s track %ld@%llu instead of %u@%llu (%dms, %s, offset %lu)", streamName.c_str(), thisPacket.getTrackId(), thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time), myMeta.tracks[nxt.tid].codec.c_str(), nxt.offset); - if (++warned == 5){ - WARN_MSG("Further warnings about time mismatches printed on HIGH level."); + if (thisPacket.getTime() != nxt.time && nxt.time){ + if (!atLivePoint){ + static int warned = 0; + if (warned < 5){ + WARN_MSG("Loaded %s track %ld@%llu instead of %u@%llu (%dms, %s, offset %lu)", streamName.c_str(), thisPacket.getTrackId(), + thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time), + myMeta.tracks[nxt.tid].codec.c_str(), nxt.offset); + if (++warned == 5){WARN_MSG("Further warnings about time mismatches printed on HIGH level.");} + }else{ + HIGH_MSG("Loaded %s track %ld@%llu instead of %u@%llu (%dms, %s, offset %lu)", streamName.c_str(), thisPacket.getTrackId(), + thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time), + myMeta.tracks[nxt.tid].codec.c_str(), nxt.offset); } - }else{ - HIGH_MSG("Loaded %s track %ld@%llu instead of %u@%llu (%dms, %s, offset %lu)", streamName.c_str(), thisPacket.getTrackId(), thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time), myMeta.tracks[nxt.tid].codec.c_str(), nxt.offset); } + nxt.time = thisPacket.getTime(); + //swap out the next object in the buffer with a new one + buffer.erase(buffer.begin()); + buffer.insert(nxt); + VERYHIGH_MSG("JIT reordering %u@%llu.", nxt.tid, nxt.time); + return false; } //when live, every keyframe, check correctness of the keyframe number @@ -1059,17 +1069,17 @@ namespace Mist { sentHeader = true; } - bool Output::connectToFile(std::string file) { + bool Output::connectToFile(std::string file){ int flags = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH; int mode = O_RDWR | O_CREAT | O_TRUNC; int outFile = open(file.c_str(), mode, flags); - if (outFile < 0) { + if (outFile < 0){ ERROR_MSG("Failed to open file %s, error: %s", file.c_str(), strerror(errno)); return false; } int r = dup2(outFile, myConn.getSocket()); - if (r == -1) { + if (r == -1){ ERROR_MSG("Failed to create an alias for the socket using dup2: %s.", strerror(errno)); return false; } From 8d83a203bebf1756b92eeefc499cab1145cfb06c Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 14 Nov 2016 11:01:00 +0100 Subject: [PATCH 8/8] Added new generalized input keepRunning() function, FLV input now shuts down if the file is updated file it is active, added 15 second DTSH regeneration window --- src/input/input.cpp | 26 ++++++++++++++++++++------ src/input/input.h | 2 ++ src/input/input_flv.cpp | 24 ++++++++++++++++++++++++ src/input/input_flv.h | 2 ++ 4 files changed, 48 insertions(+), 6 deletions(-) diff --git a/src/input/input.cpp b/src/input/input.cpp index 91df1c0c..8a2ff609 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -81,7 +81,8 @@ namespace Mist { INSANE_MSG("No header exists to compare - ignoring header check"); return; } - if (bufHeader.st_mtime < bufStream.st_mtime) { + //the same second is not enough - add a 15 second window where we consider it too old + if (bufHeader.st_mtime < bufStream.st_mtime + 15) { INFO_MSG("Overwriting outdated DTSH header file: %s ", headerFile.c_str()); remove(headerFile.c_str()); } @@ -166,13 +167,17 @@ namespace Mist { char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); userPage.init(userPageName, PLAY_EX_SIZE, true); - - DEBUG_MSG(DLVL_DEVEL,"Input for stream %s started", streamName.c_str()); - - long long int activityCounter = Util::bootSecs(); - while (((Util::bootSecs() - activityCounter) < INPUT_TIMEOUT || (myMeta.live && (Util::bootSecs() - activityCounter) < myMeta.biggestFragment()/500)) && config->is_active) { //15 second timeout + + DEBUG_MSG(DLVL_DEVEL, "Input for stream %s started", streamName.c_str()); + activityCounter = Util::bootSecs(); + //main serve loop + while (keepRunning()) { + //load pages for connected clients on request + //through the callbackWrapper function userPage.parseEach(callbackWrapper); + //unload pages that haven't been used for a while removeUnused(); + //If users are connected and tracks exist, reset the activity counter if (userPage.connectedUsers) { if (myMeta.tracks.size()){ activityCounter = Util::bootSecs(); @@ -181,6 +186,7 @@ namespace Mist { }else{ DEBUG_MSG(DLVL_INSANE, "Timer running"); } + //if not shutting down, wait 1 second before looping if (config->is_active){ Util::wait(1000); } @@ -190,6 +196,14 @@ namespace Mist { //end player functionality } + bool Input::keepRunning(){ + //We keep running in serve mode if the config is still active AND either + // - INPUT_TIMEOUT seconds haven't passed yet, + // - this is a live stream and at least two of the biggest fragment haven't passed yet, + bool ret = (config->is_active && ((Util::bootSecs() - activityCounter) < INPUT_TIMEOUT || (myMeta.live && (Util::bootSecs() - activityCounter) < myMeta.biggestFragment()/500))); + return ret; + } + /// Main loop for stream-style inputs. /// This loop will start the buffer without resume support, and then repeatedly call ..... followed by .... void Input::stream(){ diff --git a/src/input/input.h b/src/input/input.h index 996f4261..88d8ddd2 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -34,6 +34,7 @@ namespace Mist { virtual void getNext(bool smart = true) {}; virtual void seek(int seekTime){}; virtual void finish(); + virtual bool keepRunning(); virtual bool openStreamSource() { return false; }; virtual void closeStreamSource() {}; virtual void parseStreamHeader() {}; @@ -60,6 +61,7 @@ namespace Mist { unsigned int benchMark; bool isBuffer; + uint64_t activityCounter; JSON::Value capa; diff --git a/src/input/input_flv.cpp b/src/input/input_flv.cpp index 1065f6ef..9d8f6557 100644 --- a/src/input/input_flv.cpp +++ b/src/input/input_flv.cpp @@ -5,6 +5,9 @@ #include #include #include +#include //for stat +#include //for stat +#include //for stat #include #include #include @@ -46,9 +49,30 @@ namespace Mist { if (!inFile) { return false; } + struct stat statData; + lastModTime = 0; + if (stat(config->getString("input").c_str(), &statData) != -1){ + lastModTime = statData.st_mtime; + } return true; } + /// Overrides the default keepRunning function to shut down + /// if the file disappears or changes, by polling the file's mtime. + /// If neither applies, calls the original function. + bool inputFLV::keepRunning(){ + struct stat statData; + if (stat(config->getString("input").c_str(), &statData) == -1){ + INFO_MSG("Shutting down because input file disappeared"); + return false; + } + if (lastModTime != statData.st_mtime){ + INFO_MSG("Shutting down because input file changed"); + return false; + } + return Input::keepRunning(); + } + bool inputFLV::readHeader() { if (!inFile){return false;} //See whether a separate header file exists. diff --git a/src/input/input_flv.h b/src/input/input_flv.h index 66b85861..d2c438f1 100644 --- a/src/input/input_flv.h +++ b/src/input/input_flv.h @@ -13,7 +13,9 @@ namespace Mist { void getNext(bool smart = true); void seek(int seekTime); void trackSelect(std::string trackSpec); + bool keepRunning(); FLV::Tag tmpTag; + uint64_t lastModTime; FILE * inFile; }; }