diff --git a/src/input/input.cpp b/src/input/input.cpp index cf46c699..2fac23e6 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -1226,42 +1226,23 @@ namespace Mist{ } void Input::removeUnused(){ + uint64_t timeout = config->getInteger("pagetimeout") * 1000; uint64_t cTime = Util::bootSecs(); - std::set validTracks = M.getValidTracks(); - std::map > checkedPages; - for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); ++it){ - Util::RelAccX &tPages = meta.pages(*it); - for (size_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ - uint64_t pageNum = tPages.getInt("firstkey", i); - checkedPages[*it].insert(pageNum); - if (pageCounter[*it].count(pageNum)){ - // If the page is still being written to, reset the counter rather than potentially unloading it - if (isCurrentLivePage(*it, pageNum)){ - pageCounter[*it][pageNum] = cTime; - continue; - } - if (cTime > pageCounter[*it][pageNum] + DEFAULT_PAGE_TIMEOUT){ - pageCounter[*it].erase(pageNum); - bufferRemove(*it, pageNum); - } - }else{ - pageCounter[*it][pageNum] = cTime; - } - } - } - //Check pages we buffered but forgot about for (std::map >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++){ + std::set deletedEntries; for (std::map::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++){ - if (!checkedPages.count(it->first) || !checkedPages[it->first].count(it2->first)){ - INFO_MSG("Deleting forgotten page %zu:%" PRIu32, it->first, it2->first); + if (isRecentLivePage(it->first, it2->first, timeout)){continue;} + if (cTime > it2->second + DEFAULT_PAGE_TIMEOUT){ + deletedEntries.insert(it2->first); bufferRemove(it->first, it2->first); - it->second.erase(it2); - it2 = it->second.begin(); } } + while (deletedEntries.size()){ + it->second.erase(*(deletedEntries.begin())); + deletedEntries.erase(deletedEntries.begin()); + } } - } std::string formatGUID(const std::string &val){ @@ -1509,9 +1490,9 @@ namespace Mist{ pageIdx = i; } uint32_t pageNumber = tPages.getInt("firstkey", pageIdx); + pageCounter[idx][pageNumber] = Util::bootSecs(); if (isBuffered(idx, pageNumber, meta)){ // Mark the page as still actively requested - pageCounter[idx][pageNumber] = Util::bootSecs(); DONTEVEN_MSG("Track %zu, key %" PRIu32 " is already buffered in page %" PRIu32 ". Cancelling bufferFrame", idx, keyNum, pageNumber); @@ -1666,7 +1647,7 @@ namespace Mist{ INFO_MSG(" (%" PRIu32 "/%" PRIu64 " parts, %" PRIu64 " bytes)", packCounter, tPages.getInt("parts", pageIdx), byteCounter); pageCounter[idx].erase(pageNumber); - bufferRemove(idx, pageNumber); + bufferRemove(idx, pageNumber, pageIdx); return false; }else{ INFO_MSG("Track %zu, page %" PRIu32 " (" PRETTY_PRINT_MSTIME " - " PRETTY_PRINT_MSTIME ") buffered in %" PRIu64 "ms", diff --git a/src/input/input.h b/src/input/input.h index 57b9a0f0..5902fed7 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -74,7 +74,7 @@ namespace Mist{ virtual bool openStreamSource(){return readHeader();} virtual void closeStreamSource(){} virtual void parseStreamHeader(){} - void checkHeaderTimes(const HTTP::URL & streamFile); + virtual void checkHeaderTimes(const HTTP::URL & streamFile); virtual void removeUnused(); virtual void convert(); virtual void serve(); diff --git a/src/input/input_hls.cpp b/src/input/input_hls.cpp index 6a6b04c1..014092e1 100644 --- a/src/input/input_hls.cpp +++ b/src/input/input_hls.cpp @@ -35,8 +35,15 @@ static uint64_t ISO8601toUnixmillis(const std::string &ts){ } const size_t Z = ts.find_first_of("Z+-", T); const std::string date = ts.substr(0, T); - const std::string time = ts.substr(T + 1, Z - T - 1); - const std::string zone = ts.substr(Z); + std::string time; + std::string zone; + if (Z == std::string::npos){ + WARN_MSG("HLS segment timestamp is missing timezone information! Assumed to be UTC."); + time = ts.substr(T + 1); + }else{ + time = ts.substr(T + 1, Z - T - 1); + zone = ts.substr(Z); + } unsigned long year, month, day; if (sscanf(date.c_str(), "%lu-%lu-%lu", &year, &month, &day) != 3){ ERROR_MSG("Could not parse date: %s", date.c_str()); @@ -516,6 +523,7 @@ namespace Mist{ DONTEVEN_MSG("Reloading playlist '%s'", uri.c_str()); while (std::getline(input, line)){ + if (input.eof()){break;} // Skip last line, might be incomplete DONTEVEN_MSG("Parsing line '%s'", line.c_str()); cleanLine(line); if (line.empty()){continue;}// skip empty lines @@ -653,6 +661,9 @@ namespace Mist{ cleanLine(entry.filename); entry.bytePos = totalBytes; entry.duration = duration; + if (entry.duration * 1000 > DTSC::veryUglyJitterOverride){ + DTSC::veryUglyJitterOverride = entry.duration * 1000; + } entry.mUTC = nextUTC; if (key.size() && iv.size()){ @@ -663,12 +674,12 @@ namespace Mist{ memset(entry.keyAES, 0, 16); } - if (!isUrl()){ - std::ifstream fileSource; - std::string test = root.link(entry.filename).getFilePath(); - fileSource.open(test.c_str(), std::ios::ate | std::ios::binary); - if (!fileSource.good()){WARN_MSG("file: %s, error: %s", test.c_str(), strerror(errno));} - totalBytes += fileSource.tellg(); + if (!isUrl()){ + std::ifstream fileSource; + std::string test = root.link(entry.filename).getFilePath(); + fileSource.open(test.c_str(), std::ios::ate | std::ios::binary); + if (!fileSource.good()){WARN_MSG("file: %s, error: %s", test.c_str(), strerror(errno));} + totalBytes += fileSource.tellg(); } entry.timestamp = lastTimestamp + startTime; @@ -747,8 +758,15 @@ namespace Mist{ // If the playlist is of event type, init the amount of segments in the playlist if (isLiveDVR){ + // Set the previousSegmentIndex by quickly going through the existing PLS files - setParsedSegments(); + for (std::map >::iterator pListIt = listEntries.begin(); + pListIt != listEntries.end(); + pListIt++){ + parsedSegments[pListIt->first] = 0; + INFO_MSG("Playlist %" PRIu32 " contains %zu segments", pListIt->first, pListIt->second.size()); + } + meta.setLive(true); meta.setVod(true); streamIsLive = true; @@ -758,69 +776,71 @@ namespace Mist{ } void inputHLS::parseStreamHeader(){ - if (!initPlaylist(config->getString("input"))){ + if (!readExistingHeader()){ + if (!initPlaylist(config->getString("input"))){ Util::logExitReason(ER_UNKNOWN, "Failed to load HLS playlist, aborting"); - return; - } - uint64_t oldBootMsOffset = M.getBootMsOffset(); - meta.reInit(isSingular() ? streamName : "", false); - meta.setUTCOffset(zUTC); - meta.setBootMsOffset(oldBootMsOffset); - INFO_MSG("Parsing live stream to create header..."); - TS::Packet packet; // to analyse and extract data - int pidCounter = 1; + return; + } + uint64_t oldBootMsOffset = M.getBootMsOffset(); + meta.reInit(isSingular() ? streamName : "", false); + meta.setUTCOffset(zUTC); + meta.setBootMsOffset(oldBootMsOffset); + INFO_MSG("Parsing live stream to create header..."); + TS::Packet packet; // to analyse and extract data + int pidCounter = 1; - tthread::lock_guard guard(entryMutex); - for (std::map >::iterator pListIt = listEntries.begin(); - pListIt != listEntries.end(); pListIt++){ - // Skip empty playlists - if (!pListIt->second.size()){continue;} - int prepidCounter = pidCounter; - tsStream.clear(); + tthread::lock_guard guard(entryMutex); + for (std::map >::iterator pListIt = listEntries.begin(); + pListIt != listEntries.end(); pListIt++){ + // Skip empty playlists + if (!pListIt->second.size()){continue;} + int prepidCounter = pidCounter; + tsStream.clear(); - for (std::deque::iterator entryIt = pListIt->second.begin(); - entryIt != pListIt->second.end(); ++entryIt){ - keepAlive(); - if (!segDowner.loadSegment(*entryIt)){ - WARN_MSG("Skipping segment that could not be loaded in an attempt to recover"); - tsStream.clear(); - continue; - } - - do{ - if (!segDowner.readNext() || !packet.FromPointer(segDowner.packetPtr)){ - WARN_MSG("Could not load TS packet from %s, aborting segment parse", entryIt->filename.c_str()); + for (std::deque::iterator entryIt = pListIt->second.begin(); + entryIt != pListIt->second.end(); ++entryIt){ + keepAlive(); + if (!segDowner.loadSegment(*entryIt)){ + WARN_MSG("Skipping segment that could not be loaded in an attempt to recover"); tsStream.clear(); - break; // Abort load + continue; } - tsStream.parse(packet, entryIt->bytePos); - if (tsStream.hasPacketOnEachTrack()){ - while (tsStream.hasPacket()){ - DTSC::Packet headerPack; - tsStream.getEarliestPacket(headerPack); - int tmpTrackId = headerPack.getTrackId(); - uint64_t packetId = getPacketID(pListIt->first, tmpTrackId); + do{ + if (!segDowner.readNext() || !packet.FromPointer(segDowner.packetPtr)){ + WARN_MSG("Could not load TS packet from %s, aborting segment parse", entryIt->filename.c_str()); + tsStream.clear(); + break; // Abort load + } + tsStream.parse(packet, entryIt->bytePos); - size_t idx = M.trackIDToIndex(packetId, getpid()); - if ((idx == INVALID_TRACK_ID || !M.getCodec(idx).size())){ - tsStream.initializeMetadata(meta, tmpTrackId, packetId); - idx = M.trackIDToIndex(packetId, getpid()); - if (idx != INVALID_TRACK_ID){ - meta.setMinKeepAway(idx, globalWaitTime * 2000); - VERYHIGH_MSG("setting minKeepAway = %" PRIu32 " for track: %zu", globalWaitTime * 2000, idx); + if (tsStream.hasPacketOnEachTrack()){ + while (tsStream.hasPacket()){ + DTSC::Packet headerPack; + tsStream.getEarliestPacket(headerPack); + int tmpTrackId = headerPack.getTrackId(); + uint64_t packetId = getPacketID(pListIt->first, tmpTrackId); + + size_t idx = M.trackIDToIndex(packetId, getpid()); + if ((idx == INVALID_TRACK_ID || !M.getCodec(idx).size())){ + tsStream.initializeMetadata(meta, tmpTrackId, packetId); + idx = M.trackIDToIndex(packetId, getpid()); + if (idx != INVALID_TRACK_ID){ + meta.setMinKeepAway(idx, globalWaitTime * 2000); + VERYHIGH_MSG("setting minKeepAway = %" PRIu32 " for track: %zu", globalWaitTime * 2000, idx); + } } } + break; // we have all tracks discovered, next playlist! } - break; // we have all tracks discovered, next playlist! + }while (!segDowner.atEnd()); + if (!segDowner.atEnd()){ + segDowner.close(); + tsStream.clear(); } - }while (!segDowner.atEnd()); - if (!segDowner.atEnd()){ - segDowner.close(); - tsStream.clear(); - } - if (prepidCounter < pidCounter){break;}// We're done reading this playlist! + if (prepidCounter < pidCounter){break;}// We're done reading this playlist! + } } } tsStream.clear(); @@ -839,10 +859,6 @@ namespace Mist{ return false; } // Check if the DTSH file contains all expected data - if (!M.inputLocalVars.isMember("streamoffset")){ - INFO_MSG("Header needs update as it contains no streamoffset, regenerating"); - return false; - } if (!M.inputLocalVars.isMember("playlistEntries")){ INFO_MSG("Header needs update as it contains no playlist entries, regenerating"); return false; @@ -869,6 +885,9 @@ namespace Mist{ newEntry.bytePos = thisEntry[1u].asInt(); newEntry.mUTC = thisEntry[2u].asInt(); newEntry.duration = thisEntry[3u].asDouble(); + if (newEntry.duration * 1000 > DTSC::veryUglyJitterOverride){ + DTSC::veryUglyJitterOverride = newEntry.duration * 1000; + } newEntry.timestamp = thisEntry[4u].asInt(); newEntry.timeOffset = thisEntry[5u].asInt(); newEntry.wait = thisEntry[6u].asInt(); @@ -890,6 +909,16 @@ namespace Mist{ pidMappingR[key] = val; pidMapping[val] = key; } + if (M.inputLocalVars.isMember("parsedSegments")){ + jsonForEachConst(M.inputLocalVars["parsedSegments"], i){ + uint64_t key = JSON::Value(i.key()).asInt(); + uint64_t val = i->asInt(); + parsedSegments[key] = val; + playlistMapping[key].lastFileIndex = val; + INFO_MSG("Playlist %" PRIu64 " already parsed %" PRIu64 " segments", key, val); + } + + } // Set bootMsOffset in order to display the program time correctly in the player zUTC = M.inputLocalVars["zUTC"].asInt(); meta.setUTCOffset(zUTC); @@ -1004,9 +1033,15 @@ namespace Mist{ // set bootMsOffset in order to display the program time correctly in the player meta.setUTCOffset(zUTC); if (M.getLive()){meta.setBootMsOffset(streamOffset);} - if (streamIsLive || isLiveDVR){return true;} + if (streamIsLive && !isLiveDVR){return true;} - // Set local vars used for parsing existing headers + injectLocalVars(); + return true; + } + + /// Sets inputLocalVars based on data ingested + void inputHLS::injectLocalVars(){ + meta.inputLocalVars.null(); meta.inputLocalVars["version"] = 4; // Write playlist entry info @@ -1029,6 +1064,7 @@ namespace Mist{ thisPlaylist.append(thisEntries); } allEntries[JSON::Value(pListIt->first).asString()] = thisPlaylist; + meta.inputLocalVars["parsedSegments"][JSON::Value(pListIt->first).asString()] = pListIt->second.size(); } meta.inputLocalVars["playlist_urls"] = playlist_urls; meta.inputLocalVars["playlistEntries"] = allEntries; @@ -1041,7 +1077,6 @@ namespace Mist{ thisMappingsR[JSON::Value(pidIt->first).asString()] = pidIt->second; } meta.inputLocalVars["pidMappingR"] = thisMappingsR; - return true; } bool inputHLS::needsLock(){ @@ -1151,21 +1186,23 @@ namespace Mist{ return; } + uint64_t maxTime = Util::bootMS() + 500; // Update all playlists to make sure listEntries contains all live segments for (std::map::iterator pListIt = playlistMapping.begin(); pListIt != playlistMapping.end(); pListIt++){ if (pListIt->second.reloadNext < Util::bootSecs()){ pListIt->second.reload(); } - } - - HIGH_MSG("Current playlist has parsed %zu/%" PRIu64 " entries", listEntries[currentPlaylist].size(), parsedSegments[currentPlaylist]); - for(uint64_t entryIt = parsedSegments[currentPlaylist]; entryIt < listEntries[currentPlaylist].size(); entryIt++){ - MEDIUM_MSG("Adding entry #%" PRIu64 " as live data", entryIt); - if (parseSegmentAsLive(entryIt)){ - parsedSegments[currentPlaylist]++; - }else{ - break; + currentPlaylist = pListIt->first; + if (listEntries[currentPlaylist].size() != parsedSegments[currentPlaylist]){ + INFO_MSG("Current playlist has parsed %" PRIu64 "/%zu entries", parsedSegments[currentPlaylist], listEntries[currentPlaylist].size()); + } + for(uint64_t entryIt = parsedSegments[currentPlaylist]; entryIt < listEntries[currentPlaylist].size(); entryIt++){ + INFO_MSG("Adding entry #%" PRIu64 " as live data", entryIt+1); + if (parseSegmentAsLive(entryIt)){ + parsedSegments[currentPlaylist]++; + } + if (Util::bootMS() > maxTime){return;} } } } @@ -1449,11 +1486,6 @@ namespace Mist{ /// \brief Sets parsedSegments for all playlists, specifying how many segments /// have already been parsed. Additional segments can then be parsed as live data void inputHLS::setParsedSegments(){ - for (std::map >::iterator pListIt = listEntries.begin(); - pListIt != listEntries.end(); pListIt++){ - parsedSegments[pListIt->first] = pListIt->second.size(); - INFO_MSG("Playlist %" PRIu32 " already contains %" PRIu64 " VOD segments", pListIt->first, parsedSegments[pListIt->first]); - } } /// Parses the main playlist, possibly containing variants. @@ -1736,4 +1768,19 @@ namespace Mist{ return tmpId; } + void inputHLS::finish(){ + if (isLiveDVR){ + INFO_MSG("Writing updated header to disk"); + injectLocalVars(); + M.toFile(HTTP::localURIResolver().link(config->getString("input") + ".dtsh").getUrl()); + } + Input::finish(); + } + + void inputHLS::checkHeaderTimes(const HTTP::URL & streamFile){ + if (isLiveDVR){return;} + Input::checkHeaderTimes(streamFile); + } + + }// namespace Mist diff --git a/src/input/input_hls.h b/src/input/input_hls.h index 2d799299..7d0cc334 100644 --- a/src/input/input_hls.h +++ b/src/input/input_hls.h @@ -181,6 +181,9 @@ namespace Mist{ uint64_t getPacketTime(uint64_t packetTime, uint64_t tid, uint64_t currentPlaylist, uint64_t nUTC = 0); uint64_t getPacketID(uint64_t currentPlaylist, uint64_t trackId); size_t getEntryId(uint32_t playlistId, uint64_t bytePos); + virtual void finish(); + void injectLocalVars(); + virtual void checkHeaderTimes(const HTTP::URL & streamFile); }; }// namespace Mist diff --git a/src/io.cpp b/src/io.cpp index 0e2cf07a..31573924 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -127,17 +127,19 @@ namespace Mist{ /// Does not do anything if the process is not standalone, in this case the master process will have an overloaded version of this function. ///\param tid The trackid to remove the page from ///\param pageNumber The number of the page to remove - void InOutBase::bufferRemove(size_t idx, uint32_t pageNumber){ + void InOutBase::bufferRemove(size_t idx, uint32_t pageNumber, uint32_t pageIdx){ if (!standAlone){// A different process will handle this for us return; } Util::RelAccX &tPages = meta.pages(idx); + Util::RelAccXFieldData firstKey = tPages.getFieldData("firstkey"); - uint32_t pageIdx = INVALID_KEY_NUM; - for (uint32_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ - if (tPages.getInt("firstkey", i) == pageNumber){ - pageIdx = i; - break; + if (pageIdx == INVALID_KEY_NUM){ + for (uint32_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){ + if (tPages.getInt(firstKey, i) == pageNumber){ + pageIdx = i; + break; + } } } // If the given pagenumber is not a valid page on this track, do nothing diff --git a/src/io.h b/src/io.h index 964ef5a0..3db01406 100644 --- a/src/io.h +++ b/src/io.h @@ -23,7 +23,7 @@ namespace Mist{ void bufferFinalize(size_t idx, IPC::sharedPage & page); void liveFinalize(size_t idx); bool isCurrentLivePage(size_t idx, uint32_t pageNumber); - void bufferRemove(size_t idx, uint32_t pageNumber); + void bufferRemove(size_t idx, uint32_t pageNumber, uint32_t pageIdx = INVALID_KEY_NUM); void bufferLivePacket(const DTSC::Packet &packet); void bufferNext(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData,