Added Util::packetSorter with async/sync modes, set DTSC outputs to use async mode

This commit is contained in:
Thulinma 2021-07-04 22:14:40 +02:00
parent 6042c1ea70
commit dae32ede11
5 changed files with 278 additions and 135 deletions

View file

@ -201,6 +201,133 @@ void Util::sanitizeName(std::string &streamname){
}
}
/// Initalizes the packetSorter in sync mode.
Util::packetSorter::packetSorter(){
dequeMode = false;
}
/// Sets sync mode on if true (sync), off if false (async).
void Util::packetSorter::setSyncMode(bool synced){
if (dequeMode != !synced){
dequeMode = !synced;
if (!dequeMode){
//we've switched away from deque
for (std::deque<Util::sortedPageInfo>::iterator it = dequeBuffer.begin(); it != dequeBuffer.end(); ++it){
insert(*it);
}
dequeBuffer.clear();
}else{
//we've switched away from set
for (std::set<Util::sortedPageInfo>::iterator it = setBuffer.begin(); it != setBuffer.end(); ++it){
insert(*it);
}
setBuffer.clear();
}
}
}
/// Returns true if we're synced, false if async.
bool Util::packetSorter::getSyncMode() const{return !dequeMode;}
/// Returns the amount of packets currently in the sorter.
size_t Util::packetSorter::size() const{
if (dequeMode){return dequeBuffer.size();}else{return setBuffer.size();}
}
/// Clears all packets from the sorter; does not reset mode.
void Util::packetSorter::clear(){
dequeBuffer.clear();
setBuffer.clear();
}
/// Returns a pointer to the first packet in the sorter.
const Util::sortedPageInfo * Util::packetSorter::begin() const{
if (dequeMode){
return &*dequeBuffer.begin();
}else{
return &*setBuffer.begin();
}
}
/// Inserts a new packet in the sorter.
void Util::packetSorter::insert(const sortedPageInfo &pInfo){
if (dequeMode){
dequeBuffer.push_back(pInfo);
}else{
setBuffer.insert(pInfo);
}
}
/// Removes the given track ID packet from the sorter. Removes at most one packet, make sure to prevent duplicates elsewhere!
void Util::packetSorter::dropTrack(size_t tid){
if (dequeMode){
for (std::deque<Util::sortedPageInfo>::iterator it = dequeBuffer.begin(); it != dequeBuffer.end(); ++it){
if (it->tid == tid){
dequeBuffer.erase(it);
return;
}
}
}else{
for (std::set<Util::sortedPageInfo>::iterator it = setBuffer.begin(); it != setBuffer.end(); ++it){
if (it->tid == tid){
setBuffer.erase(it);
return;
}
}
}
}
/// Removes the first packet from the sorter and inserts the given packet.
void Util::packetSorter::replaceFirst(const sortedPageInfo &pInfo){
if (dequeMode){
dequeBuffer.pop_front();
if (dequeBuffer.size() && dequeBuffer.front().time > pInfo.time){
dequeBuffer.push_front(pInfo);
}else{
dequeBuffer.push_back(pInfo);
}
}else{
setBuffer.erase(setBuffer.begin());
setBuffer.insert(pInfo);
}
}
/// Removes the first packet from the sorter and inserts it back at the end. No-op for sync mode.
void Util::packetSorter::moveFirstToEnd(){
if (dequeMode){
dequeBuffer.push_back(dequeBuffer.front());
dequeBuffer.pop_front();
}
}
/// Returns true if there is an entry in the sorter for the given track ID.
bool Util::packetSorter::hasEntry(size_t tid) const{
if (dequeMode){
for (std::deque<Util::sortedPageInfo>::const_iterator it = dequeBuffer.begin(); it != dequeBuffer.end(); ++it){
if (it->tid == tid){return true;}
}
}else{
for (std::set<Util::sortedPageInfo>::const_iterator it = setBuffer.begin(); it != setBuffer.end(); ++it){
if (it->tid == tid){return true;}
}
}
return false;
}
/// Fills toFill with track IDs of tracks that are in the sorter.
void Util::packetSorter::getTrackList(std::set<size_t> &toFill) const{
toFill.clear();
if (dequeMode){
for (std::deque<Util::sortedPageInfo>::const_iterator it = dequeBuffer.begin(); it != dequeBuffer.end(); ++it){
toFill.insert(it->tid);
}
}else{
for (std::set<Util::sortedPageInfo>::const_iterator it = setBuffer.begin(); it != setBuffer.end(); ++it){
toFill.insert(it->tid);
}
}
}
JSON::Value Util::getStreamConfig(const std::string &streamname){
JSON::Value result;
if (streamname.size() > 100){

View file

@ -50,6 +50,39 @@ namespace Util{
extern trackSortOrder defaultTrackSortOrder;
void sortTracks(std::set<size_t> & validTracks, const DTSC::Meta & M, trackSortOrder sorting, std::list<size_t> & srtTrks);
/// This struct keeps packet information sorted in playback order
struct sortedPageInfo{
bool operator<(const sortedPageInfo &rhs) const{
if (time < rhs.time){return true;}
return (time == rhs.time && tid < rhs.tid);
}
size_t tid;
uint64_t time;
uint64_t offset;
size_t partIndex;
};
/// Packet sorter used to determine which packet should be output next
class packetSorter{
public:
packetSorter();
size_t size() const;
void clear();
const sortedPageInfo * begin() const;
void insert(const sortedPageInfo &pInfo);
void dropTrack(size_t tid);
void replaceFirst(const sortedPageInfo &pInfo);
void moveFirstToEnd();
bool hasEntry(size_t tid) const;
void getTrackList(std::set<size_t> &toFill) const;
void setSyncMode(bool synced);
bool getSyncMode() const;
private:
bool dequeMode;
std::deque<sortedPageInfo> dequeBuffer;
std::set<sortedPageInfo> setBuffer;
};
class DTSCShmReader{
public:

View file

@ -788,7 +788,7 @@ namespace Mist{
userSelect.erase(tid);
return false;
}
sortedPageInfo tmp;
Util::sortedPageInfo tmp;
tmp.tid = tid;
tmp.offset = 0;
tmp.partIndex = 0;
@ -1456,12 +1456,7 @@ namespace Mist{
streamName.c_str(), meta.getCodec(trackId).c_str(), trackId, usr.getKeyNum() + 1,
pageNumForKey(trackId, usr.getKeyNum() + 1), pageNumMax(trackId), reason.c_str());
// now actually drop the track from the buffer
for (std::set<sortedPageInfo>::iterator it = buffer.begin(); it != buffer.end(); ++it){
if (it->tid == trackId){
buffer.erase(it);
break;
}
}
buffer.dropTrack(trackId);
userSelect.erase(trackId);
}
@ -1471,7 +1466,7 @@ namespace Mist{
/// prepareNext continues as if this function was never called.
bool Output::getKeyFrame(){
// store copy of current state
std::set<sortedPageInfo> tmp_buffer = buffer;
Util::packetSorter tmp_buffer = buffer;
std::map<size_t, Comms::Users> tmp_userSelect = userSelect;
std::map<size_t, uint32_t> tmp_currentPage = currentPage;
@ -1528,26 +1523,13 @@ namespace Mist{
if (buffer.size() < userSelect.size()){
// prepare to drop any selectedTrack without buffer entry
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); ++it){
bool found = false;
for (std::set<sortedPageInfo>::iterator bi = buffer.begin(); bi != buffer.end(); ++bi){
if (bi->tid == it->first){
found = true;
break;
}
}
if (!found){dropTracks.insert(it->first);}
if (!buffer.hasEntry(it->first)){dropTracks.insert(it->first);}
}
}else{
std::set<size_t> seen;
// prepare to drop any buffer entry without selectedTrack
for (std::set<sortedPageInfo>::iterator bi = buffer.begin(); bi != buffer.end(); ++bi){
if (!userSelect.count(bi->tid)){dropTracks.insert(bi->tid);}
if (seen.count(bi->tid)){
INFO_MSG("Dropping duplicate buffer entry for track %zu", bi->tid);
buffer.erase(bi);
return false;
}
seen.insert(bi->tid);
buffer.getTrackList(dropTracks);
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); ++it){
dropTracks.erase(it->first);
}
}
if (!dropTracks.size()){
@ -1565,118 +1547,129 @@ namespace Mist{
return false;
}
sortedPageInfo nxt = *(buffer.begin());
if (meta.reloadReplacedPagesIfNeeded()){return false;}
if (!M.getValidTracks().count(nxt.tid)){
dropTrack(nxt.tid, "disappeared from metadata");
return false;
}
// if we're going to read past the end of the data page, load the next page
// this only happens for VoD
if (nxt.offset >= curPage[nxt.tid].len ||
(!memcmp(curPage[nxt.tid].mapped + nxt.offset, "\000\000\000\000", 4))){
if (M.getVod() && nxt.time >= M.getLastms(nxt.tid)){
dropTrack(nxt.tid, "end of VoD track reached", false);
return false;
}
if (M.getPageNumberForTime(nxt.tid, nxt.time) != currentPage[nxt.tid]){
loadPageForKey(nxt.tid, M.getPageNumberForTime(nxt.tid, nxt.time));
nxt.offset = 0;
//Only read the next time if the page load succeeded and there is a packet to read from
if (curPage[nxt.tid].mapped && curPage[nxt.tid].mapped[0] == 'D'){
nxt.time = getDTSCTime(curPage[nxt.tid].mapped, 0);
}
buffer.erase(buffer.begin());
buffer.insert(nxt);
return false;
}
dropTrack(nxt.tid, "VoD page load failure");
return false;
}
// We know this packet will be valid, pre-load it so we know its length
DTSC::Packet preLoad(curPage[nxt.tid].mapped + nxt.offset, 0, true);
Util::sortedPageInfo nxt;
uint64_t nextTime = 0;
//In case we're not in sync mode, we might have to retry a few times
for (size_t trackTries = 0; trackTries < buffer.size(); ++trackTries){
// Check if we have a next valid packet
if (curPage[nxt.tid].len > nxt.offset+preLoad.getDataLen()+20 && memcmp(curPage[nxt.tid].mapped + nxt.offset + preLoad.getDataLen(), "\000\000\000\000", 4)){
nextTime = getDTSCTime(curPage[nxt.tid].mapped, nxt.offset + preLoad.getDataLen());
if (!nextTime){
WARN_MSG("Next packet is available (offset %" PRIu64 " / %" PRIu64 " on %s), but has no time. Please warn the developers if you see this message!", nxt.offset, curPage[nxt.tid].len, curPage[nxt.tid].name.c_str());
dropTrack(nxt.tid, "EOP: invalid next packet");
nxt = *(buffer.begin());
if (meta.reloadReplacedPagesIfNeeded()){return false;}
if (!M.getValidTracks().count(nxt.tid)){
dropTrack(nxt.tid, "disappeared from metadata");
return false;
}
if (nextTime < nxt.time){
std::stringstream errMsg;
errMsg << "next packet has timestamp " << nextTime << " but current timestamp is " << nxt.time;
dropTrack(nxt.tid, errMsg.str().c_str());
// if we're going to read past the end of the data page, load the next page
// this only happens for VoD
if (nxt.offset >= curPage[nxt.tid].len ||
(!memcmp(curPage[nxt.tid].mapped + nxt.offset, "\000\000\000\000", 4))){
if (M.getVod() && nxt.time >= M.getLastms(nxt.tid)){
dropTrack(nxt.tid, "end of VoD track reached", false);
return false;
}
if (M.getPageNumberForTime(nxt.tid, nxt.time) != currentPage[nxt.tid]){
loadPageForKey(nxt.tid, M.getPageNumberForTime(nxt.tid, nxt.time));
nxt.offset = 0;
//Only read the next time if the page load succeeded and there is a packet to read from
if (curPage[nxt.tid].mapped && curPage[nxt.tid].mapped[0] == 'D'){
nxt.time = getDTSCTime(curPage[nxt.tid].mapped, 0);
}
buffer.replaceFirst(nxt);
return false;
}
INFO_MSG("Invalid packet: no data @%" PRIu64 " for time %" PRIu64 " on track %zu", nxt.offset, nxt.time, nxt.tid);
dropTrack(nxt.tid, "VoD page load failure");
return false;
}
}else{
//no next packet yet!
//Check if this is the last packet of a VoD stream. Return success and drop the track.
if (M.getVod() && nxt.time >= M.getLastms(nxt.tid)){
thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true);
thisIdx = nxt.tid;
dropTrack(nxt.tid, "end of VoD track reached", false);
return true;
}
uint32_t thisKey = M.getKeyNumForTime(nxt.tid, nxt.time);
//Check if there exists a different page for the next key
uint32_t nextKeyPage = INVALID_KEY_NUM;
//Make sure we only try to read the page for the next key if it actually should be available
DTSC::Keys keys(M.keys(nxt.tid));
if (keys.getEndValid() >= thisKey+1){nextKeyPage = M.getPageNumberForKey(nxt.tid, thisKey + 1);}
if (nextKeyPage != INVALID_KEY_NUM && nextKeyPage != currentPage[nxt.tid]){
// If so, the next key is our next packet
nextTime = keys.getTime(thisKey + 1);
//If the next packet should've been before the current packet, something is wrong. Abort, abort!
// We know this packet will be valid, pre-load it so we know its length
DTSC::Packet preLoad(curPage[nxt.tid].mapped + nxt.offset, 0, true);
nextTime = 0;
// Check if we have a next valid packet
if (curPage[nxt.tid].len > nxt.offset+preLoad.getDataLen()+20 && memcmp(curPage[nxt.tid].mapped + nxt.offset + preLoad.getDataLen(), "\000\000\000\000", 4)){
nextTime = getDTSCTime(curPage[nxt.tid].mapped, nxt.offset + preLoad.getDataLen());
if (!nextTime){
WARN_MSG("Next packet is available (offset %" PRIu64 " / %" PRIu64 " on %s), but has no time. Please warn the developers if you see this message!", nxt.offset, curPage[nxt.tid].len, curPage[nxt.tid].name.c_str());
dropTrack(nxt.tid, "EOP: invalid next packet");
return false;
}
if (nextTime < nxt.time){
std::stringstream errMsg;
errMsg << "next key (" << (thisKey+1) << ") time " << nextTime << " but current time " << nxt.time;
errMsg << "; currPage=" << currentPage[nxt.tid] << ", nxtPage=" << nextKeyPage;
errMsg << ", firstKey=" << keys.getFirstValid() << ", endKey=" << keys.getEndValid();
errMsg << "next packet has timestamp " << nextTime << " but current timestamp is " << nxt.time;
dropTrack(nxt.tid, errMsg.str().c_str());
return false;
}
}else{
//Okay, there's no next page yet, and no next packet on this page either.
//That means we're waiting for data to show up, somewhere.
// after ~25 seconds, give up and drop the track.
if (++emptyCount >= 2500){
dropTrack(nxt.tid, "EOP: data wait timeout");
return false;
}
//every ~1 second, check if the stream is not offline
if (emptyCount % 100 == 0 && M.getLive() && Util::getStreamStatus(streamName) == STRMSTAT_OFF){
Util::logExitReason("Stream source shut down");
thisPacket.null();
//no next packet yet!
//Check if this is the last packet of a VoD stream. Return success and drop the track.
if (M.getVod() && nxt.time >= M.getLastms(nxt.tid)){
thisPacket.reInit(curPage[nxt.tid].mapped + nxt.offset, 0, true);
thisIdx = nxt.tid;
dropTrack(nxt.tid, "end of VoD track reached", false);
return true;
}
//every ~16 seconds, reconnect to metadata
if (emptyCount % 1600 == 0){
INFO_MSG("Reconnecting to input; track %" PRIu64 " key %" PRIu32 " is on page %" PRIu32 " and we're currently serving %" PRIu32 " from %" PRIu32, nxt.tid, thisKey+1, nextKeyPage, thisKey, currentPage[nxt.tid]);
reconnect();
if (!meta){
onFail("Could not connect to stream data", true);
uint32_t thisKey = M.getKeyNumForTime(nxt.tid, nxt.time);
//Check if there exists a different page for the next key
uint32_t nextKeyPage = INVALID_KEY_NUM;
//Make sure we only try to read the page for the next key if it actually should be available
DTSC::Keys keys(M.keys(nxt.tid));
if (keys.getEndValid() >= thisKey+1){nextKeyPage = M.getPageNumberForKey(nxt.tid, thisKey + 1);}
if (nextKeyPage != INVALID_KEY_NUM && nextKeyPage != currentPage[nxt.tid]){
// If so, the next key is our next packet
nextTime = keys.getTime(thisKey + 1);
//If the next packet should've been before the current packet, something is wrong. Abort, abort!
if (nextTime < nxt.time){
std::stringstream errMsg;
errMsg << "next key (" << (thisKey+1) << ") time " << nextTime << " but current time " << nxt.time;
errMsg << "; currPage=" << currentPage[nxt.tid] << ", nxtPage=" << nextKeyPage;
errMsg << ", firstKey=" << keys.getFirstValid() << ", endKey=" << keys.getEndValid();
dropTrack(nxt.tid, errMsg.str().c_str());
return false;
}
}else{
if (!buffer.getSyncMode() && trackTries < buffer.size()-1){
//We shuffle the just-tried packet back to the end of the queue, then retry up to buffer.size() times
buffer.moveFirstToEnd();
continue;
}
//Okay, there's no next page yet, and no next packet on this page either.
//That means we're waiting for data to show up, somewhere.
// after ~25 seconds, give up and drop the track.
if (++emptyCount >= 2500){
dropTrack(nxt.tid, "EOP: data wait timeout");
return false;
}
//every ~1 second, check if the stream is not offline
if (emptyCount % 100 == 0 && M.getLive() && Util::getStreamStatus(streamName) == STRMSTAT_OFF){
Util::logExitReason("Stream source shut down");
thisPacket.null();
return true;
}
// if we don't have a connection to the metadata here, this means the stream has gone offline in the meanwhile.
if (!meta){
Util::logExitReason("Attempted reconnect to source failed");
thisPacket.null();
return true;
//every ~16 seconds, reconnect to metadata
if (emptyCount % 1600 == 0){
INFO_MSG("Reconnecting to input; track %" PRIu64 " key %" PRIu32 " is on page %" PRIu32 " and we're currently serving %" PRIu32 " from %" PRIu32, nxt.tid, thisKey+1, nextKeyPage, thisKey, currentPage[nxt.tid]);
reconnect();
if (!meta){
onFail("Could not connect to stream data", true);
thisPacket.null();
return true;
}
// if we don't have a connection to the metadata here, this means the stream has gone offline in the meanwhile.
if (!meta){
Util::logExitReason("Attempted reconnect to source failed");
thisPacket.null();
return true;
}
return false;//no sleep after reconnect
}
return false;//no sleep after reconnect
//Fine! We didn't want a packet, anyway. Let's try again later.
playbackSleep(10);
return false;
}
//Fine! We didn't want a packet, anyway. Let's try again later.
playbackSleep(10);
return false;
}
}
@ -1693,7 +1686,7 @@ namespace Mist{
emptyCount = 0; // valid packet - reset empty counter
if (!userSelect[nxt.tid]){
INFO_MSG("Track %zu is not alive!", nxt.tid);
dropTrack(nxt.tid, "track is not alive!");
return false;
}
@ -1710,9 +1703,7 @@ namespace Mist{
++nxt.partIndex;
// exchange the current packet in the buffer for the next one
buffer.erase(buffer.begin());
buffer.insert(nxt);
buffer.replaceFirst(nxt);
return true;
}

View file

@ -10,23 +10,11 @@
#include <mist/shared_memory.h>
#include <mist/socket.h>
#include <mist/timing.h>
#include <mist/stream.h>
#include <set>
namespace Mist{
/// This struct keeps packet information sorted in playback order, so the
/// Mist::Output class knows when to buffer which packet.
struct sortedPageInfo{
bool operator<(const sortedPageInfo &rhs) const{
if (time < rhs.time){return true;}
return (time == rhs.time && tid < rhs.tid);
}
size_t tid;
uint64_t time;
uint64_t offset;
size_t partIndex;
};
/// The output class is intended to be inherited by MistOut process classes.
/// It contains all generic code and logic, while the child classes implement
/// anything specific to particular protocols or containers.
@ -85,6 +73,9 @@ namespace Mist{
void selectAllTracks();
/// Accessor for buffer.setSyncMode.
void setSyncMode(bool synced){buffer.setSyncMode(synced);}
private: // these *should* not be messed with in child classes.
/*LTS-START*/
void Log(std::string type, std::string message);
@ -102,7 +93,7 @@ namespace Mist{
bool isRecordingToFile;
uint64_t lastStats; ///< Time of last sending of stats.
std::set<sortedPageInfo> buffer; ///< A sorted list of next-to-be-loaded packets.
Util::packetSorter buffer; ///< A sorted list of next-to-be-loaded packets.
bool sought; ///< If a seek has been done, this is set to true. Used for seeking on
///< prepareNext().
std::string prevHost; ///< Old value for getConnectedBinHost, for caching

View file

@ -12,6 +12,7 @@
namespace Mist{
OutDTSC::OutDTSC(Socket::Connection &conn) : Output(conn){
JSON::Value prep;
setSyncMode(false);
if (config->getString("target").size()){
streamName = config->getString("streamname");
pushUrl = HTTP::URL(config->getString("target"));