From 19d7c9fe0724ae5dfc30d5851d0b4d88439fdea0 Mon Sep 17 00:00:00 2001 From: Marco van Dijk Date: Thu, 28 Oct 2021 15:53:53 +0200 Subject: [PATCH] Added HLS DVR mode Moved some duplicate code to seperate functions for readability Fix EXT-X-PROGRAM-DATE-TIME tag for VoD Set bootMSoffset for live DVR streams Implemented readExistingHeader for HLS input set zUTC time based on EXT-X-PROGRAM-DATE-TIME tag rather than guessing --- lib/url.cpp | 9 + lib/url.h | 1 + src/input/input_hls.cpp | 584 +++++++++++++++++++++++++++++----------- src/input/input_hls.h | 22 +- 4 files changed, 459 insertions(+), 157 deletions(-) diff --git a/lib/url.cpp b/lib/url.cpp index 09da6f10..785c1821 100644 --- a/lib/url.cpp +++ b/lib/url.cpp @@ -196,6 +196,15 @@ std::string HTTP::URL::getFilePath() const{ return "/" + path; } +/// Returns whether the URL is probably pointing to a local file +bool HTTP::URL::isLocalPath() const{ + // If we have no host, protocol or port we can assume it is a local path + if (host.size() || protocol.size() || port.size()){ + return false; + } + return true; +} + /// Returns the URL in string format without auth and frag std::string HTTP::URL::getProxyUrl() const{ std::string ret; diff --git a/lib/url.h b/lib/url.h index 98df3ece..1cb23b0a 100644 --- a/lib/url.h +++ b/lib/url.h @@ -20,6 +20,7 @@ namespace HTTP{ std::string getFilePath() const; std::string getBareUrl() const; std::string getProxyUrl() const; + bool isLocalPath() const; std::string host; ///< Hostname or IP address of URL std::string protocol; ///< Protocol of URL std::string port; ///< Port of URL diff --git a/src/input/input_hls.cpp b/src/input/input_hls.cpp index 8faad7d1..82090678 100644 --- a/src/input/input_hls.cpp +++ b/src/input/input_hls.cpp @@ -100,7 +100,10 @@ static uint64_t ISO8601toUnixmillis(const std::string &ts){ } namespace Mist{ - + // Save playlist objects for manual reloading + static std::map playlistMapping; + // Track which segment numbers have been parsed + std::map parsedSegments; /// Mutex for accesses to listEntries tthread::mutex entryMutex; @@ -145,8 +148,11 @@ namespace Mist{ } pls.reload(); + playlistMapping[plsTotalCount] = pls; plsInitCount++; - if (initOnly){return;}// Exit because init-only mode + if (initOnly){ + return; + }// Exit because init-only mode while (self->config->is_active){ // If the timer has not expired yet, sleep up to a second. Otherwise, reload. @@ -172,7 +178,12 @@ namespace Mist{ noChangeCount = 0; lastTimestamp = 0; root = HTTP::URL(uriSrc); - uri = root.getUrl(); + if (root.isLocalPath()){ + uri = root.getFilePath(); + } + else{ + uri = root.getUrl(); + } memset(keyAES, 0, 16); startTime = Util::bootSecs(); reloadNext = 0; @@ -378,7 +389,9 @@ namespace Mist{ std::istream &input = (isUrl() ? (std::istream &)urlSource : (std::istream &)fileSource); std::getline(input, line); + DONTEVEN_MSG("Reloading playlist '%s'", uri.c_str()); while (std::getline(input, line)){ + DONTEVEN_MSG("Parsing line '%s'", line.c_str()); cleanLine(line); if (line.empty()){continue;}// skip empty lines @@ -422,7 +435,9 @@ namespace Mist{ if (waitTime < 2){waitTime = 2;} } - if (key == "MEDIA-SEQUENCE"){fileNo = atoll(val.c_str());} + if (key == "MEDIA-SEQUENCE"){ + fileNo = atoll(val.c_str()); + } if (key == "PROGRAM-DATE-TIME"){nextUTC = ISO8601toUnixmillis(val);} if (key == "PLAYLIST-TYPE"){ @@ -452,6 +467,7 @@ namespace Mist{ std::getline(input, filename); // check for already added segments + DONTEVEN_MSG("Current file has index #%zu, last index was #%zu", fileNo, lastFileIndex); if (fileNo >= lastFileIndex){ cleanLine(filename); filename = root.link(filename).getUrl(); @@ -540,9 +556,8 @@ namespace Mist{ // Note: This method requires never removing playlists, only adding. // The mutex assures we have a unique count/number. if (!id){id = listEntries.size() + 1;} + HIGH_MSG("Adding entry '%s' to ID %u", filename.c_str(), id); listEntries[id].push_back(entry); - DONTEVEN_MSG("Added segment to variant %" PRIu32 " (#%" PRIu64 ", now %zu queued): %s", id, - lastFileIndex, listEntries[id].size(), filename.c_str()); } } @@ -553,6 +568,13 @@ namespace Mist{ streamIsLive = false; globalWaitTime = 0; currentPlaylist = 0; + streamOffset = 0; + + pidCounter = 1; + + isLiveDVR = false; + previousSegmentIndex = -1; + currentIndex = 0; capa["name"] = "HLS"; capa["desc"] = "This input allows you to both play Video on Demand and live HLS streams stored " @@ -565,6 +587,7 @@ namespace Mist{ capa["source_match"].append("https://*.m3u"); capa["source_match"].append("https-hls://*"); capa["source_match"].append("http-hls://*"); + // All URLs can be set to always-on mode. capa["always_match"] = capa["source_match"]; @@ -583,12 +606,21 @@ namespace Mist{ bool inputHLS::checkArguments(){ config->is_active = true; - if (config->getString("input") == "-"){return false;} - HTTP::URL mainPls(config->getString("input")); - if (mainPls.getExt().substr(0, 3) != "m3u" && mainPls.protocol.find("hls") == std::string::npos){ + if (config->getString("input") == "-"){ return false; } + if (!initPlaylist(config->getString("input"), false)){return false;} + + // If the playlist is of event type, init the amount of segments in the playlist + if (isLiveDVR){ + // Set the previousSegmentIndex by quickly going through the existing PLS files + setParsedSegments(); + meta.setLive(true); + meta.setVod(true); + streamIsLive = true; + } + return true; } @@ -600,14 +632,14 @@ namespace Mist{ meta.reInit(config->getString("streamname"), false); INFO_MSG("Parsing live stream to create header..."); TS::Packet packet; // to analyse and extract data - int counter = 1; + int pidCounter = 1; tthread::lock_guard guard(entryMutex); for (std::map >::iterator pListIt = listEntries.begin(); pListIt != listEntries.end(); pListIt++){ // Skip empty playlists if (!pListIt->second.size()){continue;} - int preCounter = counter; + int prepidCounter = pidCounter; tsStream.clear(); for (std::deque::iterator entryIt = pListIt->second.begin(); @@ -632,16 +664,7 @@ namespace Mist{ DTSC::Packet headerPack; tsStream.getEarliestPacket(headerPack); int tmpTrackId = headerPack.getTrackId(); - uint64_t packetId = pidMapping[(((uint64_t)pListIt->first) << 32) + tmpTrackId]; - - if (packetId == 0){ - pidMapping[(((uint64_t)pListIt->first) << 32) + headerPack.getTrackId()] = counter; - pidMappingR[counter] = (((uint64_t)pListIt->first) << 32) + headerPack.getTrackId(); - packetId = counter; - VERYHIGH_MSG("Added file %s, trackid: %zu, mapped to: %d", entryIt->filename.c_str(), - headerPack.getTrackId(), counter); - counter++; - } + uint64_t packetId = getPacketID(pListIt->first, tmpTrackId); size_t idx = M.trackIDToIndex(packetId, getpid()); if ((idx == INVALID_TRACK_ID || !M.getCodec(idx).size())){ @@ -656,31 +679,85 @@ namespace Mist{ break; // we have all tracks discovered, next playlist! } }while (!segDowner.atEnd()); - if (preCounter < counter){break;}// We're done reading this playlist! + if (prepidCounter < pidCounter){break;}// We're done reading this playlist! } } tsStream.clear(); currentPlaylist = 0; segDowner.close(); // make sure we have nothing left over - INFO_MSG("header complete, beginning live ingest of %d tracks", counter - 1); + INFO_MSG("header complete, beginning live ingest of %d tracks", pidCounter - 1); + } + + bool inputHLS::readExistingHeader(){ + if (!Input::readExistingHeader()){return false;} + if (!M.inputLocalVars.isMember("version") || M.inputLocalVars["version"].asInt() < 2){ + INFO_MSG("Header needs update, regenerating"); + return false; + } + // Vars for parsing TS packets + TS::Packet packet; + bool hasPacket; + + // Set internal variables based on existing header file + tthread::lock_guard guard(entryMutex); + for (std::map >::iterator pListIt = listEntries.begin(); + pListIt != listEntries.end(); pListIt++){ + tsStream.clear(); + uint32_t entId = 0; + // For each entry in the playlist, we need to parse the earliest packet in order to set the segment offset + for (std::deque::iterator entryIt = pListIt->second.begin(); + entryIt != pListIt->second.end(); entryIt++){ + tsStream.partialClear(); + + if (!segDowner.loadSegment(*entryIt)){ + FAIL_MSG("Failed to load segment - skipping to next"); + continue; + } + // Flag to allow getPacketTime to set the offset + entId++; + allowRemap = true; + while (!segDowner.atEnd()){ + hasPacket = tsStream.hasPacketOnEachTrack() || (segDowner.atEnd() && tsStream.hasPacket()); + if (hasPacket){ + DTSC::Packet headerPack; + tsStream.getEarliestPacket(headerPack); + while (headerPack){ + size_t tmpTrackId = headerPack.getTrackId(); + // Call getPacketID in order to set pidmapping + uint64_t packetId = getPacketID(pListIt->first, tmpTrackId); + // Call getPacketTime in order to set segment offset + uint64_t packetTime = getPacketTime(headerPack.getTime(), tmpTrackId, pListIt->first, entryIt->mUTC); + VERYHIGH_MSG("Parsed earliest TS packet with id '%lu' @ '%lu ms' for TS segment with index '%u'", packetId, packetTime, entId - 1); + // Keep parsing until we have called getPacketID for each track + tsStream.getEarliestPacket(headerPack); + } + // If we do not have a packet on each track, read the next TS packet + }else if (segDowner.readNext()){ + packet.FromPointer(segDowner.packetPtr); + tsStream.parse(packet, entId); + } + } + // Finally save the offset as part of the TS segment. This is required for bufferframe + // to work correctly, since not every segment might have an UTC timestamp tag + std::deque &curList = listEntries[pListIt->first]; + VERYHIGH_MSG("Saving offset of '%" PRId64 "' to current TS segment", plsTimeOffset[pListIt->first]); + curList.at(entId-1).timeOffset = plsTimeOffset[pListIt->first]; + } + } + tsStream.clear(); + // set bootMsOffset in order to display the program time correctly in the player + meta.setBootMsOffset(streamOffset); + return true; } bool inputHLS::readHeader(){ - if (streamIsLive){return true;} - - bool hasHeader = false; - - // See whether a separate header file exists. - meta.reInit(config->getString("streamname"), config->getString("input") + ".dtsh"); - hasHeader = (bool)M; - - if (!hasHeader){meta.reInit(config->getString("streamname"), true);} - - TS::Packet packet; // to analyse and extract data - + if (streamIsLive && !isLiveDVR){return true;} + // to analyse and extract data + TS::Packet packet; char *data; size_t dataLen; - int counter = 1; + bool hasPacket = false; + meta.reInit(config->getString("streamname"), true); tthread::lock_guard guard(entryMutex); for (std::map >::iterator pListIt = listEntries.begin(); @@ -697,41 +774,33 @@ namespace Mist{ continue; } entId++; + allowRemap = true; while (!segDowner.atEnd()){ - - while (tsStream.hasPacketOnEachTrack()){ + // Wait for packets on each track to make sure the offset is set based on the earliest packet + hasPacket = tsStream.hasPacketOnEachTrack() || (segDowner.atEnd() && tsStream.hasPacket()); + if (hasPacket){ DTSC::Packet headerPack; tsStream.getEarliestPacket(headerPack); - - size_t tmpTrackId = headerPack.getTrackId(); - uint64_t packetId = pidMapping[(((uint64_t)pListIt->first) << 32) + tmpTrackId]; - - if (packetId == 0){ - pidMapping[(((uint64_t)pListIt->first) << 32) + headerPack.getTrackId()] = counter; - pidMappingR[counter] = (((uint64_t)pListIt->first) << 32) + headerPack.getTrackId(); - packetId = counter; - INFO_MSG("Added file %s, trackid: %zu, mapped to: %d", entryIt->filename.c_str(), - headerPack.getTrackId(), counter); - counter++; - } - - size_t idx = M.trackIDToIndex(packetId, getpid()); - if (!hasHeader && (idx == INVALID_TRACK_ID || !M.getCodec(idx).size())){ - tsStream.initializeMetadata(meta, tmpTrackId, packetId); - idx = M.trackIDToIndex(packetId, getpid()); - } - - if (!hasHeader){ + while (headerPack){ + size_t tmpTrackId = headerPack.getTrackId(); + uint64_t packetId = getPacketID(pListIt->first, tmpTrackId); + uint64_t packetTime = getPacketTime(headerPack.getTime(), tmpTrackId, pListIt->first, entryIt->mUTC); + size_t idx = M.trackIDToIndex(packetId, getpid()); + if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){ + tsStream.initializeMetadata(meta, tmpTrackId, packetId); + idx = M.trackIDToIndex(packetId, getpid()); + } headerPack.getString("data", data, dataLen); // keyframe data exists, so always add 19 bytes keyframedata. uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11; - meta.update(headerPack.getTime(), packOffset, idx, dataLen, entId, - headerPack.hasMember("keyframe"), packSendSize); + VERYHIGH_MSG("Adding packet (%zuB) at %zu with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); + meta.update(packetTime, packOffset, idx, dataLen, entId, headerPack.hasMember("keyframe"), packSendSize); + tsStream.getEarliestPacket(headerPack); } } - + // No packets available, so read the next TS packet if available if (segDowner.readNext()){ packet.FromPointer(segDowner.packetPtr); tsStream.parse(packet, entId); @@ -743,39 +812,37 @@ namespace Mist{ tsStream.getEarliestPacket(headerPack); while (headerPack){ int tmpTrackId = headerPack.getTrackId(); - uint64_t packetId = pidMapping[(((uint64_t)pListIt->first) << 32) + tmpTrackId]; - - if (packetId == 0){ - pidMapping[(((uint64_t)pListIt->first) << 32) + headerPack.getTrackId()] = counter; - pidMappingR[counter] = (((uint64_t)pListIt->first) << 32) + headerPack.getTrackId(); - packetId = counter; - INFO_MSG("Added file %s, trackid: %zu, mapped to: %d", entryIt->filename.c_str(), - headerPack.getTrackId(), counter); - counter++; - } - + uint64_t packetId = getPacketID(pListIt->first, tmpTrackId); + uint64_t packetTime = getPacketTime(headerPack.getTime(), tmpTrackId, pListIt->first, entryIt->mUTC); size_t idx = M.trackIDToIndex(packetId, getpid()); - if (!hasHeader && (idx == INVALID_TRACK_ID || !M.getCodec(idx).size())){ + if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){ tsStream.initializeMetadata(meta, tmpTrackId, packetId); idx = M.trackIDToIndex(packetId, getpid()); } - if (!hasHeader){ - headerPack.getString("data", data, dataLen); - // keyframe data exists, so always add 19 bytes keyframedata. - long long packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; - long long packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11; - meta.update(headerPack.getTime(), packOffset, idx, dataLen, entId, - headerPack.hasMember("keyframe"), packSendSize); - } + headerPack.getString("data", data, dataLen); + // keyframe data exists, so always add 19 bytes keyframedata. + long long packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; + long long packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11; + VERYHIGH_MSG("Adding packet (%zuB) at %zu with an offset of %llu on track %zu", dataLen, packetTime, packOffset, idx); + meta.update(packetTime, packOffset, idx, dataLen, entId, headerPack.hasMember("keyframe"), packSendSize); tsStream.getEarliestPacket(headerPack); } - - if (hasHeader){break;} + // Finally save the offset as part of the TS segment. This is required for bufferframe + // to work correctly, since not every segment might have an UTC timestamp tag + std::deque &curList = listEntries[pListIt->first]; + INFO_MSG("Saving offset of '%" PRId64 "' to current TS segment", plsTimeOffset[pListIt->first]); + curList.at(entId-1).timeOffset = plsTimeOffset[pListIt->first]; } } - if (streamIsLive){return true;} + // set bootMsOffset in order to display the program time correctly in the player + meta.setBootMsOffset(streamOffset); + + if (streamIsLive || isLiveDVR){return true;} + + // Set local vars used for parsing existing headers + meta.inputLocalVars["version"] = 2; INFO_MSG("write header file..."); M.toFile((config->getString("input") + ".dtsh").c_str()); @@ -783,7 +850,127 @@ namespace Mist{ return true; } - bool inputHLS::needsLock(){return !streamIsLive;} + bool inputHLS::needsLock(){ + if (isLiveDVR){ + return true; + } + return !streamIsLive; + } + + /// \brief Parses new segments added to playlist files as live data + /// \param segmentIndex: the index of the segment in the current playlist + /// \return True if the segment has been buffered successfully + bool inputHLS::parseSegmentAsLive(uint64_t segmentIndex){ + bool hasOffset = false; + bool hasPacket = false; + // Keep our own variables to make sure buffering live data does not interfere with VoD pages loading + TS::Packet packet; + TS::Stream tsStream; + char *data; + size_t dataLen; + // Get the updated list of entries + std::deque &curList = listEntries[currentPlaylist]; + if (curList.size() <= segmentIndex){ + FAIL_MSG("Tried to load segment with index '%lu', but the playlist only contains '%zu' entries!", segmentIndex, curList.size()); + return false; + } + if (!segDowner.loadSegment(curList.at(segmentIndex))){ + FAIL_MSG("Failed to load segment"); + return false; + } + + while (!segDowner.atEnd()){ + // Wait for packets on each track to make sure the offset is set based on the earliest packet + hasPacket = tsStream.hasPacketOnEachTrack() || (segDowner.atEnd() && tsStream.hasPacket()); + if (hasPacket){ + DTSC::Packet headerPack; + tsStream.getEarliestPacket(headerPack); + while (headerPack){ + size_t tmpTrackId = headerPack.getTrackId(); + uint64_t packetId = getPacketID(currentPlaylist, tmpTrackId); + uint64_t packetTime = headerPack.getTime(); + // Set segment offset and save it + if (!hasOffset && curList.at(segmentIndex).mUTC){ + hasOffset = true; + DVRTimeOffsets[currentPlaylist] = (curList.at(segmentIndex).mUTC - zUTC) - packetTime; + MEDIUM_MSG("Setting current live segment time offset to '%ld'", DVRTimeOffsets[currentPlaylist]); + curList.at(segmentIndex).timeOffset = DVRTimeOffsets[currentPlaylist]; + } + if (hasOffset || DVRTimeOffsets.count(currentPlaylist)){ + hasOffset = true; + packetTime += DVRTimeOffsets[currentPlaylist]; + HIGH_MSG("Adjusting current packet timestamp '%ld' -> '%ld'", headerPack.getTime(), packetTime); + } + size_t idx = M.trackIDToIndex(packetId, getpid()); + if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){ + tsStream.initializeMetadata(meta, tmpTrackId, packetId); + idx = M.trackIDToIndex(packetId, getpid()); + } + + headerPack.getString("data", data, dataLen); + // keyframe data exists, so always add 19 bytes keyframedata. + uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; + VERYHIGH_MSG("Adding packet (%zuB) at %zu with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); + bufferLivePacket(packetTime, packOffset, idx, data, dataLen, segmentIndex + 1, headerPack.hasMember("keyframe")); + tsStream.getEarliestPacket(headerPack); + } + } + // No packets available, so read the next TS packet if available + if (segDowner.readNext()){ + packet.FromPointer(segDowner.packetPtr); + tsStream.parse(packet, segmentIndex + 1); + } + } + // get last packets + tsStream.finish(); + DTSC::Packet headerPack; + tsStream.getEarliestPacket(headerPack); + while (headerPack){ + int tmpTrackId = headerPack.getTrackId(); + uint64_t packetId = getPacketID(currentPlaylist, tmpTrackId); + uint64_t packetTime = headerPack.getTime(); + if (DVRTimeOffsets.count(currentPlaylist)){ + packetTime += DVRTimeOffsets[currentPlaylist]; + VERYHIGH_MSG("Adjusting current packet timestamp '%ld' -> '%ld'", headerPack.getTime(), packetTime); + } + size_t idx = M.trackIDToIndex(packetId, getpid()); + if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){ + tsStream.initializeMetadata(meta, tmpTrackId, packetId); + idx = M.trackIDToIndex(packetId, getpid()); + } + + headerPack.getString("data", data, dataLen); + // keyframe data exists, so always add 19 bytes keyframedata. + uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; + VERYHIGH_MSG("Adding packet (%zuB) at %zu with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); + bufferLivePacket(packetTime, packOffset, idx, data, dataLen, segmentIndex + 1, headerPack.hasMember("keyframe")); + tsStream.getEarliestPacket(headerPack); + } + return true; + } + + /// \brief Override userLeadOut to buffer new data as live packets + void inputHLS::userLeadOut(){ + if (!isLiveDVR){ + return; + } + + // Update all playlists to make sure listEntries contains all live segments + for (std::map::iterator pListIt = playlistMapping.begin(); + pListIt != playlistMapping.end(); pListIt++){ + pListIt->second.reload(); + } + + HIGH_MSG("Current playlist has parsed %lu/%lu entries", listEntries[currentPlaylist].size(), parsedSegments[currentPlaylist]); + for(uint64_t entryIt = parsedSegments[currentPlaylist]; entryIt < listEntries[currentPlaylist].size(); entryIt++){ + MEDIUM_MSG("Adding entry #%lu as live data", entryIt); + if (parseSegmentAsLive(entryIt)){ + parsedSegments[currentPlaylist]++; + }else{ + break; + } + } + } bool inputHLS::openStreamSource(){return true;} @@ -793,7 +980,6 @@ namespace Mist{ bool finished = false; thisPacket.null(); while (config->is_active && (needsLock() || keepAlive())){ - // Check if we have a packet bool hasPacket = false; if (idx == INVALID_TRACK_ID){ @@ -813,70 +999,26 @@ namespace Mist{ continue; } }else{ - tsStream.getPacket(getMappedTrackId(M.getID(idx)), thisPacket); + tid = getMappedTrackId(M.getID(idx)); + tsStream.getPacket(tid, thisPacket); } if (!thisPacket){ FAIL_MSG("Could not getNext TS packet!"); return; } - uint64_t newTime = thisPacket.getTime(); - - // Apply offset if any was set - if (plsTimeOffset.count(currentPlaylist)){newTime += plsTimeOffset[currentPlaylist];} - - if (zUTC){ - if (allowSoftRemap && thisPacket.getTime() < 1000){allowSoftRemap = false;} - // UTC based timestamp offsets - if ((allowRemap || allowSoftRemap) && nUTC){ - allowRemap = false; - allowSoftRemap = !thisPacket.getTime(); - int64_t prevOffset = plsTimeOffset[currentPlaylist]; - plsTimeOffset[currentPlaylist] = (nUTC - zUTC) - thisPacket.getTime(); - newTime = thisPacket.getTime() + plsTimeOffset[currentPlaylist]; - INFO_MSG("[UTC; New offset: %" PRId64 " -> %" PRId64 "] Packet %" PRIu32 "@%" PRIu64 - "ms -> %" PRIu64 "ms", - prevOffset, plsTimeOffset[currentPlaylist], tid, thisPacket.getTime(), newTime); - } - }else{ - // Non-UTC based - if (plsLastTime.count(currentPlaylist)){ - if (plsInterval.count(currentPlaylist)){ - if (allowRemap && (newTime < plsLastTime[currentPlaylist] || - newTime > plsLastTime[currentPlaylist] + plsInterval[currentPlaylist] * 60)){ - allowRemap = false; - // time difference too great, change offset to correct for it - int64_t prevOffset = plsTimeOffset[currentPlaylist]; - plsTimeOffset[currentPlaylist] += - (int64_t)(plsLastTime[currentPlaylist] + plsInterval[currentPlaylist]) - (int64_t)newTime; - newTime = thisPacket.getTime() + plsTimeOffset[currentPlaylist]; - INFO_MSG("[Guess; New offset: %" PRId64 " -> %" PRId64 "] Packet %" PRIu32 "@%" PRIu64 - "ms -> %" PRIu64 "ms", - prevOffset, plsTimeOffset[currentPlaylist], tid, thisPacket.getTime(), newTime); - } - } - // check if time increased, and no increase yet or is less than current, set new interval - if (newTime > plsLastTime[currentPlaylist] && - (!plsInterval.count(currentPlaylist) || - newTime - plsLastTime[currentPlaylist] < plsInterval[currentPlaylist])){ - plsInterval[currentPlaylist] = newTime - plsLastTime[currentPlaylist]; - } - } - // store last time for interval/offset calculations - plsLastTime[tid] = newTime; - } - - DONTEVEN_MSG("Packet %" PRIu32 "@%" PRIu64 "ms -> %" PRIu64 "ms", tid, thisPacket.getTime(), newTime); + uint64_t packetTime = getPacketTime(thisPacket.getTime(), tid, currentPlaylist, nUTC); + HIGH_MSG("Packet %" PRIu32 "@%" PRIu64 "ms -> %" PRIu64 "ms", tid, thisPacket.getTime(), packetTime); // overwrite trackId on success Bit::htobl(thisPacket.getData() + 8, tid); - Bit::htobll(thisPacket.getData() + 12, newTime); + Bit::htobll(thisPacket.getData() + 12, packetTime); return; // Success! } // No? Let's read some more data and check again. if (!segDowner.atEnd() && segDowner.readNext()){ tsBuf.FromPointer(segDowner.packetPtr); - tsStream.parse(tsBuf, streamIsLive ? 0 : currentIndex); + tsStream.parse(tsBuf, streamIsLive && !isLiveDVR ? 0 : currentIndex + 1); continue; // check again } @@ -898,7 +1040,6 @@ namespace Mist{ } if (currentPlaylist == 0){ VERYHIGH_MSG("Waiting for segments..."); - keepAlive(); Util::wait(500); continue; } @@ -939,7 +1080,11 @@ namespace Mist{ DTSC::Keys keys(M.keys(idx)); for (size_t i = keys.getFirstValid(); i < keys.getEndValid(); i++){ - if (keys.getTime(i) > seekTime){break;} + if (keys.getTime(i) > seekTime){ + VERYHIGH_MSG("Found elapsed key with a time of %lu ms at playlist index %zu while seeking", keys.getTime(i), keys.getBpos(i)-1); + break; + } + VERYHIGH_MSG("Found valid key with a time of %lu ms at playlist index %zu while seeking", keys.getTime(i), keys.getBpos(i)-1); plistEntry = keys.getBpos(i); } @@ -966,6 +1111,11 @@ namespace Mist{ } playListEntries &entry = curPlaylist.at(currentIndex); segDowner.loadSegment(entry); + // If we have an offset, load it + if (entry.timeOffset){ + HIGH_MSG("Setting time offset of this TS segment to '%ld'", entry.timeOffset); + plsTimeOffset[currentPlaylist] = entry.timeOffset; + } } HIGH_MSG("readPMT()"); @@ -973,8 +1123,88 @@ namespace Mist{ while (!tsStream.hasPacketOnEachTrack() && !segDowner.atEnd()){ if (!segDowner.readNext()){break;} tsBuffer.FromPointer(segDowner.packetPtr); - tsStream.parse(tsBuffer, 0); + tsStream.parse(tsBuffer, streamIsLive && !isLiveDVR ? 0 : plistEntry); + } + } + + /// \brief Applies any offset to the packets original timestamp + /// \param packetTime: the original timestamp of the packet + /// \param tid: the trackid corresponding to this track and playlist + /// \param currentPlaylist: the ID of the playlist we are currently trying to parse + /// \param nUTC: Defaults to 0. If larger than 0, sync the timestamp based on this value and zUTC + /// \return the (modified) packetTime, used for meta.updates and buffering packets + uint64_t inputHLS::getPacketTime(uint64_t packetTime, uint64_t tid, uint64_t currentPlaylist, uint64_t nUTC){ + INSANE_MSG("Calculating adjusted packet time for track '%lu' on playlist '%lu' with current timestamp '%lu'. UTC timestamp is '%lu'", tid, currentPlaylist, packetTime, nUTC); + uint64_t newTime = packetTime; + + // UTC based timestamp offsets + if (zUTC){ + // Overwrite offset if we have an UTC timestamp + if (allowRemap && nUTC){ + allowRemap = false; + int64_t prevOffset = plsTimeOffset[currentPlaylist]; + plsTimeOffset[currentPlaylist] = (nUTC - zUTC) - packetTime; + newTime = packetTime + plsTimeOffset[currentPlaylist]; + MEDIUM_MSG("[UTC; New offset: %" PRId64 " -> %" PRId64 "] Packet %" PRIu64 "@%" PRIu64 + "ms -> %" PRIu64 "ms", prevOffset, plsTimeOffset[currentPlaylist], tid, packetTime, newTime); + }else if (plsTimeOffset.count(currentPlaylist)){ + // Prevent integer overflow for large negative offsets, which can happen + // when the first time of another track is lower that the firsttime + if (plsTimeOffset[currentPlaylist] + int64_t(newTime) < 0){ + newTime = 0; + FAIL_MSG("Time offset is too negative causing an integer overflow. Setting current packet time to 0."); + }else{ + VERYHIGH_MSG("Adjusting timestamp %lu -> %lu (offset is %ld)", newTime, newTime + plsTimeOffset[currentPlaylist], plsTimeOffset[currentPlaylist]); + newTime += plsTimeOffset[currentPlaylist]; + } + } + // Non-UTC based + }else{ + // Apply offset if any was set + if (plsTimeOffset.count(currentPlaylist)){ + VERYHIGH_MSG("Adjusting timestamp %lu -> %lu (offset is %ld)", newTime, newTime + plsTimeOffset[currentPlaylist], plsTimeOffset[currentPlaylist]); + newTime += plsTimeOffset[currentPlaylist]; + } + if (plsLastTime.count(currentPlaylist)){ + if (plsInterval.count(currentPlaylist)){ + if (allowRemap && (newTime < plsLastTime[currentPlaylist] || + newTime > plsLastTime[currentPlaylist] + plsInterval[currentPlaylist] * 60)){ + allowRemap = false; + // time difference too great, change offset to correct for it + int64_t prevOffset = plsTimeOffset[currentPlaylist]; + plsTimeOffset[currentPlaylist] += + (int64_t)(plsLastTime[currentPlaylist] + plsInterval[currentPlaylist]) - (int64_t)newTime; + newTime = packetTime + plsTimeOffset[currentPlaylist]; + MEDIUM_MSG("[Guess; New offset: %" PRId64 " -> %" PRId64 "] Packet %" PRIu64 "@%" PRIu64 + "ms -> %" PRIu64 "ms", + prevOffset, plsTimeOffset[currentPlaylist], tid, packetTime, newTime); + } + } + // check if time increased, and no increase yet or is less than current, set new interval + if (newTime > plsLastTime[currentPlaylist] && + (!plsInterval.count(currentPlaylist) || + newTime - plsLastTime[currentPlaylist] < plsInterval[currentPlaylist])){ + plsInterval[currentPlaylist] = newTime - plsLastTime[currentPlaylist]; + } + } + // store last time for interval/offset calculations + plsLastTime[tid] = newTime; } + return newTime; + } + + /// \brief Returns the packet ID corresponding to this playlist and track + /// \param trackId: the trackid corresponding to this track and playlist + /// \param currentPlaylist: the ID of the playlist we are currently trying to parse + uint64_t inputHLS::getPacketID(uint64_t currentPlaylist, uint64_t trackId){ + uint64_t packetId = pidMapping[(((uint64_t)currentPlaylist) << 32) + trackId]; + if (packetId == 0){ + pidMapping[(((uint64_t)currentPlaylist) << 32) + trackId] = pidCounter; + pidMappingR[pidCounter] = (((uint64_t)currentPlaylist) << 32) + trackId; + packetId = pidCounter; + pidCounter++; + } + return packetId; } size_t inputHLS::getEntryId(uint32_t playlistId, uint64_t bytePos){ @@ -1014,8 +1244,21 @@ namespace Mist{ return lastOut; } + /// \brief Sets parsedSegments for all playlists, specifying how many segments + /// have already been parsed. Additional segments can then be parsed as live data + void inputHLS::setParsedSegments(){ + for (std::map >::iterator pListIt = listEntries.begin(); + pListIt != listEntries.end(); pListIt++){ + parsedSegments[pListIt->first] = pListIt->second.size(); + INFO_MSG("Playlist %u already contains %li VOD segments", pListIt->first, parsedSegments[pListIt->first]); + } + } + /// Parses the main playlist, possibly containing variants. bool inputHLS::initPlaylist(const std::string &uri, bool fullInit){ + // Used to set zUTC, in case the first EXT-X-PROGRAM-DATE-TIME does not appear before the first segment + float timestampSum = 0; + bool isRegularPls = false; plsInitCount = 0; plsTotalCount = 0; { @@ -1025,8 +1268,9 @@ namespace Mist{ std::string line; bool ret = false; startTime = Util::bootSecs(); + std::string playlistLocation = uri; - HTTP::URL playlistRootPath(uri); + HTTP::URL playlistRootPath(playlistLocation); // Convert custom http(s)-hls protocols into regular notation. if (playlistRootPath.protocol == "http-hls"){playlistRootPath.protocol = "http";} if (playlistRootPath.protocol == "https-hls"){playlistRootPath.protocol = "https";} @@ -1034,7 +1278,7 @@ namespace Mist{ std::istringstream urlSource; std::ifstream fileSource; - bool isUrl = (uri.find("://") != std::string::npos); + bool isUrl = (playlistLocation.find("://") != std::string::npos); if (isUrl){ INFO_MSG("Downloading main playlist file from '%s'", uri.c_str()); HTTP::Downloader plsDL; @@ -1047,16 +1291,16 @@ namespace Mist{ urlSource.str(plsDL.data()); }else{ // If we're not a URL and there is no / at the start, ensure we get the full absolute path. - if (uri[0] != '/'){ - char *rp = realpath(uri.c_str(), 0); + if (playlistLocation[0] != '/'){ + char *rp = realpath(playlistLocation.c_str(), 0); if (rp){ playlistRootPath = HTTP::URL((std::string)rp); free(rp); } } - fileSource.open(uri.c_str()); + fileSource.open(playlistLocation.c_str()); if (!fileSource.good()){ - FAIL_MSG("Could not open playlist (%s): %s", strerror(errno), uri.c_str()); + FAIL_MSG("Could not open playlist (%s): %s", strerror(errno), playlistLocation.c_str()); } } @@ -1069,6 +1313,9 @@ namespace Mist{ // skip empty lines in the playlist continue; } + if (line.compare(0, 26, "#EXT-X-PLAYLIST-TYPE:EVENT") == 0){ + isLiveDVR = true; + } if (line.compare(0, 17, "#EXT-X-STREAM-INF") == 0){ // this is a variant playlist file.. next line is an uri to a playlist // file @@ -1133,15 +1380,30 @@ namespace Mist{ } }else if (line.compare(0, 7, "#EXTINF") == 0){ - // current file is not a variant playlist, but regular playlist. - ret = readPlaylist(playlistRootPath.getUrl(), fullInit); - break; + // Read as regular playlist after we are done checking for UTC timestamps + isRegularPls = true; + // Sum the duration to make sure we set zUTC time right + float f = atof(line.c_str() + 8); + timestampSum += f * 1000; + }else if (line.compare(0, 24, "#EXT-X-PROGRAM-DATE-TIME") == 0 && !zUTC){ + // Init UTC variables used to rewrite packet timestamps + size_t pos = line.find(":"); + std::string val = line.c_str() + pos + 1; + zUTC = ISO8601toUnixmillis(val) - uint64_t(timestampSum); + nUTC = zUTC; + INFO_MSG("Setting program unix start time to '%s' (%" PRIu64 ")", line.substr(pos + 1).c_str(), zUTC); + // store offset so that we can set it after reading the header + streamOffset = zUTC - (Util::unixMS() - Util::bootMS()); }else{ // ignore wrong lines VERYHIGH_MSG("ignore wrong line: %s", line.c_str()); } } + if (isRegularPls){ + ret = readPlaylist(playlistRootPath.getUrl(), fullInit); + } + if (!isUrl){fileSource.close();} uint32_t maxWait = 0; @@ -1164,7 +1426,15 @@ namespace Mist{ /// Function for reading every playlist. bool inputHLS::readPlaylist(const HTTP::URL &uri, bool fullInit){ - std::string urlBuffer = (fullInit ? "" : ";") + uri.getUrl(); + std::string urlBuffer; + // Wildcard streams can have a ' ' in the name, which getUrl converts to a '+' + if (uri.isLocalPath()){ + urlBuffer = (fullInit ? "" : ";") + uri.getFilePath(); + } + else{ + urlBuffer = (fullInit ? "" : ";") + uri.getUrl(); + } + INFO_MSG("Adding playlist(s): %s", urlBuffer.c_str()); tthread::thread runList(playlistRunner, (void *)urlBuffer.data()); runList.detach(); // Abandon the thread, it's now running independently uint32_t timeout = 0; @@ -1183,11 +1453,12 @@ namespace Mist{ { tthread::lock_guard guard(entryMutex); std::deque &curList = listEntries[currentPlaylist]; + INSANE_MSG("Current playlist contains %li entries. Current index is %li in playlist %li", curList.size(), currentIndex, currentPlaylist); if (!curList.size()){ WARN_MSG("no entries found in playlist: %" PRIu64 "!", currentPlaylist); return false; } - if (!streamIsLive){ + if (!streamIsLive || isLiveDVR){ // VoD advances the index by one and attempts to read // The playlist is not altered in this case, since we may need to seek back later currentIndex++; @@ -1220,11 +1491,14 @@ namespace Mist{ ERROR_MSG("Could not download segment: %s", ntry.filename.c_str()); return readNextFile(); // Attempt to read another, if possible. } - nUTC = ntry.mUTC; - // If we don't have a zero-time yet, guess an hour before this UTC time is probably fine - if (nUTC && !zUTC){zUTC = nUTC - 3600000;} - allowRemap = true; - allowSoftRemap = false; + // If we have an offset, load it + if (ntry.timeOffset){ + plsTimeOffset[currentPlaylist] = ntry.timeOffset; + // Else allow of the offset to be set by getPacketTime + }else{ + nUTC = ntry.mUTC; + allowRemap = true; + } return true; } diff --git a/src/input/input_hls.h b/src/input/input_hls.h index 366a126c..265247bd 100644 --- a/src/input/input_hls.h +++ b/src/input/input_hls.h @@ -28,6 +28,7 @@ namespace Mist{ uint64_t mUTC; ///< UTC unix millis timestamp of first packet, if known float duration; uint64_t timestamp; + int64_t timeOffset; uint64_t wait; char ivec[16]; char keyAES[16]; @@ -98,6 +99,7 @@ namespace Mist{ protected: uint64_t zUTC; ///< Zero point in local millis, as UTC unix time millis uint64_t nUTC; ///< Next packet timestamp in UTC unix time millis + int64_t streamOffset; ///< bootMsOffset we need to set once we have parsed the header unsigned int startTime; PlaylistType playlistType; SegmentDownloader segDowner; @@ -107,10 +109,10 @@ namespace Mist{ uint64_t currentPlaylist; bool allowRemap; ///< True if the next packet may remap the timestamps - bool allowSoftRemap; ///< True if the next packet may soft-remap the timestamps std::map pidMapping; std::map pidMappingR; std::map plsTimeOffset; + std::map DVRTimeOffsets; std::map plsLastTime; std::map plsInterval; @@ -122,13 +124,27 @@ namespace Mist{ Socket::Connection conn; TS::Packet tsBuf; + // Used to map packetId of packets in pidMapping + int pidCounter; + + /// HLS live VoD stream, set if: #EXT-X-PLAYLIST-TYPE:EVENT + bool isLiveDVR; + // Override userLeadOut to buffer new data as live packets + void userLeadOut(); + /// Tries to add as much live packets from a TS file at the given location + bool parseSegmentAsLive(uint64_t segmentIndex); + // Updates parsedSegmentIndex for all playlists + void setParsedSegments(); + // index of last playlist entry finished parsing + long previousSegmentIndex; + size_t firstSegment(); void waitForNextSegment(); void readPMT(); bool checkArguments(); bool preSetup(); bool readHeader(); - bool needHeader(){return true;} + bool readExistingHeader(); void getNext(size_t idx = INVALID_TRACK_ID); void seek(uint64_t seekTime, size_t idx = INVALID_TRACK_ID); FILE *inFile; @@ -144,6 +160,8 @@ namespace Mist{ uint32_t getMappedTrackId(uint64_t id); uint32_t getMappedTrackPlaylist(uint64_t id); uint64_t getOriginalTrackId(uint32_t playlistId, uint32_t id); + uint64_t getPacketTime(uint64_t packetTime, uint64_t tid, uint64_t currentPlaylist, uint64_t nUTC = 0); + uint64_t getPacketID(uint64_t currentPlaylist, uint64_t trackId); size_t getEntryId(uint32_t playlistId, uint64_t bytePos); }; }// namespace Mist