diff --git a/lib/defines.h b/lib/defines.h index 9f26a974..8a736198 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -71,11 +71,8 @@ static const char * DBG_LVL_LIST[] = {"NONE", "FAIL", "ERROR", "WARN", "INFO", " #define INPUT_TIMEOUT STATS_DELAY #endif -/// The size used for stream header pages under Windows, where they cannot be size-detected. -#define DEFAULT_META_PAGE_SIZE 16 * 1024 * 1024 - -/// The size used for stream header pages under Windows, where they cannot be size-detected. -#define DEFAULT_STRM_PAGE_SIZE 4 * 1024 * 1024 +/// The size used for stream headers for live streams +#define DEFAULT_STRM_PAGE_SIZE 16 * 1024 * 1024 /// The size used for stream data pages under Windows, where they cannot be size-detected. #define DEFAULT_DATA_PAGE_SIZE SHM_DATASIZE * 1024 * 1024 diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index 9a4d2069..152c028e 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -135,7 +135,7 @@ bool Controller::authorize(JSON::Value & Request, JSON::Value & Response, Socket return true; } } - if (Request["authorize"]["password"].asString() != "" && Secure::md5(Storage["account"][UserID]["password"].asString()) != Request["authorize"]["password"].asString()){ + if (Request["authorize"]["password"].asString() != ""){ Log("AUTH", "Failed login attempt " + UserID + " from " + conn.getHost()); } } diff --git a/src/input/input.cpp b/src/input/input.cpp index 947bd513..81af744d 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -97,7 +97,8 @@ namespace Mist { INSANE_MSG("No header exists to compare - ignoring header check"); return; } - if (bufHeader.st_mtime < bufStream.st_mtime) { + //the same second is not enough - add a 15 second window where we consider it too old + if (bufHeader.st_mtime < bufStream.st_mtime + 15) { INFO_MSG("Overwriting outdated DTSH header file: %s ", headerFile.c_str()); remove(headerFile.c_str()); } @@ -180,12 +181,6 @@ namespace Mist { /// streamname /// input name /// ~~~~~~~~~~~~~~~ - /// The `"STREAM_UNLOAD"` trigger is stream-specific, and is ran right before an input shuts down and stops serving a stream. If cancelled, the shut down is delayed. Its payload is: - /// ~~~~~~~~~~~~~~~ - /// streamname - /// input name - /// ~~~~~~~~~~~~~~~ - // void Input::serve(){ if (!isBuffer) { for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { @@ -205,11 +200,15 @@ namespace Mist { /*LTS-END*/ DEBUG_MSG(DLVL_DEVEL, "Input for stream %s started", streamName.c_str()); - - long long int activityCounter = Util::bootSecs(); - while (((Util::bootSecs() - activityCounter) < INPUT_TIMEOUT || (myMeta.live && (Util::bootSecs() - activityCounter) < myMeta.biggestFragment()/500)) && config->is_active) { //15 second timeout + activityCounter = Util::bootSecs(); + //main serve loop + while (keepRunning()) { + //load pages for connected clients on request + //through the callbackWrapper function userPage.parseEach(callbackWrapper); + //unload pages that haven't been used for a while removeUnused(); + //If users are connected and tracks exist, reset the activity counter if (userPage.connectedUsers) { if (myMeta.tracks.size()){ activityCounter = Util::bootSecs(); @@ -218,17 +217,7 @@ namespace Mist { } else { DEBUG_MSG(DLVL_INSANE, "Timer running"); } - /*LTS-START*/ - if ((Util::bootSecs() - activityCounter) >= INPUT_TIMEOUT || !config->is_active){//15 second timeout - if(Triggers::shouldTrigger("STREAM_UNLOAD", config->getString("streamname"))){ - std::string payload = config->getString("streamname")+"\n" +capa["name"].asStringRef()+"\n"; - if (!Triggers::doTrigger("STREAM_UNLOAD", payload, config->getString("streamname"))){ - activityCounter = Util::bootSecs(); - config->is_active = true; - } - } - } - /*LTS-END*/ + //if not shutting down, wait 1 second before looping if (config->is_active){ Util::wait(1000); } @@ -238,6 +227,14 @@ namespace Mist { //end player functionality } + bool Input::keepRunning(){ + //We keep running in serve mode if the config is still active AND either + // - INPUT_TIMEOUT seconds haven't passed yet, + // - this is a live stream and at least two of the biggest fragment haven't passed yet, + bool ret = (config->is_active && ((Util::bootSecs() - activityCounter) < INPUT_TIMEOUT || (myMeta.live && (Util::bootSecs() - activityCounter) < myMeta.biggestFragment()/500))); + return ret; + } + /// Main loop for stream-style inputs. /// This loop will start the buffer without resume support, and then repeatedly call ..... followed by .... void Input::stream(){ diff --git a/src/input/input.h b/src/input/input.h index 8e0c3887..66580bb1 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -34,6 +34,7 @@ namespace Mist { virtual void getNext(bool smart = true) {}; virtual void seek(int seekTime){}; virtual void finish(); + virtual bool keepRunning(); virtual bool openStreamSource() { return false; }; virtual void closeStreamSource() {}; virtual void parseStreamHeader() {}; @@ -60,6 +61,7 @@ namespace Mist { unsigned int benchMark; bool isBuffer; + uint64_t activityCounter; JSON::Value capa; diff --git a/src/input/input_flv.cpp b/src/input/input_flv.cpp index 1065f6ef..9d8f6557 100644 --- a/src/input/input_flv.cpp +++ b/src/input/input_flv.cpp @@ -5,6 +5,9 @@ #include #include #include +#include //for stat +#include //for stat +#include //for stat #include #include #include @@ -46,9 +49,30 @@ namespace Mist { if (!inFile) { return false; } + struct stat statData; + lastModTime = 0; + if (stat(config->getString("input").c_str(), &statData) != -1){ + lastModTime = statData.st_mtime; + } return true; } + /// Overrides the default keepRunning function to shut down + /// if the file disappears or changes, by polling the file's mtime. + /// If neither applies, calls the original function. + bool inputFLV::keepRunning(){ + struct stat statData; + if (stat(config->getString("input").c_str(), &statData) == -1){ + INFO_MSG("Shutting down because input file disappeared"); + return false; + } + if (lastModTime != statData.st_mtime){ + INFO_MSG("Shutting down because input file changed"); + return false; + } + return Input::keepRunning(); + } + bool inputFLV::readHeader() { if (!inFile){return false;} //See whether a separate header file exists. diff --git a/src/input/input_flv.h b/src/input/input_flv.h index 66b85861..d2c438f1 100644 --- a/src/input/input_flv.h +++ b/src/input/input_flv.h @@ -13,7 +13,9 @@ namespace Mist { void getNext(bool smart = true); void seek(int seekTime); void trackSelect(std::string trackSpec); + bool keepRunning(); FLV::Tag tmpTag; + uint64_t lastModTime; FILE * inFile; }; } diff --git a/src/output/output.cpp b/src/output/output.cpp index 8ca7645e..dafcd81d 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -24,7 +24,7 @@ #define MIN_DELAY 2500 #endif -namespace Mist { +namespace Mist{ JSON::Value Output::capa = JSON::Value(); int getDTSCLen(char * mapped, long long int offset){ @@ -42,7 +42,7 @@ namespace Mist { capa["optional"]["debug"]["type"] = "debug"; } - Output::Output(Socket::Connection & conn) : myConn(conn) { + Output::Output(Socket::Connection & conn) : myConn(conn){ firstTime = 0; crc = getpid(); parseData = false; @@ -263,7 +263,7 @@ namespace Mist { return myConn.getBinHost(); } - bool Output::isReadyForPlay() { + bool Output::isReadyForPlay(){ if (myMeta.tracks.size()){ if (!selectedTracks.size()){ selectDefaultTracks(); @@ -358,14 +358,14 @@ namespace Mist { unsigned int bestSoFar = 0; unsigned int bestSoFarCount = 0; unsigned int index = 0; - jsonForEach(capa["codecs"], it) { + jsonForEach(capa["codecs"], it){ unsigned int genCounter = 0; unsigned int selCounter = 0; if ((*it).size() > 0){ - jsonForEach((*it), itb) { + jsonForEach((*it), itb){ if ((*itb).size() > 0){ bool found = false; - jsonForEach(*itb, itc) { + jsonForEach(*itb, itc){ for (std::set::iterator itd = selectedTracks.begin(); itd != selectedTracks.end(); itd++){ if (myMeta.tracks[*itd].codec == (*itc).asStringRef()){ selCounter++; @@ -403,11 +403,11 @@ namespace Mist { MEDIUM_MSG("Trying to fill: %s", capa["codecs"][bestSoFar].toString().c_str()); //try to fill as many codecs simultaneously as possible if (capa["codecs"][bestSoFar].size() > 0){ - jsonForEach(capa["codecs"][bestSoFar], itb) { + jsonForEach(capa["codecs"][bestSoFar], itb){ if ((*itb).size() && myMeta.tracks.size()){ bool found = false; - jsonForEach((*itb), itc) { - if (found) { + jsonForEach((*itb), itc){ + if (found){ break; } for (std::set::iterator itd = selectedTracks.begin(); itd != selectedTracks.end(); itd++){ @@ -446,13 +446,13 @@ namespace Mist { DEBUG_MSG(DLVL_MEDIUM, "Selected tracks: %s (%lu)", selected.str().c_str(), selectedTracks.size()); } - if (selectedTracks.size() == 0) { + if (selectedTracks.size() == 0){ INSANE_MSG("We didn't find any tracks which that we can use. selectedTrack.size() is 0."); for (std::map::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ INSANE_MSG("Found track/codec: %s", trit->second.codec.c_str()); } static std::string source; - if (!source.size()){ + if (!myMeta.tracks.size() && !source.size()){ IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); configLock.wait(); @@ -796,7 +796,7 @@ namespace Mist { /// output handler name /// request URL (if any) /// ~~~~~~~~~~~~~~~ - int Output::run() { + int Output::run(){ /*LTS-START*/ if(Triggers::shouldTrigger("CONN_OPEN", streamName)){ std::string payload = streamName+"\n" + getConnectedHost() +"\n"+capa["name"].asStringRef()+"\n"+reqUrl; @@ -828,7 +828,7 @@ namespace Mist { //slow down processing, if real time speed is wanted if (realTime){ uint8_t i = 6; - while (--i && thisPacket.getTime() > (((Util::getMS() - firstTime)*1000)+maxSkipAhead)/realTime && config->is_active && myConn) { + while (--i && thisPacket.getTime() > (((Util::getMS() - firstTime)*1000)+maxSkipAhead)/realTime && config->is_active && myConn){ Util::sleep(std::min(thisPacket.getTime() - (((Util::getMS() - firstTime)*1000)+minSkipAhead)/realTime, 1000llu)); stats(); } @@ -1082,16 +1082,26 @@ namespace Mist { //if there's a timestamp mismatch, print this. //except for live, where we never know the time in advance - if (thisPacket.getTime() != nxt.time && nxt.time && !atLivePoint){ - static int warned = 0; - if (warned < 5){ - WARN_MSG("Loaded %s track %ld@%llu instead of %u@%llu (%dms, %s, offset %lu)", streamName.c_str(), thisPacket.getTrackId(), thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time), myMeta.tracks[nxt.tid].codec.c_str(), nxt.offset); - if (++warned == 5){ - WARN_MSG("Further warnings about time mismatches printed on HIGH level."); + if (thisPacket.getTime() != nxt.time && nxt.time){ + if (!atLivePoint){ + static int warned = 0; + if (warned < 5){ + WARN_MSG("Loaded %s track %ld@%llu instead of %u@%llu (%dms, %s, offset %lu)", streamName.c_str(), thisPacket.getTrackId(), + thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time), + myMeta.tracks[nxt.tid].codec.c_str(), nxt.offset); + if (++warned == 5){WARN_MSG("Further warnings about time mismatches printed on HIGH level.");} + }else{ + HIGH_MSG("Loaded %s track %ld@%llu instead of %u@%llu (%dms, %s, offset %lu)", streamName.c_str(), thisPacket.getTrackId(), + thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time), + myMeta.tracks[nxt.tid].codec.c_str(), nxt.offset); } - }else{ - HIGH_MSG("Loaded %s track %ld@%llu instead of %u@%llu (%dms, %s, offset %lu)", streamName.c_str(), thisPacket.getTrackId(), thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time), myMeta.tracks[nxt.tid].codec.c_str(), nxt.offset); } + nxt.time = thisPacket.getTime(); + //swap out the next object in the buffer with a new one + buffer.erase(buffer.begin()); + buffer.insert(nxt); + VERYHIGH_MSG("JIT reordering %u@%llu.", nxt.tid, nxt.time); + return false; } //when live, every keyframe, check correctness of the keyframe number @@ -1232,17 +1242,17 @@ namespace Mist { sentHeader = true; } - bool Output::connectToFile(std::string file) { + bool Output::connectToFile(std::string file){ int flags = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH; int mode = O_RDWR | O_CREAT | O_TRUNC; int outFile = open(file.c_str(), mode, flags); - if (outFile < 0) { + if (outFile < 0){ ERROR_MSG("Failed to open file %s, error: %s", file.c_str(), strerror(errno)); return false; } int r = dup2(outFile, myConn.getSocket()); - if (r == -1) { + if (r == -1){ ERROR_MSG("Failed to create an alias for the socket using dup2: %s.", strerror(errno)); return false; } diff --git a/src/output/output_hls.cpp b/src/output/output_hls.cpp index 7ccd7f73..b85b512e 100644 --- a/src/output/output_hls.cpp +++ b/src/output/output_hls.cpp @@ -259,6 +259,7 @@ namespace Mist { OutHLS::OutHLS(Socket::Connection & conn) : TSOutput(conn) { realTime = 0; + until=0xFFFFFFFFFFFFFFFFull; } OutHLS::~OutHLS() {} @@ -474,6 +475,37 @@ namespace Mist { } } + void OutHLS::sendNext(){ + //First check if we need to stop. + if (thisPacket.getTime() >= until){ + stop(); + wantRequest = true; + parseData = false; + + //Ensure alignment of contCounters for selected tracks, to prevent discontinuities. + for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); ++it){ + DTSC::Track & Trk = myMeta.tracks[*it]; + uint32_t pkgPid = 255 + *it; + int & contPkg = contCounters[pkgPid]; + if (contPkg % 16 != 0){ + packData.clear(); + packData.setPID(pkgPid); + packData.addStuffing(); + while (contPkg % 16 != 0){ + packData.setContinuityCounter(++contPkg); + sendTS(packData.checkAndGetBuffer()); + } + packData.clear(); + } + } + + //Signal end of data + H.Chunkify("", 0, myConn); + return; + } + //Invoke the generic TS output sendNext handler + TSOutput::sendNext(); + } void OutHLS::sendTS(const char * tsData, unsigned int len) { H.Chunkify(tsData, len, myConn); diff --git a/src/output/output_hls.h b/src/output/output_hls.h index 2fb14e36..2ddd123b 100644 --- a/src/output/output_hls.h +++ b/src/output/output_hls.h @@ -8,6 +8,7 @@ namespace Mist { ~OutHLS(); static void init(Util::Config * cfg); void sendTS(const char * tsData, unsigned int len=188); + void sendNext(); void onHTTP(); bool isReadyForPlay(); protected: @@ -25,6 +26,7 @@ namespace Mist { int keysToSend; unsigned int vidTrack; unsigned int audTrack; + long long unsigned int until; }; } diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index e79d6cea..a69d0be5 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -1144,6 +1144,7 @@ namespace Mist { case 9: //video data case 18: {//meta data static std::map pushMeta; + static uint64_t lastTagTime = 0; if (!isInitialized) { MEDIUM_MSG("Received useless media data"); onFinish(); @@ -1161,7 +1162,23 @@ namespace Mist { unsigned int reTrack = next.cs_id*3 + (F.data[0] == 0x09 ? 1 : (F.data[0] == 0x08 ? 2 : 3)); F.toMeta(myMeta, *amf_storage, reTrack); if (F.getDataLen() && !(F.needsInitData() && F.isInitData())){ - thisPacket.genericFill(F.tagTime(), F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe); + uint64_t tagTime = next.timestamp; + //Check for decreasing timestamps - this is a connection error. + //We allow wrapping around the 32 bits maximum value if the most significant 8 bits are set. + /// \TODO Provide time continuity for wrap-around. + if (lastTagTime && tagTime < lastTagTime && lastTagTime < 0xFF000000ull){ + FAIL_MSG("Timestamps went from %llu to %llu (decreased): disconnecting!", lastTagTime, tagTime); + onFinish(); + break; + } + //Check if we went more than 10 minutes into the future + if (lastTagTime && tagTime > lastTagTime + 600000){ + FAIL_MSG("Timestamps went from %llu to %llu (> 10m in future): disconnecting!", lastTagTime, tagTime); + onFinish(); + break; + } + thisPacket.genericFill(tagTime, F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe); + lastTagTime = tagTime; if (!nProxy.userClient.getData()){ char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); diff --git a/src/output/output_ts_base.cpp b/src/output/output_ts_base.cpp index 7705b5d5..b520000a 100644 --- a/src/output/output_ts_base.cpp +++ b/src/output/output_ts_base.cpp @@ -6,7 +6,6 @@ namespace Mist { haveAvcc = false; haveHvcc = false; ts_from = 0; - until=0xFFFFFFFFFFFFFFFFull; setBlocking(true); sendRepeatingHeaders = false; appleCompat=false; @@ -31,12 +30,11 @@ namespace Mist { if (!dataLen){return;} if (packData.getBytesFree() == 184){ - packData.clear(); + packData.clear(); packData.setPID(pkgPid); packData.setContinuityCounter(++contPkg); if (firstPack){ packData.setUnitStart(1); - packData.setDiscontinuity(true); if (video){ if (keyframe){ packData.setRandomAccess(true); @@ -69,13 +67,6 @@ namespace Mist { char * dataPointer = 0; unsigned int dataLen = 0; thisPacket.getString("data", dataPointer, dataLen); //data - if (packTime >= until){ //this if should only trigger for HLS - stop(); - wantRequest = true; - parseData = false; - sendTS("",0); - return; - } //apple compatibility timestamp correction if (appleCompat){ packTime -= ts_from; diff --git a/src/output/output_ts_base.h b/src/output/output_ts_base.h index 906f36cc..e9f95339 100644 --- a/src/output/output_ts_base.h +++ b/src/output/output_ts_base.h @@ -14,7 +14,7 @@ namespace Mist { public: TSOutput(Socket::Connection & conn); virtual ~TSOutput(){}; - void sendNext(); + virtual void sendNext(); virtual void sendTS(const char * tsData, unsigned int len=188){}; void fillPacket(char const * data, size_t dataLen, bool & firstPack, bool video, bool keyframe, uint32_t pkgPid, int & contPkg); protected: @@ -33,7 +33,6 @@ namespace Mist { /*LTS-END*/ bool sendRepeatingHeaders; long long unsigned int ts_from; - long long unsigned int until; long long unsigned int lastVid; }; }