From a5a9facc224df2cc4f8cdb352e7f99fd3feb1090 Mon Sep 17 00:00:00 2001 From: Erik Zandvliet Date: Tue, 10 May 2016 14:12:58 +0200 Subject: [PATCH] DTSC Pull optimizes and quick-negotiate. --- lib/defines.h | 9 ++ lib/dtscmeta.cpp | 4 +- lib/shared_memory.cpp | 107 ++++++++++------------- lib/shared_memory.h | 5 +- src/controller/controller_statistics.cpp | 6 +- src/input/input.cpp | 47 ++++------ src/input/input_buffer.cpp | 4 +- src/io.cpp | 4 +- src/output/output.cpp | 79 ++++++++++++----- src/output/output.h | 2 +- src/output/output_dtsc.cpp | 34 +------ src/output/output_hls.cpp | 32 +++---- src/output/output_hls.h | 1 + src/output/output_progressive_mp4.cpp | 20 +---- src/output/output_rtmp.cpp | 14 ++- src/output/output_rtmp.h | 2 +- 16 files changed, 159 insertions(+), 211 deletions(-) diff --git a/lib/defines.h b/lib/defines.h index 18c2ab25..74eb16e2 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -62,6 +62,15 @@ static const char * DBG_LVL_LIST[] = {"NONE", "FAIL", "ERROR", "WARN", "INFO", " #define SHM_DATASIZE 25 #endif + +#ifndef STATS_DELAY +#define STATS_DELAY 15 +#endif + +#ifndef INPUT_TIMEOUT +#define INPUT_TIMEOUT STATS_DELAY +#endif + /// The size used for stream header pages under Windows, where they cannot be size-detected. #define DEFAULT_META_PAGE_SIZE 16 * 1024 * 1024 diff --git a/lib/dtscmeta.cpp b/lib/dtscmeta.cpp index 64cf84ed..1a548f68 100644 --- a/lib/dtscmeta.cpp +++ b/lib/dtscmeta.cpp @@ -127,10 +127,10 @@ namespace DTSC { return; } if(!src.spool()){ - if (sleepCount++ > 5){ + if (sleepCount++ > 60){ return; } - Util::sleep(500); + Util::sleep(100); } } } diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index 1ed06b12..72352393 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -772,6 +772,7 @@ namespace IPC { ///\brief The deconstructor sharedServer::~sharedServer() { + finishEach(); mySemaphore.close(); mySemaphore.unlink(); } @@ -828,6 +829,23 @@ namespace IPC { return false; } + ///Disconnect all connected users + void sharedServer::finishEach(){ + if (!hasCounter){ + return; + } + for (std::set::iterator it = myPages.begin(); it != myPages.end(); it++) { + if (!it->mapped || !it->len) { + break; + } + unsigned int offset = 0; + while (offset + payLen + (hasCounter ? 1 : 0) <= it->len) { + it->mapped[offset] = 126; + offset += payLen + (hasCounter ? 1 : 0); + } + } + } + ///\brief Parse each of the possible payload pieces, and runs a callback on it if in use. void sharedServer::parseEach(void (*callback)(char * data, size_t len, unsigned int id)) { char * empty = 0; @@ -839,6 +857,7 @@ namespace IPC { unsigned int id = 0; unsigned int userCount = 0; unsigned int emptyCount = 0; + connectedUsers = 0; for (std::set::iterator it = myPages.begin(); it != myPages.end(); it++) { if (!it->mapped || !it->len) { DEBUG_MSG(DLVL_FAIL, "Something went terribly wrong?"); @@ -852,28 +871,25 @@ namespace IPC { char * counter = it->mapped + offset; //increase the count if needed ++userCount; + if (*counter & 0x80){ + connectedUsers++; + } if (id >= amount) { amount = id + 1; - DEBUG_MSG(DLVL_VERYHIGH, "Shared memory %s is now at count %u", baseName.c_str(), amount); + VERYHIGH_MSG("Shared memory %s is now at count %u", baseName.c_str(), amount); } unsigned short tmpPID = *((unsigned short *)(it->mapped + 1 + offset + payLen - 2)); - if (!Util::Procs::isRunning(tmpPID) && !(*counter == 126 || *counter == 127 || *counter == 254 || *counter == 255)) { + if (!Util::Procs::isRunning(tmpPID) && !(*counter == 126 || *counter == 127)){ WARN_MSG("process disappeared, timing out. (pid %d)", tmpPID); *counter = 126; //if process is already dead, instant timeout. } callback(it->mapped + offset + 1, payLen, id); switch (*counter) { case 127: - DEBUG_MSG(DLVL_HIGH, "Client %u requested disconnect", id); + HIGH_MSG("Client %u requested disconnect", id); break; case 126: - DEBUG_MSG(DLVL_WARN, "Client %u timed out", id); - break; - case 255: - DEBUG_MSG(DLVL_HIGH, "Client %u disconnected on request", id); - break; - case 254: - DEBUG_MSG(DLVL_WARN, "Client %u disconnect timed out", id); + HIGH_MSG("Client %u timed out", id); break; default: #ifndef NOCRASHCHECK @@ -893,7 +909,7 @@ namespace IPC { #endif break; } - if (*counter == 127 || *counter == 126 || *counter == 255 || *counter == 254) { + if (*counter == 127 || *counter == 126){ memset(it->mapped + offset + 1, 0, payLen); it->mapped[offset] = 0; } else { @@ -905,7 +921,7 @@ namespace IPC { //bring the counter down if this was the last element if (id == amount - 1) { amount = id; - DEBUG_MSG(DLVL_VERYHIGH, "Shared memory %s is now at count %u", baseName.c_str(), amount); + VERYHIGH_MSG("Shared memory %s is now at count %u", baseName.c_str(), amount); } //stop, we're guaranteed no more pages are full at this point break; @@ -917,7 +933,7 @@ namespace IPC { //increase the count if needed if (id >= amount) { amount = id + 1; - DEBUG_MSG(DLVL_VERYHIGH, "Shared memory %s is now at count %u", baseName.c_str(), amount); + VERYHIGH_MSG("Shared memory %s is now at count %u", baseName.c_str(), amount); } callback(it->mapped + offset, payLen, id); } else { @@ -926,7 +942,7 @@ namespace IPC { //bring the counter down if this was the last element if (id == amount - 1) { amount = id; - DEBUG_MSG(DLVL_VERYHIGH, "Shared memory %s is now at count %u", baseName.c_str(), amount); + VERYHIGH_MSG("Shared memory %s is now at count %u", baseName.c_str(), amount); } //stop, we're guaranteed no more pages are full at this point if (empty) { @@ -962,12 +978,14 @@ namespace IPC { hasCounter = 0; payLen = 0; offsetOnPage = 0; + countAsViewer= true; } ///\brief Copy constructor for sharedClients ///\param rhs The client ro copy sharedClient::sharedClient(const sharedClient & rhs) { + countAsViewer = rhs.countAsViewer; baseName = rhs.baseName; payLen = rhs.payLen; hasCounter = rhs.hasCounter; @@ -988,6 +1006,7 @@ namespace IPC { ///\brief Assignment operator void sharedClient::operator =(const sharedClient & rhs) { + countAsViewer = rhs.countAsViewer; baseName = rhs.baseName; payLen = rhs.payLen; hasCounter = rhs.hasCounter; @@ -1011,6 +1030,7 @@ namespace IPC { ///\param len The size of the payload to allocate ///\param withCounter Whether or not this payload has a counter sharedClient::sharedClient(std::string name, int len, bool withCounter) : baseName("/" + name), payLen(len), offsetOnPage(-1), hasCounter(withCounter) { + countAsViewer = true; #ifdef __APPLE__ //note: O_CREAT is only needed for mac, probably mySemaphore.open(baseName.c_str(), O_RDWR | O_CREAT, 0); @@ -1072,52 +1092,6 @@ namespace IPC { } - bool sharedClient::isSingleEntry() { - semaphore tmpSem(baseName.c_str(), O_RDWR); - - if (!tmpSem) { - HIGH_MSG("Creating semaphore %s failed: %s, assuming we're alone", baseName.c_str(), strerror(errno)); - return true; - } - //Empty is used to compare for emptyness. This is not needed when the page uses a counter - char * empty = 0; - if (!hasCounter) { - empty = (char *)malloc(payLen * sizeof(char)); - if (!empty) { - HIGH_MSG("Failed to allocate %u bytes for empty payload, assuming we're not alone", payLen); - return false; - } - memset(empty, 0, payLen); - } - bool result = true; - { - semGuard tmpGuard(&tmpSem); - for (char i = 'A'; i <= 'Z'; i++) { - sharedPage tmpPage(baseName.substr(1) + i, (4096 << (i - 'A')), false, false); - if (!tmpPage.mapped) { - break; - } - int offset = 0; - while (offset + payLen + (hasCounter ? 1 : 0) <= tmpPage.len) { - //Skip our own entry - if (tmpPage.name == myPage.name && offset == offsetOnPage){ - offset += payLen + (hasCounter ? 1 : 0); - continue; - } - if (!((hasCounter && tmpPage.mapped[offset] == 0) || (!hasCounter && !memcmp(tmpPage.mapped + offset, empty, payLen)))) { - result = false; - break; - } - offset += payLen + (hasCounter ? 1 : 0); - } - } - } - if (empty) { - free(empty); - } - return result; - } - ///\brief Writes data to the shared data void sharedClient::write(char * data, int len) { if (hasCounter) { @@ -1137,7 +1111,7 @@ namespace IPC { } if (myPage.mapped) { semGuard tmpGuard(&mySemaphore); - myPage.mapped[offsetOnPage] = 127; + myPage.mapped[offsetOnPage] = 126; } } @@ -1147,13 +1121,20 @@ namespace IPC { DEBUG_MSG(DLVL_WARN, "Trying to keep-alive an element without counters"); return; } - if (myPage.mapped[offsetOnPage] < 128) { - myPage.mapped[offsetOnPage] = 1; + if ((myPage.mapped[offsetOnPage] & 0x7F) < 126) { + myPage.mapped[offsetOnPage] = (countAsViewer ? 0x81 : 0x01); } else { DEBUG_MSG(DLVL_WARN, "Trying to keep-alive an element that needs to timeout, ignoring"); } } + bool sharedClient::isAlive() { + if (!hasCounter) { + return true; + } + return (myPage.mapped[offsetOnPage] & 0x7F) < 126; + } + ///\brief Get a pointer to the data of this client char * sharedClient::getData() { if (!myPage.mapped) { diff --git a/lib/shared_memory.h b/lib/shared_memory.h index adb99c06..ba291e55 100644 --- a/lib/shared_memory.h +++ b/lib/shared_memory.h @@ -184,6 +184,7 @@ namespace IPC { operator bool() const; ///\brief The amount of connected clients unsigned int amount; + unsigned int connectedUsers; private: bool isInUse(unsigned int id); void newPage(); @@ -198,6 +199,7 @@ namespace IPC { semaphore mySemaphore; ///\brief Whether the payload has a counter, if so, it is added in front of the payload bool hasCounter; + void finishEach(); }; ///\brief The client part of a server/client model for shared memory. @@ -219,9 +221,10 @@ namespace IPC { void write(char * data, int len); void finish(); void keepAlive(); + bool isAlive(); char * getData(); int getCounter(); - bool isSingleEntry(); + bool countAsViewer; private: ///\brief The basename of the shared pages. std::string baseName; diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 0017b3c5..56667556 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -32,10 +32,6 @@ #define COUNTABLE_BYTES 128*1024 -#ifndef STATS_DELAY -#define STATS_DELAY 15 -#endif - std::map Controller::sessions; ///< list of sessions that have statistics data available std::map Controller::connToSession; ///< Map of socket IDs to session info. @@ -595,7 +591,7 @@ void Controller::parseStatistics(char * data, size_t len, unsigned int id){ sessions[idx].update(id, tmpEx); //check validity of stats data char counter = (*(data - 1)); - if (counter == 126 || counter == 127 || counter == 254 || counter == 255){ + if (counter == 126 || counter == 127){ //the data is no longer valid - connection has gone away, store for later sessions[idx].finish(id); connToSession.erase(id); diff --git a/src/input/input.cpp b/src/input/input.cpp index a2f924e7..b1a637cd 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -215,17 +215,19 @@ namespace Mist { DEBUG_MSG(DLVL_DEVEL, "Input for stream %s started", streamName.c_str()); long long int activityCounter = Util::bootSecs(); - while ((Util::bootSecs() - activityCounter) < 10 && config->is_active) { //10 second timeout + while ((Util::bootSecs() - activityCounter) < INPUT_TIMEOUT && config->is_active) { //15 second timeout userPage.parseEach(callbackWrapper); removeUnused(); - if (userPage.amount) { - activityCounter = Util::bootSecs(); - DEBUG_MSG(DLVL_INSANE, "Connected users: %d", userPage.amount); + if (userPage.connectedUsers) { + if (myMeta.tracks.size()){ + activityCounter = Util::bootSecs(); + } + DEBUG_MSG(DLVL_INSANE, "Connected users: %d", userPage.connectedUsers); } else { DEBUG_MSG(DLVL_INSANE, "Timer running"); } /*LTS-START*/ - if ((Util::bootSecs() - activityCounter) >= 10 || !config->is_active){//10 second timeout + if ((Util::bootSecs() - activityCounter) >= INPUT_TIMEOUT || !config->is_active){//15 second timeout if(Triggers::shouldTrigger("STREAM_UNLOAD", config->getString("streamname"))){ std::string payload = config->getString("streamname")+"\n" +capa["name"].asStringRef()+"\n"; if (!Triggers::doTrigger("STREAM_UNLOAD", payload, config->getString("streamname"))){ @@ -251,16 +253,19 @@ namespace Mist { pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); if (!pullLock.tryWait()){ DEBUG_MSG(DLVL_DEVEL, "A pull process for stream %s is already running", streamName.c_str()); + pullLock.close(); return; } if (Util::streamAlive(streamName)){ pullLock.post(); pullLock.close(); + pullLock.unlink(); return; } if (!Util::startInput(streamName, "push://")) {//manually override stream url to start the buffer pullLock.post(); pullLock.close(); + pullLock.unlink(); return; } @@ -283,8 +288,10 @@ namespace Mist { finish(); pullLock.post(); pullLock.close(); + pullLock.unlink(); return; } + nProxy.userClient.countAsViewer = false; for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ it->second.firstms = 0; @@ -294,44 +301,20 @@ namespace Mist { getNext(); unsigned long long lastTime = Util::getMS(); unsigned long long lastActive = Util::getMS(); - while (thisPacket && config->is_active){ + while (thisPacket && config->is_active && nProxy.userClient.isAlive()){ nProxy.bufferLivePacket(thisPacket, myMeta); getNext(); nProxy.userClient.keepAlive(); - if (Util::getMS() - lastTime >= 1000){ - lastTime = Util::getMS(); - if (nProxy.userClient.isSingleEntry()){ - if (lastTime - lastActive >= 10000){//10sec timeout - config->is_active = false; - } - }else{ - lastActive = lastTime; - } - } } closeStreamSource(); - while (config->is_active){ - Util::sleep(500); - nProxy.userClient.keepAlive(); - if (Util::getMS() - lastTime >= 1000){ - lastTime = Util::getMS(); - if (nProxy.userClient.isSingleEntry()){ - if (lastTime - lastActive >= 10000){//10sec timeout - config->is_active = false; - } - }else{ - lastActive = lastTime; - } - } - } - - nProxy.userClient.finish(); finish(); pullLock.post(); pullLock.close(); + pullLock.unlink(); + DEBUG_MSG(DLVL_DEVEL, "Pull input for stream %s closing clean", streamName.c_str()); return; } diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 8ab05e57..f7bd8f58 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -530,7 +530,7 @@ namespace Mist { //If the current value indicates a valid trackid, and it is pushed from this user if (pushLocation[value] == data) { //Check for timeouts, and erase the track if necessary - if (counter == 126 || counter == 127 || counter == 254 || counter == 255) { + if (counter == 126 || counter == 127){ pushLocation.erase(value); if (negotiatingTracks.count(value)) { negotiatingTracks.erase(value); @@ -594,11 +594,9 @@ namespace Mist { char firstPage[NAME_BUFFER_SIZE]; snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), finalMap); nProxy.metaPages[finalMap].init(firstPage, 8192, false); - INFO_MSG("Meh %d", finalMap); //Update the metadata for this track updateTrackMeta(finalMap); - INFO_MSG("Setting hasPush to true, quickNegotiate"); hasPush = true; } //Write the final mapped track number and keyframe number to the user page element diff --git a/src/io.cpp b/src/io.cpp index 595b6c6f..8da2ef71 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -621,10 +621,8 @@ namespace Mist { if (!trackState.count(tid)) { memset(tmp + offset, 0, 4); if (quickNegotiate){ - - unsigned long finalTid = getpid() + tid; + unsigned long finalTid = tid; unsigned short firstPage = 1; - INFO_MSG("HANDLING quick negotiation for track %d ~> %d", tid, finalTid) MEDIUM_MSG("Buffer has indicated that incoming track %lu should start writing on track %lu, page %lu", tid, finalTid, firstPage); trackMap[tid] = finalTid; if (myMeta.tracks.count(finalTid) && myMeta.tracks[finalTid].lastms){ diff --git a/src/output/output.cpp b/src/output/output.cpp index a61a2147..578d1575 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -184,6 +184,16 @@ namespace Mist { return myConn.getBinHost(); } + bool Output::isReadyForPlay() { + if (myMeta.tracks.size()){ + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + if (it->second.keys.size() >= 2){ + return true; + } + } + } + return false; + } /// Connects or reconnects to the stream. /// Assumes streamName class member has been set already. /// Will start input if not currently active, calls onFail() if this does not succeed. @@ -215,27 +225,15 @@ namespace Mist { return; } updateMeta(); - if (myMeta.live && needsPlayableKeys()){ - bool waitALittleLonger = true; + if (myMeta.live && !isReadyForPlay()){ unsigned int maxWaits = 15; - while (waitALittleLonger){ - waitALittleLonger = true; - if (myMeta.tracks.size()){ - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ - if (it->second.keys.size() >= needsPlayableKeys()){ - waitALittleLonger = false; - break; - } - } - } - if (waitALittleLonger){ - Util::sleep(1000); - if (--maxWaits == 0){ - FAIL_MSG("Giving up waiting for playable tracks"); - waitALittleLonger = false; - } - updateMeta(); + while (!isReadyForPlay()){ + Util::sleep(1000); + if (--maxWaits == 0){ + FAIL_MSG("Giving up waiting for playable tracks"); + break; } + updateMeta(); } } } @@ -435,6 +433,7 @@ namespace Mist { return; } DEBUG_MSG(DLVL_VERYHIGH, "Loading track %lu, containing key %lld", trackId, keyNum); + INFO_MSG("Loading track %lu, containing key %lld", trackId, keyNum); unsigned int timeout = 0; unsigned long pageNum = pageNumForKey(trackId, keyNum); while (pageNum == -1){ @@ -482,6 +481,7 @@ namespace Mist { return; } currKeyOpen[trackId] = pageNum; + INFO_MSG("page %s loaded", id); } /// Prepares all tracks from selectedTracks for seeking to the specified ms position. @@ -507,7 +507,14 @@ namespace Mist { INFO_MSG("Aborting seek to %llums in track %u: past end of track.", pos, tid); return false; } - loadPageForKey(tid, getKeyForTime(tid, pos) + (getNextKey?1:0)); + unsigned int keyNum = getKeyForTime(tid, pos); + if (myMeta.tracks[tid].getKey(keyNum).getTime() > pos){ + if (myMeta.live){ + INFO_MSG("Actually seeking to %d, for %d is not available anymore", myMeta.tracks[tid].getKey(keyNum).getTime(), pos); + pos = myMeta.tracks[tid].getKey(keyNum).getTime(); + } + } + loadPageForKey(tid, keyNum + (getNextKey?1:0)); if (!nProxy.curPage.count(tid) || !nProxy.curPage[tid].mapped){ INFO_MSG("Aborting seek to %llums in track %u: not available.", pos, tid); return false; @@ -524,6 +531,7 @@ namespace Mist { tmpPack.reInit(mpd + tmp.offset, 0, true); tmp.time = tmpPack.getTime(); } + INFO_MSG("Found time %d", tmp.time); if (tmpPack){ buffer.insert(tmp); return true; @@ -1022,9 +1030,26 @@ namespace Mist { if (thisPacket.getTime() != nxt.time && nxt.time){ WARN_MSG("Loaded track %ld@%llu instead of %ld@%llu", thisPacket.getTrackId(), thisPacket.getTime(), nxt.tid, nxt.time); } - if ((myMeta.tracks[nxt.tid].type == "video" && thisPacket.getFlag("keyframe")) || (++nonVideoCount % 30 == 0)){ + bool isVideoTrack = (myMeta.tracks[nxt.tid].type == "video"); + if ((isVideoTrack && thisPacket.getFlag("keyframe")) || (!isVideoTrack && (++nonVideoCount % 30 == 0))){ if (myMeta.live){ - updateMeta(); + if (myMeta.tracks[nxt.tid].type == "video"){ + //Check whether returned keyframe is correct. If not, wait for approximately 5 seconds while checking. + //Failure here will cause tracks to drop due to inconsistent internal state. + nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime()); + int counter = 0; + while(counter < 10 && myMeta.tracks[nxt.tid].getKey(nxtKeyNum[nxt.tid]).getTime() != thisPacket.getTime()){ + if (counter++){ + //Only sleep 500ms if this is not the first updatemeta try + Util::sleep(500); + } + updateMeta(); + nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime()); + } + }else{ + //On non-video tracks, just update metadata and assume everything else is correct + updateMeta(); + } } nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime()); DEBUG_MSG(DLVL_VERYHIGH, "Track %u @ %llums = key %lu", nxt.tid, thisPacket.getTime(), nxtKeyNum[nxt.tid]); @@ -1081,6 +1106,12 @@ namespace Mist { if (nProxy.curPage[nxt.tid]){ if (nxt.offset < nProxy.curPage[nxt.tid].len){ unsigned long long nextTime = getDTSCTime(nProxy.curPage[nxt.tid].mapped, nxt.offset); + int ctr = 0; + //sleep at most half a second for new data. + while (!nextTime && ++ctr < 5){ + Util::sleep(1000); + nextTime = getDTSCTime(nProxy.curPage[nxt.tid].mapped, nxt.offset); + } if (nextTime){ nxt.time = nextTime; }else{ @@ -1100,7 +1131,7 @@ namespace Mist { unsigned long long int now = Util::epoch(); if (now != lastStats){ /*LTS-START*/ - if (statsPage.getData()[-1] > 127){ + if (!statsPage.isAlive()){ myConn.close(); return; } @@ -1136,7 +1167,7 @@ namespace Mist { return; } } - if (nProxy.userClient.getData()[-1] > 127){ + if (!nProxy.userClient.isAlive()){ myConn.close(); return; } diff --git a/src/output/output.h b/src/output/output.h index 3e0fbb5b..a54da52e 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -64,7 +64,7 @@ namespace Mist { void selectDefaultTracks(); bool connectToFile(std::string file); static bool listenMode(){return true;} - virtual unsigned int needsPlayableKeys(){return 2;} + virtual bool isReadyForPlay(); //virtuals. The optional virtuals have default implementations that do as little as possible. virtual void sendNext() {}//REQUIRED! Others are optional. virtual void prepareNext(); diff --git a/src/output/output_dtsc.cpp b/src/output/output_dtsc.cpp index 069de089..8fc8e0a0 100644 --- a/src/output/output_dtsc.cpp +++ b/src/output/output_dtsc.cpp @@ -29,7 +29,6 @@ namespace Mist { myConn.SendNow(sSize, 4); prep.sendTo(myConn); pushing = false; - fastAsPossibleTime = 0; } OutDTSC::~OutDTSC() {} @@ -45,29 +44,6 @@ namespace Mist { } void OutDTSC::sendNext(){ - if (!realTime && thisPacket.getTime() >= fastAsPossibleTime){ - realTime = 1000; - } - if (thisPacket.getFlag("keyframe")){ - std::set availableTracks; - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ - if (it->second.type == "video" || it->second.type == "audio"){ - availableTracks.insert(it->first); - } - } - if (availableTracks != selectedTracks){ - //reset, resendheader - JSON::Value prep; - prep["cmd"] = "reset"; - /// \todo Make this securererer. - unsigned long sendSize = prep.packedSize(); - myConn.SendNow("DTCM"); - char sSize[4] = {0, 0, 0, 0}; - Bit::htobl(sSize, prep.packedSize()); - myConn.SendNow(sSize, 4); - prep.sendTo(myConn); - } - } myConn.SendNow(thisPacket.getData(), thisPacket.getDataLen()); } @@ -81,15 +57,9 @@ namespace Mist { } myMeta.send(myConn, true, selectedTracks); if (myMeta.live){ - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ - if (!fastAsPossibleTime || it->second.lastms < fastAsPossibleTime){ - fastAsPossibleTime = it->second.lastms; - realTime = 0; - } - } - }else{ - realTime = 1000; + realTime = 0; } + seek(0); } void OutDTSC::onRequest(){ diff --git a/src/output/output_hls.cpp b/src/output/output_hls.cpp index 68f22e95..463f49f0 100644 --- a/src/output/output_hls.cpp +++ b/src/output/output_hls.cpp @@ -3,30 +3,20 @@ #include namespace Mist { + bool OutHLS::isReadyForPlay() { + if (myMeta.tracks.size()){ + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + if (it->second.fragments.size() >= 3){ + return true; + } + } + } + return false; + } + ///\brief Builds an index file for HTTP Live streaming. ///\return The index file for HTTP Live Streaming. std::string OutHLS::liveIndex() { - - static int timer = 0; - bool checkWait = true; - while (checkWait && ++timer < 10){ - checkWait = false; - if (!myMeta.tracks.size()){ - checkWait = true; - } - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ - if (it->second.keys.size() <= 3){ - checkWait = true; - break; - } - } - if (checkWait){ - Util::sleep(500); - INFO_MSG("SLeeping timer %d", timer); - updateMeta(); - } - } - std::stringstream result; result << "#EXTM3U\r\n"; int audioId = -1; diff --git a/src/output/output_hls.h b/src/output/output_hls.h index 3a4d93c9..d8eca1e2 100644 --- a/src/output/output_hls.h +++ b/src/output/output_hls.h @@ -9,6 +9,7 @@ namespace Mist { static void init(Util::Config * cfg); void sendTS(const char * tsData, unsigned int len=188); void onHTTP(); + bool isReadyForPlay(); protected: std::string liveIndex(); std::string liveIndex(int tid, std::string & sessId); diff --git a/src/output/output_progressive_mp4.cpp b/src/output/output_progressive_mp4.cpp index 6e70cf40..2bd6d8d6 100644 --- a/src/output/output_progressive_mp4.cpp +++ b/src/output/output_progressive_mp4.cpp @@ -9,6 +9,7 @@ namespace Mist { OutProgressiveMP4::OutProgressiveMP4(Socket::Connection & conn) : HTTPOutput(conn) { completeKeysOnly = false; } + OutProgressiveMP4::~OutProgressiveMP4() {} void OutProgressiveMP4::init(Util::Config * cfg) { @@ -747,25 +748,6 @@ namespace Mist { void OutProgressiveMP4::setvidTrack() { vidTrack = 0; - static int timer = 0; - bool checkWait = true; - while (checkWait && ++timer < 10){ - checkWait = false; - if (!myMeta.tracks.size()){ - checkWait = true; - } - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ - if (!it->second.keys.size()){ - checkWait = true; - break; - } - } - if (checkWait){ - Util::sleep(500); - updateMeta(); - } - } - if (!selectedTracks.size()){ selectDefaultTracks(); } diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index 2b0e1f83..a29920c0 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -153,12 +153,18 @@ namespace Mist { return !(config->getString("target").size()); } - unsigned int OutRTMP::needsPlayableKeys(){ + bool OutRTMP::isReadyForPlay(){ if (isPushing){ - return 0; - }else{ - return 2; + return true; } + if (myMeta.tracks.size()){ + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + if (it->second.keys.size() >= 2){ + return true; + } + } + } + return false; } void OutRTMP::parseVars(std::string data){ diff --git a/src/output/output_rtmp.h b/src/output/output_rtmp.h index 2e170e66..87154c22 100644 --- a/src/output/output_rtmp.h +++ b/src/output/output_rtmp.h @@ -14,7 +14,7 @@ namespace Mist { void onRequest(); void sendNext(); void sendHeader(); - unsigned int needsPlayableKeys(); + bool isReadyForPlay(); static bool listenMode(); protected: bool isPushing;