MistInHLS improvements and speedups, part 1/2

This commit is contained in:
Thulinma 2023-05-30 18:00:20 +02:00
parent b7c6815e4f
commit ed1c291955
6 changed files with 152 additions and 119 deletions

View file

@ -1226,42 +1226,23 @@ namespace Mist{
}
void Input::removeUnused(){
uint64_t timeout = config->getInteger("pagetimeout") * 1000;
uint64_t cTime = Util::bootSecs();
std::set<size_t> validTracks = M.getValidTracks();
std::map<size_t, std::set<uint32_t> > checkedPages;
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); ++it){
Util::RelAccX &tPages = meta.pages(*it);
for (size_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
uint64_t pageNum = tPages.getInt("firstkey", i);
checkedPages[*it].insert(pageNum);
if (pageCounter[*it].count(pageNum)){
// If the page is still being written to, reset the counter rather than potentially unloading it
if (isCurrentLivePage(*it, pageNum)){
pageCounter[*it][pageNum] = cTime;
continue;
}
if (cTime > pageCounter[*it][pageNum] + DEFAULT_PAGE_TIMEOUT){
pageCounter[*it].erase(pageNum);
bufferRemove(*it, pageNum);
}
}else{
pageCounter[*it][pageNum] = cTime;
}
}
}
//Check pages we buffered but forgot about
for (std::map<size_t, std::map<uint32_t, uint64_t> >::iterator it = pageCounter.begin();
it != pageCounter.end(); it++){
std::set<uint32_t> deletedEntries;
for (std::map<uint32_t, uint64_t>::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++){
if (!checkedPages.count(it->first) || !checkedPages[it->first].count(it2->first)){
INFO_MSG("Deleting forgotten page %zu:%" PRIu32, it->first, it2->first);
if (isRecentLivePage(it->first, it2->first, timeout)){continue;}
if (cTime > it2->second + DEFAULT_PAGE_TIMEOUT){
deletedEntries.insert(it2->first);
bufferRemove(it->first, it2->first);
it->second.erase(it2);
it2 = it->second.begin();
}
}
while (deletedEntries.size()){
it->second.erase(*(deletedEntries.begin()));
deletedEntries.erase(deletedEntries.begin());
}
}
}
std::string formatGUID(const std::string &val){
@ -1509,9 +1490,9 @@ namespace Mist{
pageIdx = i;
}
uint32_t pageNumber = tPages.getInt("firstkey", pageIdx);
pageCounter[idx][pageNumber] = Util::bootSecs();
if (isBuffered(idx, pageNumber, meta)){
// Mark the page as still actively requested
pageCounter[idx][pageNumber] = Util::bootSecs();
DONTEVEN_MSG("Track %zu, key %" PRIu32 " is already buffered in page %" PRIu32
". Cancelling bufferFrame",
idx, keyNum, pageNumber);
@ -1666,7 +1647,7 @@ namespace Mist{
INFO_MSG(" (%" PRIu32 "/%" PRIu64 " parts, %" PRIu64 " bytes)", packCounter,
tPages.getInt("parts", pageIdx), byteCounter);
pageCounter[idx].erase(pageNumber);
bufferRemove(idx, pageNumber);
bufferRemove(idx, pageNumber, pageIdx);
return false;
}else{
INFO_MSG("Track %zu, page %" PRIu32 " (" PRETTY_PRINT_MSTIME " - " PRETTY_PRINT_MSTIME ") buffered in %" PRIu64 "ms",

View file

@ -74,7 +74,7 @@ namespace Mist{
virtual bool openStreamSource(){return readHeader();}
virtual void closeStreamSource(){}
virtual void parseStreamHeader(){}
void checkHeaderTimes(const HTTP::URL & streamFile);
virtual void checkHeaderTimes(const HTTP::URL & streamFile);
virtual void removeUnused();
virtual void convert();
virtual void serve();

View file

@ -35,8 +35,15 @@ static uint64_t ISO8601toUnixmillis(const std::string &ts){
}
const size_t Z = ts.find_first_of("Z+-", T);
const std::string date = ts.substr(0, T);
const std::string time = ts.substr(T + 1, Z - T - 1);
const std::string zone = ts.substr(Z);
std::string time;
std::string zone;
if (Z == std::string::npos){
WARN_MSG("HLS segment timestamp is missing timezone information! Assumed to be UTC.");
time = ts.substr(T + 1);
}else{
time = ts.substr(T + 1, Z - T - 1);
zone = ts.substr(Z);
}
unsigned long year, month, day;
if (sscanf(date.c_str(), "%lu-%lu-%lu", &year, &month, &day) != 3){
ERROR_MSG("Could not parse date: %s", date.c_str());
@ -516,6 +523,7 @@ namespace Mist{
DONTEVEN_MSG("Reloading playlist '%s'", uri.c_str());
while (std::getline(input, line)){
if (input.eof()){break;} // Skip last line, might be incomplete
DONTEVEN_MSG("Parsing line '%s'", line.c_str());
cleanLine(line);
if (line.empty()){continue;}// skip empty lines
@ -653,6 +661,9 @@ namespace Mist{
cleanLine(entry.filename);
entry.bytePos = totalBytes;
entry.duration = duration;
if (entry.duration * 1000 > DTSC::veryUglyJitterOverride){
DTSC::veryUglyJitterOverride = entry.duration * 1000;
}
entry.mUTC = nextUTC;
if (key.size() && iv.size()){
@ -663,12 +674,12 @@ namespace Mist{
memset(entry.keyAES, 0, 16);
}
if (!isUrl()){
std::ifstream fileSource;
std::string test = root.link(entry.filename).getFilePath();
fileSource.open(test.c_str(), std::ios::ate | std::ios::binary);
if (!fileSource.good()){WARN_MSG("file: %s, error: %s", test.c_str(), strerror(errno));}
totalBytes += fileSource.tellg();
if (!isUrl()){
std::ifstream fileSource;
std::string test = root.link(entry.filename).getFilePath();
fileSource.open(test.c_str(), std::ios::ate | std::ios::binary);
if (!fileSource.good()){WARN_MSG("file: %s, error: %s", test.c_str(), strerror(errno));}
totalBytes += fileSource.tellg();
}
entry.timestamp = lastTimestamp + startTime;
@ -747,8 +758,15 @@ namespace Mist{
// If the playlist is of event type, init the amount of segments in the playlist
if (isLiveDVR){
// Set the previousSegmentIndex by quickly going through the existing PLS files
setParsedSegments();
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin();
pListIt != listEntries.end();
pListIt++){
parsedSegments[pListIt->first] = 0;
INFO_MSG("Playlist %" PRIu32 " contains %zu segments", pListIt->first, pListIt->second.size());
}
meta.setLive(true);
meta.setVod(true);
streamIsLive = true;
@ -758,69 +776,71 @@ namespace Mist{
}
void inputHLS::parseStreamHeader(){
if (!initPlaylist(config->getString("input"))){
if (!readExistingHeader()){
if (!initPlaylist(config->getString("input"))){
Util::logExitReason(ER_UNKNOWN, "Failed to load HLS playlist, aborting");
return;
}
uint64_t oldBootMsOffset = M.getBootMsOffset();
meta.reInit(isSingular() ? streamName : "", false);
meta.setUTCOffset(zUTC);
meta.setBootMsOffset(oldBootMsOffset);
INFO_MSG("Parsing live stream to create header...");
TS::Packet packet; // to analyse and extract data
int pidCounter = 1;
return;
}
uint64_t oldBootMsOffset = M.getBootMsOffset();
meta.reInit(isSingular() ? streamName : "", false);
meta.setUTCOffset(zUTC);
meta.setBootMsOffset(oldBootMsOffset);
INFO_MSG("Parsing live stream to create header...");
TS::Packet packet; // to analyse and extract data
int pidCounter = 1;
tthread::lock_guard<tthread::mutex> guard(entryMutex);
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin();
pListIt != listEntries.end(); pListIt++){
// Skip empty playlists
if (!pListIt->second.size()){continue;}
int prepidCounter = pidCounter;
tsStream.clear();
tthread::lock_guard<tthread::mutex> guard(entryMutex);
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin();
pListIt != listEntries.end(); pListIt++){
// Skip empty playlists
if (!pListIt->second.size()){continue;}
int prepidCounter = pidCounter;
tsStream.clear();
for (std::deque<playListEntries>::iterator entryIt = pListIt->second.begin();
entryIt != pListIt->second.end(); ++entryIt){
keepAlive();
if (!segDowner.loadSegment(*entryIt)){
WARN_MSG("Skipping segment that could not be loaded in an attempt to recover");
tsStream.clear();
continue;
}
do{
if (!segDowner.readNext() || !packet.FromPointer(segDowner.packetPtr)){
WARN_MSG("Could not load TS packet from %s, aborting segment parse", entryIt->filename.c_str());
for (std::deque<playListEntries>::iterator entryIt = pListIt->second.begin();
entryIt != pListIt->second.end(); ++entryIt){
keepAlive();
if (!segDowner.loadSegment(*entryIt)){
WARN_MSG("Skipping segment that could not be loaded in an attempt to recover");
tsStream.clear();
break; // Abort load
continue;
}
tsStream.parse(packet, entryIt->bytePos);
if (tsStream.hasPacketOnEachTrack()){
while (tsStream.hasPacket()){
DTSC::Packet headerPack;
tsStream.getEarliestPacket(headerPack);
int tmpTrackId = headerPack.getTrackId();
uint64_t packetId = getPacketID(pListIt->first, tmpTrackId);
do{
if (!segDowner.readNext() || !packet.FromPointer(segDowner.packetPtr)){
WARN_MSG("Could not load TS packet from %s, aborting segment parse", entryIt->filename.c_str());
tsStream.clear();
break; // Abort load
}
tsStream.parse(packet, entryIt->bytePos);
size_t idx = M.trackIDToIndex(packetId, getpid());
if ((idx == INVALID_TRACK_ID || !M.getCodec(idx).size())){
tsStream.initializeMetadata(meta, tmpTrackId, packetId);
idx = M.trackIDToIndex(packetId, getpid());
if (idx != INVALID_TRACK_ID){
meta.setMinKeepAway(idx, globalWaitTime * 2000);
VERYHIGH_MSG("setting minKeepAway = %" PRIu32 " for track: %zu", globalWaitTime * 2000, idx);
if (tsStream.hasPacketOnEachTrack()){
while (tsStream.hasPacket()){
DTSC::Packet headerPack;
tsStream.getEarliestPacket(headerPack);
int tmpTrackId = headerPack.getTrackId();
uint64_t packetId = getPacketID(pListIt->first, tmpTrackId);
size_t idx = M.trackIDToIndex(packetId, getpid());
if ((idx == INVALID_TRACK_ID || !M.getCodec(idx).size())){
tsStream.initializeMetadata(meta, tmpTrackId, packetId);
idx = M.trackIDToIndex(packetId, getpid());
if (idx != INVALID_TRACK_ID){
meta.setMinKeepAway(idx, globalWaitTime * 2000);
VERYHIGH_MSG("setting minKeepAway = %" PRIu32 " for track: %zu", globalWaitTime * 2000, idx);
}
}
}
break; // we have all tracks discovered, next playlist!
}
break; // we have all tracks discovered, next playlist!
}while (!segDowner.atEnd());
if (!segDowner.atEnd()){
segDowner.close();
tsStream.clear();
}
}while (!segDowner.atEnd());
if (!segDowner.atEnd()){
segDowner.close();
tsStream.clear();
}
if (prepidCounter < pidCounter){break;}// We're done reading this playlist!
if (prepidCounter < pidCounter){break;}// We're done reading this playlist!
}
}
}
tsStream.clear();
@ -839,10 +859,6 @@ namespace Mist{
return false;
}
// Check if the DTSH file contains all expected data
if (!M.inputLocalVars.isMember("streamoffset")){
INFO_MSG("Header needs update as it contains no streamoffset, regenerating");
return false;
}
if (!M.inputLocalVars.isMember("playlistEntries")){
INFO_MSG("Header needs update as it contains no playlist entries, regenerating");
return false;
@ -869,6 +885,9 @@ namespace Mist{
newEntry.bytePos = thisEntry[1u].asInt();
newEntry.mUTC = thisEntry[2u].asInt();
newEntry.duration = thisEntry[3u].asDouble();
if (newEntry.duration * 1000 > DTSC::veryUglyJitterOverride){
DTSC::veryUglyJitterOverride = newEntry.duration * 1000;
}
newEntry.timestamp = thisEntry[4u].asInt();
newEntry.timeOffset = thisEntry[5u].asInt();
newEntry.wait = thisEntry[6u].asInt();
@ -890,6 +909,16 @@ namespace Mist{
pidMappingR[key] = val;
pidMapping[val] = key;
}
if (M.inputLocalVars.isMember("parsedSegments")){
jsonForEachConst(M.inputLocalVars["parsedSegments"], i){
uint64_t key = JSON::Value(i.key()).asInt();
uint64_t val = i->asInt();
parsedSegments[key] = val;
playlistMapping[key].lastFileIndex = val;
INFO_MSG("Playlist %" PRIu64 " already parsed %" PRIu64 " segments", key, val);
}
}
// Set bootMsOffset in order to display the program time correctly in the player
zUTC = M.inputLocalVars["zUTC"].asInt();
meta.setUTCOffset(zUTC);
@ -1004,9 +1033,15 @@ namespace Mist{
// set bootMsOffset in order to display the program time correctly in the player
meta.setUTCOffset(zUTC);
if (M.getLive()){meta.setBootMsOffset(streamOffset);}
if (streamIsLive || isLiveDVR){return true;}
if (streamIsLive && !isLiveDVR){return true;}
// Set local vars used for parsing existing headers
injectLocalVars();
return true;
}
/// Sets inputLocalVars based on data ingested
void inputHLS::injectLocalVars(){
meta.inputLocalVars.null();
meta.inputLocalVars["version"] = 4;
// Write playlist entry info
@ -1029,6 +1064,7 @@ namespace Mist{
thisPlaylist.append(thisEntries);
}
allEntries[JSON::Value(pListIt->first).asString()] = thisPlaylist;
meta.inputLocalVars["parsedSegments"][JSON::Value(pListIt->first).asString()] = pListIt->second.size();
}
meta.inputLocalVars["playlist_urls"] = playlist_urls;
meta.inputLocalVars["playlistEntries"] = allEntries;
@ -1041,7 +1077,6 @@ namespace Mist{
thisMappingsR[JSON::Value(pidIt->first).asString()] = pidIt->second;
}
meta.inputLocalVars["pidMappingR"] = thisMappingsR;
return true;
}
bool inputHLS::needsLock(){
@ -1151,21 +1186,23 @@ namespace Mist{
return;
}
uint64_t maxTime = Util::bootMS() + 500;
// Update all playlists to make sure listEntries contains all live segments
for (std::map<uint64_t, Playlist>::iterator pListIt = playlistMapping.begin();
pListIt != playlistMapping.end(); pListIt++){
if (pListIt->second.reloadNext < Util::bootSecs()){
pListIt->second.reload();
}
}
HIGH_MSG("Current playlist has parsed %zu/%" PRIu64 " entries", listEntries[currentPlaylist].size(), parsedSegments[currentPlaylist]);
for(uint64_t entryIt = parsedSegments[currentPlaylist]; entryIt < listEntries[currentPlaylist].size(); entryIt++){
MEDIUM_MSG("Adding entry #%" PRIu64 " as live data", entryIt);
if (parseSegmentAsLive(entryIt)){
parsedSegments[currentPlaylist]++;
}else{
break;
currentPlaylist = pListIt->first;
if (listEntries[currentPlaylist].size() != parsedSegments[currentPlaylist]){
INFO_MSG("Current playlist has parsed %" PRIu64 "/%zu entries", parsedSegments[currentPlaylist], listEntries[currentPlaylist].size());
}
for(uint64_t entryIt = parsedSegments[currentPlaylist]; entryIt < listEntries[currentPlaylist].size(); entryIt++){
INFO_MSG("Adding entry #%" PRIu64 " as live data", entryIt+1);
if (parseSegmentAsLive(entryIt)){
parsedSegments[currentPlaylist]++;
}
if (Util::bootMS() > maxTime){return;}
}
}
}
@ -1449,11 +1486,6 @@ namespace Mist{
/// \brief Sets parsedSegments for all playlists, specifying how many segments
/// have already been parsed. Additional segments can then be parsed as live data
void inputHLS::setParsedSegments(){
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin();
pListIt != listEntries.end(); pListIt++){
parsedSegments[pListIt->first] = pListIt->second.size();
INFO_MSG("Playlist %" PRIu32 " already contains %" PRIu64 " VOD segments", pListIt->first, parsedSegments[pListIt->first]);
}
}
/// Parses the main playlist, possibly containing variants.
@ -1736,4 +1768,19 @@ namespace Mist{
return tmpId;
}
void inputHLS::finish(){
if (isLiveDVR){
INFO_MSG("Writing updated header to disk");
injectLocalVars();
M.toFile(HTTP::localURIResolver().link(config->getString("input") + ".dtsh").getUrl());
}
Input::finish();
}
void inputHLS::checkHeaderTimes(const HTTP::URL & streamFile){
if (isLiveDVR){return;}
Input::checkHeaderTimes(streamFile);
}
}// namespace Mist

View file

@ -181,6 +181,9 @@ namespace Mist{
uint64_t getPacketTime(uint64_t packetTime, uint64_t tid, uint64_t currentPlaylist, uint64_t nUTC = 0);
uint64_t getPacketID(uint64_t currentPlaylist, uint64_t trackId);
size_t getEntryId(uint32_t playlistId, uint64_t bytePos);
virtual void finish();
void injectLocalVars();
virtual void checkHeaderTimes(const HTTP::URL & streamFile);
};
}// namespace Mist

View file

@ -127,17 +127,19 @@ namespace Mist{
/// Does not do anything if the process is not standalone, in this case the master process will have an overloaded version of this function.
///\param tid The trackid to remove the page from
///\param pageNumber The number of the page to remove
void InOutBase::bufferRemove(size_t idx, uint32_t pageNumber){
void InOutBase::bufferRemove(size_t idx, uint32_t pageNumber, uint32_t pageIdx){
if (!standAlone){// A different process will handle this for us
return;
}
Util::RelAccX &tPages = meta.pages(idx);
Util::RelAccXFieldData firstKey = tPages.getFieldData("firstkey");
uint32_t pageIdx = INVALID_KEY_NUM;
for (uint32_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
if (tPages.getInt("firstkey", i) == pageNumber){
pageIdx = i;
break;
if (pageIdx == INVALID_KEY_NUM){
for (uint32_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
if (tPages.getInt(firstKey, i) == pageNumber){
pageIdx = i;
break;
}
}
}
// If the given pagenumber is not a valid page on this track, do nothing

View file

@ -23,7 +23,7 @@ namespace Mist{
void bufferFinalize(size_t idx, IPC::sharedPage & page);
void liveFinalize(size_t idx);
bool isCurrentLivePage(size_t idx, uint32_t pageNumber);
void bufferRemove(size_t idx, uint32_t pageNumber);
void bufferRemove(size_t idx, uint32_t pageNumber, uint32_t pageIdx = INVALID_KEY_NUM);
void bufferLivePacket(const DTSC::Packet &packet);
void bufferNext(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData,