diff --git a/lib/stream.cpp b/lib/stream.cpp index 821734b2..11832446 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -201,6 +201,133 @@ void Util::sanitizeName(std::string &streamname){ } } +/// Initalizes the packetSorter in sync mode. +Util::packetSorter::packetSorter(){ + dequeMode = false; +} + +/// Sets sync mode on if true (sync), off if false (async). +void Util::packetSorter::setSyncMode(bool synced){ + if (dequeMode != !synced){ + dequeMode = !synced; + if (!dequeMode){ + //we've switched away from deque + for (std::deque::iterator it = dequeBuffer.begin(); it != dequeBuffer.end(); ++it){ + insert(*it); + } + dequeBuffer.clear(); + }else{ + //we've switched away from set + for (std::set::iterator it = setBuffer.begin(); it != setBuffer.end(); ++it){ + insert(*it); + } + setBuffer.clear(); + } + } +} + +/// Returns true if we're synced, false if async. +bool Util::packetSorter::getSyncMode() const{return !dequeMode;} + +/// Returns the amount of packets currently in the sorter. +size_t Util::packetSorter::size() const{ + if (dequeMode){return dequeBuffer.size();}else{return setBuffer.size();} +} + +/// Clears all packets from the sorter; does not reset mode. +void Util::packetSorter::clear(){ + dequeBuffer.clear(); + setBuffer.clear(); +} + +/// Returns a pointer to the first packet in the sorter. +const Util::sortedPageInfo * Util::packetSorter::begin() const{ + if (dequeMode){ + return &*dequeBuffer.begin(); + }else{ + return &*setBuffer.begin(); + } +} + +/// Inserts a new packet in the sorter. +void Util::packetSorter::insert(const sortedPageInfo &pInfo){ + if (dequeMode){ + dequeBuffer.push_back(pInfo); + }else{ + setBuffer.insert(pInfo); + } +} + +/// Removes the given track ID packet from the sorter. Removes at most one packet, make sure to prevent duplicates elsewhere! +void Util::packetSorter::dropTrack(size_t tid){ + if (dequeMode){ + for (std::deque::iterator it = dequeBuffer.begin(); it != dequeBuffer.end(); ++it){ + if (it->tid == tid){ + dequeBuffer.erase(it); + return; + } + } + }else{ + for (std::set::iterator it = setBuffer.begin(); it != setBuffer.end(); ++it){ + if (it->tid == tid){ + setBuffer.erase(it); + return; + } + } + } +} + +/// Removes the first packet from the sorter and inserts the given packet. +void Util::packetSorter::replaceFirst(const sortedPageInfo &pInfo){ + if (dequeMode){ + dequeBuffer.pop_front(); + if (dequeBuffer.size() && dequeBuffer.front().time > pInfo.time){ + dequeBuffer.push_front(pInfo); + }else{ + dequeBuffer.push_back(pInfo); + } + }else{ + setBuffer.erase(setBuffer.begin()); + setBuffer.insert(pInfo); + } +} + +/// Removes the first packet from the sorter and inserts it back at the end. No-op for sync mode. +void Util::packetSorter::moveFirstToEnd(){ + if (dequeMode){ + dequeBuffer.push_back(dequeBuffer.front()); + dequeBuffer.pop_front(); + } +} + +/// Returns true if there is an entry in the sorter for the given track ID. +bool Util::packetSorter::hasEntry(size_t tid) const{ + if (dequeMode){ + for (std::deque::const_iterator it = dequeBuffer.begin(); it != dequeBuffer.end(); ++it){ + if (it->tid == tid){return true;} + } + }else{ + for (std::set::const_iterator it = setBuffer.begin(); it != setBuffer.end(); ++it){ + if (it->tid == tid){return true;} + } + } + return false; +} + +/// Fills toFill with track IDs of tracks that are in the sorter. +void Util::packetSorter::getTrackList(std::set &toFill) const{ + toFill.clear(); + if (dequeMode){ + for (std::deque::const_iterator it = dequeBuffer.begin(); it != dequeBuffer.end(); ++it){ + toFill.insert(it->tid); + } + }else{ + for (std::set::const_iterator it = setBuffer.begin(); it != setBuffer.end(); ++it){ + toFill.insert(it->tid); + } + } +} + JSON::Value Util::getStreamConfig(const std::string &streamname){ JSON::Value result; if (streamname.size() > 100){ diff --git a/lib/stream.h b/lib/stream.h index 15b3cadb..fa760eeb 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -50,6 +50,39 @@ namespace Util{ extern trackSortOrder defaultTrackSortOrder; void sortTracks(std::set & validTracks, const DTSC::Meta & M, trackSortOrder sorting, std::list & srtTrks); + /// This struct keeps packet information sorted in playback order + struct sortedPageInfo{ + bool operator<(const sortedPageInfo &rhs) const{ + if (time < rhs.time){return true;} + return (time == rhs.time && tid < rhs.tid); + } + size_t tid; + uint64_t time; + uint64_t offset; + size_t partIndex; + }; + + /// Packet sorter used to determine which packet should be output next + class packetSorter{ + public: + packetSorter(); + size_t size() const; + void clear(); + const sortedPageInfo * begin() const; + void insert(const sortedPageInfo &pInfo); + void dropTrack(size_t tid); + void replaceFirst(const sortedPageInfo &pInfo); + void moveFirstToEnd(); + bool hasEntry(size_t tid) const; + void getTrackList(std::set &toFill) const; + void setSyncMode(bool synced); + bool getSyncMode() const; + private: + bool dequeMode; + std::deque dequeBuffer; + std::set setBuffer; + }; + class DTSCShmReader{ public: diff --git a/src/output/output.cpp b/src/output/output.cpp index 148b77b5..b2cbb6d3 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -788,7 +788,7 @@ namespace Mist{ userSelect.erase(tid); return false; } - sortedPageInfo tmp; + Util::sortedPageInfo tmp; tmp.tid = tid; tmp.offset = 0; tmp.partIndex = 0; @@ -1456,12 +1456,7 @@ namespace Mist{ streamName.c_str(), meta.getCodec(trackId).c_str(), trackId, usr.getKeyNum() + 1, pageNumForKey(trackId, usr.getKeyNum() + 1), pageNumMax(trackId), reason.c_str()); // now actually drop the track from the buffer - for (std::set::iterator it = buffer.begin(); it != buffer.end(); ++it){ - if (it->tid == trackId){ - buffer.erase(it); - break; - } - } + buffer.dropTrack(trackId); userSelect.erase(trackId); } @@ -1471,7 +1466,7 @@ namespace Mist{ /// prepareNext continues as if this function was never called. bool Output::getKeyFrame(){ // store copy of current state - std::set tmp_buffer = buffer; + Util::packetSorter tmp_buffer = buffer; std::map tmp_userSelect = userSelect; std::map tmp_currentPage = currentPage; @@ -1528,26 +1523,13 @@ namespace Mist{ if (buffer.size() < userSelect.size()){ // prepare to drop any selectedTrack without buffer entry for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); ++it){ - bool found = false; - for (std::set::iterator bi = buffer.begin(); bi != buffer.end(); ++bi){ - if (bi->tid == it->first){ - found = true; - break; - } - } - if (!found){dropTracks.insert(it->first);} + if (!buffer.hasEntry(it->first)){dropTracks.insert(it->first);} } }else{ - std::set seen; // prepare to drop any buffer entry without selectedTrack - for (std::set::iterator bi = buffer.begin(); bi != buffer.end(); ++bi){ - if (!userSelect.count(bi->tid)){dropTracks.insert(bi->tid);} - if (seen.count(bi->tid)){ - INFO_MSG("Dropping duplicate buffer entry for track %zu", bi->tid); - buffer.erase(bi); - return false; - } - seen.insert(bi->tid); + buffer.getTrackList(dropTracks); + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); ++it){ + dropTracks.erase(it->first); } } if (!dropTracks.size()){ @@ -1565,118 +1547,129 @@ namespace Mist{ return false; } - sortedPageInfo nxt = *(buffer.begin()); - - if (meta.reloadReplacedPagesIfNeeded()){return false;} - if (!M.getValidTracks().count(nxt.tid)){ - dropTrack(nxt.tid, "disappeared from metadata"); - return false; - } - - // if we're going to read past the end of the data page, load the next page - // this only happens for VoD - if (nxt.offset >= curPage[nxt.tid].len || - (!memcmp(curPage[nxt.tid].mapped + nxt.offset, "\000\000\000\000", 4))){ - if (M.getVod() && nxt.time >= M.getLastms(nxt.tid)){ - dropTrack(nxt.tid, "end of VoD track reached", false); - return false; - } - if (M.getPageNumberForTime(nxt.tid, nxt.time) != currentPage[nxt.tid]){ - loadPageForKey(nxt.tid, M.getPageNumberForTime(nxt.tid, nxt.time)); - nxt.offset = 0; - //Only read the next time if the page load succeeded and there is a packet to read from - if (curPage[nxt.tid].mapped && curPage[nxt.tid].mapped[0] == 'D'){ - nxt.time = getDTSCTime(curPage[nxt.tid].mapped, 0); - } - buffer.erase(buffer.begin()); - buffer.insert(nxt); - return false; - } - dropTrack(nxt.tid, "VoD page load failure"); - return false; - } - - // We know this packet will be valid, pre-load it so we know its length - DTSC::Packet preLoad(curPage[nxt.tid].mapped + nxt.offset, 0, true); - + Util::sortedPageInfo nxt; uint64_t nextTime = 0; + //In case we're not in sync mode, we might have to retry a few times + for (size_t trackTries = 0; trackTries < buffer.size(); ++trackTries){ - // Check if we have a next valid packet - if (curPage[nxt.tid].len > nxt.offset+preLoad.getDataLen()+20 && memcmp(curPage[nxt.tid].mapped + nxt.offset + preLoad.getDataLen(), "\000\000\000\000", 4)){ - nextTime = getDTSCTime(curPage[nxt.tid].mapped, nxt.offset + preLoad.getDataLen()); - if (!nextTime){ - WARN_MSG("Next packet is available (offset %" PRIu64 " / %" PRIu64 " on %s), but has no time. Please warn the developers if you see this message!", nxt.offset, curPage[nxt.tid].len, curPage[nxt.tid].name.c_str()); - dropTrack(nxt.tid, "EOP: invalid next packet"); + nxt = *(buffer.begin()); + + if (meta.reloadReplacedPagesIfNeeded()){return false;} + if (!M.getValidTracks().count(nxt.tid)){ + dropTrack(nxt.tid, "disappeared from metadata"); return false; } - if (nextTime < nxt.time){ - std::stringstream errMsg; - errMsg << "next packet has timestamp " << nextTime << " but current timestamp is " << nxt.time; - dropTrack(nxt.tid, errMsg.str().c_str()); + + // if we're going to read past the end of the data page, load the next page + // this only happens for VoD + if (nxt.offset >= curPage[nxt.tid].len || + (!memcmp(curPage[nxt.tid].mapped + nxt.offset, "\000\000\000\000", 4))){ + if (M.getVod() && nxt.time >= M.getLastms(nxt.tid)){ + dropTrack(nxt.tid, "end of VoD track reached", false); + return false; + } + if (M.getPageNumberForTime(nxt.tid, nxt.time) != currentPage[nxt.tid]){ + loadPageForKey(nxt.tid, M.getPageNumberForTime(nxt.tid, nxt.time)); + nxt.offset = 0; + //Only read the next time if the page load succeeded and there is a packet to read from + if (curPage[nxt.tid].mapped && curPage[nxt.tid].mapped[0] == 'D'){ + nxt.time = getDTSCTime(curPage[nxt.tid].mapped, 0); + } + buffer.replaceFirst(nxt); + return false; + } + INFO_MSG("Invalid packet: no data @%" PRIu64 " for time %" PRIu64 " on track %zu", nxt.offset, nxt.time, nxt.tid); + dropTrack(nxt.tid, "VoD page load failure"); return false; } - }else{ - //no next packet yet! - //Check if this is the last packet of a VoD stream. Return success and drop the track. - if (M.getVod() && nxt.time >= M.getLastms(nxt.tid)){ - thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true); - thisIdx = nxt.tid; - dropTrack(nxt.tid, "end of VoD track reached", false); - return true; - } - uint32_t thisKey = M.getKeyNumForTime(nxt.tid, nxt.time); - //Check if there exists a different page for the next key - uint32_t nextKeyPage = INVALID_KEY_NUM; - //Make sure we only try to read the page for the next key if it actually should be available - DTSC::Keys keys(M.keys(nxt.tid)); - if (keys.getEndValid() >= thisKey+1){nextKeyPage = M.getPageNumberForKey(nxt.tid, thisKey + 1);} - if (nextKeyPage != INVALID_KEY_NUM && nextKeyPage != currentPage[nxt.tid]){ - // If so, the next key is our next packet - nextTime = keys.getTime(thisKey + 1); - //If the next packet should've been before the current packet, something is wrong. Abort, abort! + // We know this packet will be valid, pre-load it so we know its length + DTSC::Packet preLoad(curPage[nxt.tid].mapped + nxt.offset, 0, true); + + nextTime = 0; + + // Check if we have a next valid packet + if (curPage[nxt.tid].len > nxt.offset+preLoad.getDataLen()+20 && memcmp(curPage[nxt.tid].mapped + nxt.offset + preLoad.getDataLen(), "\000\000\000\000", 4)){ + nextTime = getDTSCTime(curPage[nxt.tid].mapped, nxt.offset + preLoad.getDataLen()); + if (!nextTime){ + WARN_MSG("Next packet is available (offset %" PRIu64 " / %" PRIu64 " on %s), but has no time. Please warn the developers if you see this message!", nxt.offset, curPage[nxt.tid].len, curPage[nxt.tid].name.c_str()); + dropTrack(nxt.tid, "EOP: invalid next packet"); + return false; + } if (nextTime < nxt.time){ std::stringstream errMsg; - errMsg << "next key (" << (thisKey+1) << ") time " << nextTime << " but current time " << nxt.time; - errMsg << "; currPage=" << currentPage[nxt.tid] << ", nxtPage=" << nextKeyPage; - errMsg << ", firstKey=" << keys.getFirstValid() << ", endKey=" << keys.getEndValid(); + errMsg << "next packet has timestamp " << nextTime << " but current timestamp is " << nxt.time; dropTrack(nxt.tid, errMsg.str().c_str()); return false; } }else{ - //Okay, there's no next page yet, and no next packet on this page either. - //That means we're waiting for data to show up, somewhere. - // after ~25 seconds, give up and drop the track. - if (++emptyCount >= 2500){ - dropTrack(nxt.tid, "EOP: data wait timeout"); - return false; - } - //every ~1 second, check if the stream is not offline - if (emptyCount % 100 == 0 && M.getLive() && Util::getStreamStatus(streamName) == STRMSTAT_OFF){ - Util::logExitReason("Stream source shut down"); - thisPacket.null(); + //no next packet yet! + //Check if this is the last packet of a VoD stream. Return success and drop the track. + if (M.getVod() && nxt.time >= M.getLastms(nxt.tid)){ + thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true); + thisIdx = nxt.tid; + dropTrack(nxt.tid, "end of VoD track reached", false); return true; } - //every ~16 seconds, reconnect to metadata - if (emptyCount % 1600 == 0){ - INFO_MSG("Reconnecting to input; track %" PRIu64 " key %" PRIu32 " is on page %" PRIu32 " and we're currently serving %" PRIu32 " from %" PRIu32, nxt.tid, thisKey+1, nextKeyPage, thisKey, currentPage[nxt.tid]); - reconnect(); - if (!meta){ - onFail("Could not connect to stream data", true); + uint32_t thisKey = M.getKeyNumForTime(nxt.tid, nxt.time); + //Check if there exists a different page for the next key + uint32_t nextKeyPage = INVALID_KEY_NUM; + //Make sure we only try to read the page for the next key if it actually should be available + DTSC::Keys keys(M.keys(nxt.tid)); + if (keys.getEndValid() >= thisKey+1){nextKeyPage = M.getPageNumberForKey(nxt.tid, thisKey + 1);} + if (nextKeyPage != INVALID_KEY_NUM && nextKeyPage != currentPage[nxt.tid]){ + // If so, the next key is our next packet + nextTime = keys.getTime(thisKey + 1); + + //If the next packet should've been before the current packet, something is wrong. Abort, abort! + if (nextTime < nxt.time){ + std::stringstream errMsg; + errMsg << "next key (" << (thisKey+1) << ") time " << nextTime << " but current time " << nxt.time; + errMsg << "; currPage=" << currentPage[nxt.tid] << ", nxtPage=" << nextKeyPage; + errMsg << ", firstKey=" << keys.getFirstValid() << ", endKey=" << keys.getEndValid(); + dropTrack(nxt.tid, errMsg.str().c_str()); + return false; + } + }else{ + if (!buffer.getSyncMode() && trackTries < buffer.size()-1){ + //We shuffle the just-tried packet back to the end of the queue, then retry up to buffer.size() times + buffer.moveFirstToEnd(); + continue; + } + //Okay, there's no next page yet, and no next packet on this page either. + //That means we're waiting for data to show up, somewhere. + // after ~25 seconds, give up and drop the track. + if (++emptyCount >= 2500){ + dropTrack(nxt.tid, "EOP: data wait timeout"); + return false; + } + //every ~1 second, check if the stream is not offline + if (emptyCount % 100 == 0 && M.getLive() && Util::getStreamStatus(streamName) == STRMSTAT_OFF){ + Util::logExitReason("Stream source shut down"); thisPacket.null(); return true; } - // if we don't have a connection to the metadata here, this means the stream has gone offline in the meanwhile. - if (!meta){ - Util::logExitReason("Attempted reconnect to source failed"); - thisPacket.null(); - return true; + //every ~16 seconds, reconnect to metadata + if (emptyCount % 1600 == 0){ + INFO_MSG("Reconnecting to input; track %" PRIu64 " key %" PRIu32 " is on page %" PRIu32 " and we're currently serving %" PRIu32 " from %" PRIu32, nxt.tid, thisKey+1, nextKeyPage, thisKey, currentPage[nxt.tid]); + reconnect(); + if (!meta){ + onFail("Could not connect to stream data", true); + thisPacket.null(); + return true; + } + // if we don't have a connection to the metadata here, this means the stream has gone offline in the meanwhile. + if (!meta){ + Util::logExitReason("Attempted reconnect to source failed"); + thisPacket.null(); + return true; + } + return false;//no sleep after reconnect } - return false;//no sleep after reconnect + //Fine! We didn't want a packet, anyway. Let's try again later. + playbackSleep(10); + return false; } - //Fine! We didn't want a packet, anyway. Let's try again later. - playbackSleep(10); - return false; } } @@ -1693,7 +1686,7 @@ namespace Mist{ emptyCount = 0; // valid packet - reset empty counter if (!userSelect[nxt.tid]){ - INFO_MSG("Track %zu is not alive!", nxt.tid); + dropTrack(nxt.tid, "track is not alive!"); return false; } @@ -1710,9 +1703,7 @@ namespace Mist{ ++nxt.partIndex; // exchange the current packet in the buffer for the next one - buffer.erase(buffer.begin()); - buffer.insert(nxt); - + buffer.replaceFirst(nxt); return true; } diff --git a/src/output/output.h b/src/output/output.h index 20425fde..93e3a38a 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -10,23 +10,11 @@ #include #include #include +#include #include namespace Mist{ - /// This struct keeps packet information sorted in playback order, so the - /// Mist::Output class knows when to buffer which packet. - struct sortedPageInfo{ - bool operator<(const sortedPageInfo &rhs) const{ - if (time < rhs.time){return true;} - return (time == rhs.time && tid < rhs.tid); - } - size_t tid; - uint64_t time; - uint64_t offset; - size_t partIndex; - }; - /// The output class is intended to be inherited by MistOut process classes. /// It contains all generic code and logic, while the child classes implement /// anything specific to particular protocols or containers. @@ -85,6 +73,9 @@ namespace Mist{ void selectAllTracks(); + /// Accessor for buffer.setSyncMode. + void setSyncMode(bool synced){buffer.setSyncMode(synced);} + private: // these *should* not be messed with in child classes. /*LTS-START*/ void Log(std::string type, std::string message); @@ -102,7 +93,7 @@ namespace Mist{ bool isRecordingToFile; uint64_t lastStats; ///< Time of last sending of stats. - std::set buffer; ///< A sorted list of next-to-be-loaded packets. + Util::packetSorter buffer; ///< A sorted list of next-to-be-loaded packets. bool sought; ///< If a seek has been done, this is set to true. Used for seeking on ///< prepareNext(). std::string prevHost; ///< Old value for getConnectedBinHost, for caching diff --git a/src/output/output_dtsc.cpp b/src/output/output_dtsc.cpp index 3ee6e61f..8cc16a69 100644 --- a/src/output/output_dtsc.cpp +++ b/src/output/output_dtsc.cpp @@ -12,6 +12,7 @@ namespace Mist{ OutDTSC::OutDTSC(Socket::Connection &conn) : Output(conn){ JSON::Value prep; + setSyncMode(false); if (config->getString("target").size()){ streamName = config->getString("streamname"); pushUrl = HTTP::URL(config->getString("target"));