Added HLS DVR mode

Moved some duplicate code to seperate functions for readability
Fix EXT-X-PROGRAM-DATE-TIME tag for VoD
Set bootMSoffset for live DVR streams
Implemented readExistingHeader for HLS input
set zUTC time based on EXT-X-PROGRAM-DATE-TIME tag rather than guessing
This commit is contained in:
Marco van Dijk 2021-10-28 15:53:53 +02:00 committed by Thulinma
parent 3d9ed39396
commit 19d7c9fe07
4 changed files with 459 additions and 157 deletions

View file

@ -196,6 +196,15 @@ std::string HTTP::URL::getFilePath() const{
return "/" + path;
}
/// Returns whether the URL is probably pointing to a local file
bool HTTP::URL::isLocalPath() const{
// If we have no host, protocol or port we can assume it is a local path
if (host.size() || protocol.size() || port.size()){
return false;
}
return true;
}
/// Returns the URL in string format without auth and frag
std::string HTTP::URL::getProxyUrl() const{
std::string ret;

View file

@ -20,6 +20,7 @@ namespace HTTP{
std::string getFilePath() const;
std::string getBareUrl() const;
std::string getProxyUrl() const;
bool isLocalPath() const;
std::string host; ///< Hostname or IP address of URL
std::string protocol; ///< Protocol of URL
std::string port; ///< Port of URL

View file

@ -100,7 +100,10 @@ static uint64_t ISO8601toUnixmillis(const std::string &ts){
}
namespace Mist{
// Save playlist objects for manual reloading
static std::map<uint64_t, Playlist> playlistMapping;
// Track which segment numbers have been parsed
std::map<uint64_t, uint64_t> parsedSegments;
/// Mutex for accesses to listEntries
tthread::mutex entryMutex;
@ -145,8 +148,11 @@ namespace Mist{
}
pls.reload();
playlistMapping[plsTotalCount] = pls;
plsInitCount++;
if (initOnly){return;}// Exit because init-only mode
if (initOnly){
return;
}// Exit because init-only mode
while (self->config->is_active){
// If the timer has not expired yet, sleep up to a second. Otherwise, reload.
@ -172,7 +178,12 @@ namespace Mist{
noChangeCount = 0;
lastTimestamp = 0;
root = HTTP::URL(uriSrc);
uri = root.getUrl();
if (root.isLocalPath()){
uri = root.getFilePath();
}
else{
uri = root.getUrl();
}
memset(keyAES, 0, 16);
startTime = Util::bootSecs();
reloadNext = 0;
@ -378,7 +389,9 @@ namespace Mist{
std::istream &input = (isUrl() ? (std::istream &)urlSource : (std::istream &)fileSource);
std::getline(input, line);
DONTEVEN_MSG("Reloading playlist '%s'", uri.c_str());
while (std::getline(input, line)){
DONTEVEN_MSG("Parsing line '%s'", line.c_str());
cleanLine(line);
if (line.empty()){continue;}// skip empty lines
@ -422,7 +435,9 @@ namespace Mist{
if (waitTime < 2){waitTime = 2;}
}
if (key == "MEDIA-SEQUENCE"){fileNo = atoll(val.c_str());}
if (key == "MEDIA-SEQUENCE"){
fileNo = atoll(val.c_str());
}
if (key == "PROGRAM-DATE-TIME"){nextUTC = ISO8601toUnixmillis(val);}
if (key == "PLAYLIST-TYPE"){
@ -452,6 +467,7 @@ namespace Mist{
std::getline(input, filename);
// check for already added segments
DONTEVEN_MSG("Current file has index #%zu, last index was #%zu", fileNo, lastFileIndex);
if (fileNo >= lastFileIndex){
cleanLine(filename);
filename = root.link(filename).getUrl();
@ -540,9 +556,8 @@ namespace Mist{
// Note: This method requires never removing playlists, only adding.
// The mutex assures we have a unique count/number.
if (!id){id = listEntries.size() + 1;}
HIGH_MSG("Adding entry '%s' to ID %u", filename.c_str(), id);
listEntries[id].push_back(entry);
DONTEVEN_MSG("Added segment to variant %" PRIu32 " (#%" PRIu64 ", now %zu queued): %s", id,
lastFileIndex, listEntries[id].size(), filename.c_str());
}
}
@ -553,6 +568,13 @@ namespace Mist{
streamIsLive = false;
globalWaitTime = 0;
currentPlaylist = 0;
streamOffset = 0;
pidCounter = 1;
isLiveDVR = false;
previousSegmentIndex = -1;
currentIndex = 0;
capa["name"] = "HLS";
capa["desc"] = "This input allows you to both play Video on Demand and live HLS streams stored "
@ -565,6 +587,7 @@ namespace Mist{
capa["source_match"].append("https://*.m3u");
capa["source_match"].append("https-hls://*");
capa["source_match"].append("http-hls://*");
// All URLs can be set to always-on mode.
capa["always_match"] = capa["source_match"];
@ -583,12 +606,21 @@ namespace Mist{
bool inputHLS::checkArguments(){
config->is_active = true;
if (config->getString("input") == "-"){return false;}
HTTP::URL mainPls(config->getString("input"));
if (mainPls.getExt().substr(0, 3) != "m3u" && mainPls.protocol.find("hls") == std::string::npos){
if (config->getString("input") == "-"){
return false;
}
if (!initPlaylist(config->getString("input"), false)){return false;}
// If the playlist is of event type, init the amount of segments in the playlist
if (isLiveDVR){
// Set the previousSegmentIndex by quickly going through the existing PLS files
setParsedSegments();
meta.setLive(true);
meta.setVod(true);
streamIsLive = true;
}
return true;
}
@ -600,14 +632,14 @@ namespace Mist{
meta.reInit(config->getString("streamname"), false);
INFO_MSG("Parsing live stream to create header...");
TS::Packet packet; // to analyse and extract data
int counter = 1;
int pidCounter = 1;
tthread::lock_guard<tthread::mutex> guard(entryMutex);
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin();
pListIt != listEntries.end(); pListIt++){
// Skip empty playlists
if (!pListIt->second.size()){continue;}
int preCounter = counter;
int prepidCounter = pidCounter;
tsStream.clear();
for (std::deque<playListEntries>::iterator entryIt = pListIt->second.begin();
@ -632,16 +664,7 @@ namespace Mist{
DTSC::Packet headerPack;
tsStream.getEarliestPacket(headerPack);
int tmpTrackId = headerPack.getTrackId();
uint64_t packetId = pidMapping[(((uint64_t)pListIt->first) << 32) + tmpTrackId];
if (packetId == 0){
pidMapping[(((uint64_t)pListIt->first) << 32) + headerPack.getTrackId()] = counter;
pidMappingR[counter] = (((uint64_t)pListIt->first) << 32) + headerPack.getTrackId();
packetId = counter;
VERYHIGH_MSG("Added file %s, trackid: %zu, mapped to: %d", entryIt->filename.c_str(),
headerPack.getTrackId(), counter);
counter++;
}
uint64_t packetId = getPacketID(pListIt->first, tmpTrackId);
size_t idx = M.trackIDToIndex(packetId, getpid());
if ((idx == INVALID_TRACK_ID || !M.getCodec(idx).size())){
@ -656,31 +679,85 @@ namespace Mist{
break; // we have all tracks discovered, next playlist!
}
}while (!segDowner.atEnd());
if (preCounter < counter){break;}// We're done reading this playlist!
if (prepidCounter < pidCounter){break;}// We're done reading this playlist!
}
}
tsStream.clear();
currentPlaylist = 0;
segDowner.close(); // make sure we have nothing left over
INFO_MSG("header complete, beginning live ingest of %d tracks", counter - 1);
INFO_MSG("header complete, beginning live ingest of %d tracks", pidCounter - 1);
}
bool inputHLS::readExistingHeader(){
if (!Input::readExistingHeader()){return false;}
if (!M.inputLocalVars.isMember("version") || M.inputLocalVars["version"].asInt() < 2){
INFO_MSG("Header needs update, regenerating");
return false;
}
// Vars for parsing TS packets
TS::Packet packet;
bool hasPacket;
// Set internal variables based on existing header file
tthread::lock_guard<tthread::mutex> guard(entryMutex);
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin();
pListIt != listEntries.end(); pListIt++){
tsStream.clear();
uint32_t entId = 0;
// For each entry in the playlist, we need to parse the earliest packet in order to set the segment offset
for (std::deque<playListEntries>::iterator entryIt = pListIt->second.begin();
entryIt != pListIt->second.end(); entryIt++){
tsStream.partialClear();
if (!segDowner.loadSegment(*entryIt)){
FAIL_MSG("Failed to load segment - skipping to next");
continue;
}
// Flag to allow getPacketTime to set the offset
entId++;
allowRemap = true;
while (!segDowner.atEnd()){
hasPacket = tsStream.hasPacketOnEachTrack() || (segDowner.atEnd() && tsStream.hasPacket());
if (hasPacket){
DTSC::Packet headerPack;
tsStream.getEarliestPacket(headerPack);
while (headerPack){
size_t tmpTrackId = headerPack.getTrackId();
// Call getPacketID in order to set pidmapping
uint64_t packetId = getPacketID(pListIt->first, tmpTrackId);
// Call getPacketTime in order to set segment offset
uint64_t packetTime = getPacketTime(headerPack.getTime(), tmpTrackId, pListIt->first, entryIt->mUTC);
VERYHIGH_MSG("Parsed earliest TS packet with id '%lu' @ '%lu ms' for TS segment with index '%u'", packetId, packetTime, entId - 1);
// Keep parsing until we have called getPacketID for each track
tsStream.getEarliestPacket(headerPack);
}
// If we do not have a packet on each track, read the next TS packet
}else if (segDowner.readNext()){
packet.FromPointer(segDowner.packetPtr);
tsStream.parse(packet, entId);
}
}
// Finally save the offset as part of the TS segment. This is required for bufferframe
// to work correctly, since not every segment might have an UTC timestamp tag
std::deque<playListEntries> &curList = listEntries[pListIt->first];
VERYHIGH_MSG("Saving offset of '%" PRId64 "' to current TS segment", plsTimeOffset[pListIt->first]);
curList.at(entId-1).timeOffset = plsTimeOffset[pListIt->first];
}
}
tsStream.clear();
// set bootMsOffset in order to display the program time correctly in the player
meta.setBootMsOffset(streamOffset);
return true;
}
bool inputHLS::readHeader(){
if (streamIsLive){return true;}
bool hasHeader = false;
// See whether a separate header file exists.
meta.reInit(config->getString("streamname"), config->getString("input") + ".dtsh");
hasHeader = (bool)M;
if (!hasHeader){meta.reInit(config->getString("streamname"), true);}
TS::Packet packet; // to analyse and extract data
if (streamIsLive && !isLiveDVR){return true;}
// to analyse and extract data
TS::Packet packet;
char *data;
size_t dataLen;
int counter = 1;
bool hasPacket = false;
meta.reInit(config->getString("streamname"), true);
tthread::lock_guard<tthread::mutex> guard(entryMutex);
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin();
@ -697,41 +774,33 @@ namespace Mist{
continue;
}
entId++;
allowRemap = true;
while (!segDowner.atEnd()){
while (tsStream.hasPacketOnEachTrack()){
// Wait for packets on each track to make sure the offset is set based on the earliest packet
hasPacket = tsStream.hasPacketOnEachTrack() || (segDowner.atEnd() && tsStream.hasPacket());
if (hasPacket){
DTSC::Packet headerPack;
tsStream.getEarliestPacket(headerPack);
size_t tmpTrackId = headerPack.getTrackId();
uint64_t packetId = pidMapping[(((uint64_t)pListIt->first) << 32) + tmpTrackId];
if (packetId == 0){
pidMapping[(((uint64_t)pListIt->first) << 32) + headerPack.getTrackId()] = counter;
pidMappingR[counter] = (((uint64_t)pListIt->first) << 32) + headerPack.getTrackId();
packetId = counter;
INFO_MSG("Added file %s, trackid: %zu, mapped to: %d", entryIt->filename.c_str(),
headerPack.getTrackId(), counter);
counter++;
}
size_t idx = M.trackIDToIndex(packetId, getpid());
if (!hasHeader && (idx == INVALID_TRACK_ID || !M.getCodec(idx).size())){
tsStream.initializeMetadata(meta, tmpTrackId, packetId);
idx = M.trackIDToIndex(packetId, getpid());
}
if (!hasHeader){
while (headerPack){
size_t tmpTrackId = headerPack.getTrackId();
uint64_t packetId = getPacketID(pListIt->first, tmpTrackId);
uint64_t packetTime = getPacketTime(headerPack.getTime(), tmpTrackId, pListIt->first, entryIt->mUTC);
size_t idx = M.trackIDToIndex(packetId, getpid());
if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){
tsStream.initializeMetadata(meta, tmpTrackId, packetId);
idx = M.trackIDToIndex(packetId, getpid());
}
headerPack.getString("data", data, dataLen);
// keyframe data exists, so always add 19 bytes keyframedata.
uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11;
meta.update(headerPack.getTime(), packOffset, idx, dataLen, entId,
headerPack.hasMember("keyframe"), packSendSize);
VERYHIGH_MSG("Adding packet (%zuB) at %zu with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx);
meta.update(packetTime, packOffset, idx, dataLen, entId, headerPack.hasMember("keyframe"), packSendSize);
tsStream.getEarliestPacket(headerPack);
}
}
// No packets available, so read the next TS packet if available
if (segDowner.readNext()){
packet.FromPointer(segDowner.packetPtr);
tsStream.parse(packet, entId);
@ -743,39 +812,37 @@ namespace Mist{
tsStream.getEarliestPacket(headerPack);
while (headerPack){
int tmpTrackId = headerPack.getTrackId();
uint64_t packetId = pidMapping[(((uint64_t)pListIt->first) << 32) + tmpTrackId];
if (packetId == 0){
pidMapping[(((uint64_t)pListIt->first) << 32) + headerPack.getTrackId()] = counter;
pidMappingR[counter] = (((uint64_t)pListIt->first) << 32) + headerPack.getTrackId();
packetId = counter;
INFO_MSG("Added file %s, trackid: %zu, mapped to: %d", entryIt->filename.c_str(),
headerPack.getTrackId(), counter);
counter++;
}
uint64_t packetId = getPacketID(pListIt->first, tmpTrackId);
uint64_t packetTime = getPacketTime(headerPack.getTime(), tmpTrackId, pListIt->first, entryIt->mUTC);
size_t idx = M.trackIDToIndex(packetId, getpid());
if (!hasHeader && (idx == INVALID_TRACK_ID || !M.getCodec(idx).size())){
if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){
tsStream.initializeMetadata(meta, tmpTrackId, packetId);
idx = M.trackIDToIndex(packetId, getpid());
}
if (!hasHeader){
headerPack.getString("data", data, dataLen);
// keyframe data exists, so always add 19 bytes keyframedata.
long long packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
long long packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11;
meta.update(headerPack.getTime(), packOffset, idx, dataLen, entId,
headerPack.hasMember("keyframe"), packSendSize);
}
headerPack.getString("data", data, dataLen);
// keyframe data exists, so always add 19 bytes keyframedata.
long long packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
long long packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11;
VERYHIGH_MSG("Adding packet (%zuB) at %zu with an offset of %llu on track %zu", dataLen, packetTime, packOffset, idx);
meta.update(packetTime, packOffset, idx, dataLen, entId, headerPack.hasMember("keyframe"), packSendSize);
tsStream.getEarliestPacket(headerPack);
}
if (hasHeader){break;}
// Finally save the offset as part of the TS segment. This is required for bufferframe
// to work correctly, since not every segment might have an UTC timestamp tag
std::deque<playListEntries> &curList = listEntries[pListIt->first];
INFO_MSG("Saving offset of '%" PRId64 "' to current TS segment", plsTimeOffset[pListIt->first]);
curList.at(entId-1).timeOffset = plsTimeOffset[pListIt->first];
}
}
if (streamIsLive){return true;}
// set bootMsOffset in order to display the program time correctly in the player
meta.setBootMsOffset(streamOffset);
if (streamIsLive || isLiveDVR){return true;}
// Set local vars used for parsing existing headers
meta.inputLocalVars["version"] = 2;
INFO_MSG("write header file...");
M.toFile((config->getString("input") + ".dtsh").c_str());
@ -783,7 +850,127 @@ namespace Mist{
return true;
}
bool inputHLS::needsLock(){return !streamIsLive;}
bool inputHLS::needsLock(){
if (isLiveDVR){
return true;
}
return !streamIsLive;
}
/// \brief Parses new segments added to playlist files as live data
/// \param segmentIndex: the index of the segment in the current playlist
/// \return True if the segment has been buffered successfully
bool inputHLS::parseSegmentAsLive(uint64_t segmentIndex){
bool hasOffset = false;
bool hasPacket = false;
// Keep our own variables to make sure buffering live data does not interfere with VoD pages loading
TS::Packet packet;
TS::Stream tsStream;
char *data;
size_t dataLen;
// Get the updated list of entries
std::deque<playListEntries> &curList = listEntries[currentPlaylist];
if (curList.size() <= segmentIndex){
FAIL_MSG("Tried to load segment with index '%lu', but the playlist only contains '%zu' entries!", segmentIndex, curList.size());
return false;
}
if (!segDowner.loadSegment(curList.at(segmentIndex))){
FAIL_MSG("Failed to load segment");
return false;
}
while (!segDowner.atEnd()){
// Wait for packets on each track to make sure the offset is set based on the earliest packet
hasPacket = tsStream.hasPacketOnEachTrack() || (segDowner.atEnd() && tsStream.hasPacket());
if (hasPacket){
DTSC::Packet headerPack;
tsStream.getEarliestPacket(headerPack);
while (headerPack){
size_t tmpTrackId = headerPack.getTrackId();
uint64_t packetId = getPacketID(currentPlaylist, tmpTrackId);
uint64_t packetTime = headerPack.getTime();
// Set segment offset and save it
if (!hasOffset && curList.at(segmentIndex).mUTC){
hasOffset = true;
DVRTimeOffsets[currentPlaylist] = (curList.at(segmentIndex).mUTC - zUTC) - packetTime;
MEDIUM_MSG("Setting current live segment time offset to '%ld'", DVRTimeOffsets[currentPlaylist]);
curList.at(segmentIndex).timeOffset = DVRTimeOffsets[currentPlaylist];
}
if (hasOffset || DVRTimeOffsets.count(currentPlaylist)){
hasOffset = true;
packetTime += DVRTimeOffsets[currentPlaylist];
HIGH_MSG("Adjusting current packet timestamp '%ld' -> '%ld'", headerPack.getTime(), packetTime);
}
size_t idx = M.trackIDToIndex(packetId, getpid());
if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){
tsStream.initializeMetadata(meta, tmpTrackId, packetId);
idx = M.trackIDToIndex(packetId, getpid());
}
headerPack.getString("data", data, dataLen);
// keyframe data exists, so always add 19 bytes keyframedata.
uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
VERYHIGH_MSG("Adding packet (%zuB) at %zu with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx);
bufferLivePacket(packetTime, packOffset, idx, data, dataLen, segmentIndex + 1, headerPack.hasMember("keyframe"));
tsStream.getEarliestPacket(headerPack);
}
}
// No packets available, so read the next TS packet if available
if (segDowner.readNext()){
packet.FromPointer(segDowner.packetPtr);
tsStream.parse(packet, segmentIndex + 1);
}
}
// get last packets
tsStream.finish();
DTSC::Packet headerPack;
tsStream.getEarliestPacket(headerPack);
while (headerPack){
int tmpTrackId = headerPack.getTrackId();
uint64_t packetId = getPacketID(currentPlaylist, tmpTrackId);
uint64_t packetTime = headerPack.getTime();
if (DVRTimeOffsets.count(currentPlaylist)){
packetTime += DVRTimeOffsets[currentPlaylist];
VERYHIGH_MSG("Adjusting current packet timestamp '%ld' -> '%ld'", headerPack.getTime(), packetTime);
}
size_t idx = M.trackIDToIndex(packetId, getpid());
if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){
tsStream.initializeMetadata(meta, tmpTrackId, packetId);
idx = M.trackIDToIndex(packetId, getpid());
}
headerPack.getString("data", data, dataLen);
// keyframe data exists, so always add 19 bytes keyframedata.
uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
VERYHIGH_MSG("Adding packet (%zuB) at %zu with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx);
bufferLivePacket(packetTime, packOffset, idx, data, dataLen, segmentIndex + 1, headerPack.hasMember("keyframe"));
tsStream.getEarliestPacket(headerPack);
}
return true;
}
/// \brief Override userLeadOut to buffer new data as live packets
void inputHLS::userLeadOut(){
if (!isLiveDVR){
return;
}
// Update all playlists to make sure listEntries contains all live segments
for (std::map<uint64_t, Playlist>::iterator pListIt = playlistMapping.begin();
pListIt != playlistMapping.end(); pListIt++){
pListIt->second.reload();
}
HIGH_MSG("Current playlist has parsed %lu/%lu entries", listEntries[currentPlaylist].size(), parsedSegments[currentPlaylist]);
for(uint64_t entryIt = parsedSegments[currentPlaylist]; entryIt < listEntries[currentPlaylist].size(); entryIt++){
MEDIUM_MSG("Adding entry #%lu as live data", entryIt);
if (parseSegmentAsLive(entryIt)){
parsedSegments[currentPlaylist]++;
}else{
break;
}
}
}
bool inputHLS::openStreamSource(){return true;}
@ -793,7 +980,6 @@ namespace Mist{
bool finished = false;
thisPacket.null();
while (config->is_active && (needsLock() || keepAlive())){
// Check if we have a packet
bool hasPacket = false;
if (idx == INVALID_TRACK_ID){
@ -813,70 +999,26 @@ namespace Mist{
continue;
}
}else{
tsStream.getPacket(getMappedTrackId(M.getID(idx)), thisPacket);
tid = getMappedTrackId(M.getID(idx));
tsStream.getPacket(tid, thisPacket);
}
if (!thisPacket){
FAIL_MSG("Could not getNext TS packet!");
return;
}
uint64_t newTime = thisPacket.getTime();
// Apply offset if any was set
if (plsTimeOffset.count(currentPlaylist)){newTime += plsTimeOffset[currentPlaylist];}
if (zUTC){
if (allowSoftRemap && thisPacket.getTime() < 1000){allowSoftRemap = false;}
// UTC based timestamp offsets
if ((allowRemap || allowSoftRemap) && nUTC){
allowRemap = false;
allowSoftRemap = !thisPacket.getTime();
int64_t prevOffset = plsTimeOffset[currentPlaylist];
plsTimeOffset[currentPlaylist] = (nUTC - zUTC) - thisPacket.getTime();
newTime = thisPacket.getTime() + plsTimeOffset[currentPlaylist];
INFO_MSG("[UTC; New offset: %" PRId64 " -> %" PRId64 "] Packet %" PRIu32 "@%" PRIu64
"ms -> %" PRIu64 "ms",
prevOffset, plsTimeOffset[currentPlaylist], tid, thisPacket.getTime(), newTime);
}
}else{
// Non-UTC based
if (plsLastTime.count(currentPlaylist)){
if (plsInterval.count(currentPlaylist)){
if (allowRemap && (newTime < plsLastTime[currentPlaylist] ||
newTime > plsLastTime[currentPlaylist] + plsInterval[currentPlaylist] * 60)){
allowRemap = false;
// time difference too great, change offset to correct for it
int64_t prevOffset = plsTimeOffset[currentPlaylist];
plsTimeOffset[currentPlaylist] +=
(int64_t)(plsLastTime[currentPlaylist] + plsInterval[currentPlaylist]) - (int64_t)newTime;
newTime = thisPacket.getTime() + plsTimeOffset[currentPlaylist];
INFO_MSG("[Guess; New offset: %" PRId64 " -> %" PRId64 "] Packet %" PRIu32 "@%" PRIu64
"ms -> %" PRIu64 "ms",
prevOffset, plsTimeOffset[currentPlaylist], tid, thisPacket.getTime(), newTime);
}
}
// check if time increased, and no increase yet or is less than current, set new interval
if (newTime > plsLastTime[currentPlaylist] &&
(!plsInterval.count(currentPlaylist) ||
newTime - plsLastTime[currentPlaylist] < plsInterval[currentPlaylist])){
plsInterval[currentPlaylist] = newTime - plsLastTime[currentPlaylist];
}
}
// store last time for interval/offset calculations
plsLastTime[tid] = newTime;
}
DONTEVEN_MSG("Packet %" PRIu32 "@%" PRIu64 "ms -> %" PRIu64 "ms", tid, thisPacket.getTime(), newTime);
uint64_t packetTime = getPacketTime(thisPacket.getTime(), tid, currentPlaylist, nUTC);
HIGH_MSG("Packet %" PRIu32 "@%" PRIu64 "ms -> %" PRIu64 "ms", tid, thisPacket.getTime(), packetTime);
// overwrite trackId on success
Bit::htobl(thisPacket.getData() + 8, tid);
Bit::htobll(thisPacket.getData() + 12, newTime);
Bit::htobll(thisPacket.getData() + 12, packetTime);
return; // Success!
}
// No? Let's read some more data and check again.
if (!segDowner.atEnd() && segDowner.readNext()){
tsBuf.FromPointer(segDowner.packetPtr);
tsStream.parse(tsBuf, streamIsLive ? 0 : currentIndex);
tsStream.parse(tsBuf, streamIsLive && !isLiveDVR ? 0 : currentIndex + 1);
continue; // check again
}
@ -898,7 +1040,6 @@ namespace Mist{
}
if (currentPlaylist == 0){
VERYHIGH_MSG("Waiting for segments...");
keepAlive();
Util::wait(500);
continue;
}
@ -939,7 +1080,11 @@ namespace Mist{
DTSC::Keys keys(M.keys(idx));
for (size_t i = keys.getFirstValid(); i < keys.getEndValid(); i++){
if (keys.getTime(i) > seekTime){break;}
if (keys.getTime(i) > seekTime){
VERYHIGH_MSG("Found elapsed key with a time of %lu ms at playlist index %zu while seeking", keys.getTime(i), keys.getBpos(i)-1);
break;
}
VERYHIGH_MSG("Found valid key with a time of %lu ms at playlist index %zu while seeking", keys.getTime(i), keys.getBpos(i)-1);
plistEntry = keys.getBpos(i);
}
@ -966,6 +1111,11 @@ namespace Mist{
}
playListEntries &entry = curPlaylist.at(currentIndex);
segDowner.loadSegment(entry);
// If we have an offset, load it
if (entry.timeOffset){
HIGH_MSG("Setting time offset of this TS segment to '%ld'", entry.timeOffset);
plsTimeOffset[currentPlaylist] = entry.timeOffset;
}
}
HIGH_MSG("readPMT()");
@ -973,8 +1123,88 @@ namespace Mist{
while (!tsStream.hasPacketOnEachTrack() && !segDowner.atEnd()){
if (!segDowner.readNext()){break;}
tsBuffer.FromPointer(segDowner.packetPtr);
tsStream.parse(tsBuffer, 0);
tsStream.parse(tsBuffer, streamIsLive && !isLiveDVR ? 0 : plistEntry);
}
}
/// \brief Applies any offset to the packets original timestamp
/// \param packetTime: the original timestamp of the packet
/// \param tid: the trackid corresponding to this track and playlist
/// \param currentPlaylist: the ID of the playlist we are currently trying to parse
/// \param nUTC: Defaults to 0. If larger than 0, sync the timestamp based on this value and zUTC
/// \return the (modified) packetTime, used for meta.updates and buffering packets
uint64_t inputHLS::getPacketTime(uint64_t packetTime, uint64_t tid, uint64_t currentPlaylist, uint64_t nUTC){
INSANE_MSG("Calculating adjusted packet time for track '%lu' on playlist '%lu' with current timestamp '%lu'. UTC timestamp is '%lu'", tid, currentPlaylist, packetTime, nUTC);
uint64_t newTime = packetTime;
// UTC based timestamp offsets
if (zUTC){
// Overwrite offset if we have an UTC timestamp
if (allowRemap && nUTC){
allowRemap = false;
int64_t prevOffset = plsTimeOffset[currentPlaylist];
plsTimeOffset[currentPlaylist] = (nUTC - zUTC) - packetTime;
newTime = packetTime + plsTimeOffset[currentPlaylist];
MEDIUM_MSG("[UTC; New offset: %" PRId64 " -> %" PRId64 "] Packet %" PRIu64 "@%" PRIu64
"ms -> %" PRIu64 "ms", prevOffset, plsTimeOffset[currentPlaylist], tid, packetTime, newTime);
}else if (plsTimeOffset.count(currentPlaylist)){
// Prevent integer overflow for large negative offsets, which can happen
// when the first time of another track is lower that the firsttime
if (plsTimeOffset[currentPlaylist] + int64_t(newTime) < 0){
newTime = 0;
FAIL_MSG("Time offset is too negative causing an integer overflow. Setting current packet time to 0.");
}else{
VERYHIGH_MSG("Adjusting timestamp %lu -> %lu (offset is %ld)", newTime, newTime + plsTimeOffset[currentPlaylist], plsTimeOffset[currentPlaylist]);
newTime += plsTimeOffset[currentPlaylist];
}
}
// Non-UTC based
}else{
// Apply offset if any was set
if (plsTimeOffset.count(currentPlaylist)){
VERYHIGH_MSG("Adjusting timestamp %lu -> %lu (offset is %ld)", newTime, newTime + plsTimeOffset[currentPlaylist], plsTimeOffset[currentPlaylist]);
newTime += plsTimeOffset[currentPlaylist];
}
if (plsLastTime.count(currentPlaylist)){
if (plsInterval.count(currentPlaylist)){
if (allowRemap && (newTime < plsLastTime[currentPlaylist] ||
newTime > plsLastTime[currentPlaylist] + plsInterval[currentPlaylist] * 60)){
allowRemap = false;
// time difference too great, change offset to correct for it
int64_t prevOffset = plsTimeOffset[currentPlaylist];
plsTimeOffset[currentPlaylist] +=
(int64_t)(plsLastTime[currentPlaylist] + plsInterval[currentPlaylist]) - (int64_t)newTime;
newTime = packetTime + plsTimeOffset[currentPlaylist];
MEDIUM_MSG("[Guess; New offset: %" PRId64 " -> %" PRId64 "] Packet %" PRIu64 "@%" PRIu64
"ms -> %" PRIu64 "ms",
prevOffset, plsTimeOffset[currentPlaylist], tid, packetTime, newTime);
}
}
// check if time increased, and no increase yet or is less than current, set new interval
if (newTime > plsLastTime[currentPlaylist] &&
(!plsInterval.count(currentPlaylist) ||
newTime - plsLastTime[currentPlaylist] < plsInterval[currentPlaylist])){
plsInterval[currentPlaylist] = newTime - plsLastTime[currentPlaylist];
}
}
// store last time for interval/offset calculations
plsLastTime[tid] = newTime;
}
return newTime;
}
/// \brief Returns the packet ID corresponding to this playlist and track
/// \param trackId: the trackid corresponding to this track and playlist
/// \param currentPlaylist: the ID of the playlist we are currently trying to parse
uint64_t inputHLS::getPacketID(uint64_t currentPlaylist, uint64_t trackId){
uint64_t packetId = pidMapping[(((uint64_t)currentPlaylist) << 32) + trackId];
if (packetId == 0){
pidMapping[(((uint64_t)currentPlaylist) << 32) + trackId] = pidCounter;
pidMappingR[pidCounter] = (((uint64_t)currentPlaylist) << 32) + trackId;
packetId = pidCounter;
pidCounter++;
}
return packetId;
}
size_t inputHLS::getEntryId(uint32_t playlistId, uint64_t bytePos){
@ -1014,8 +1244,21 @@ namespace Mist{
return lastOut;
}
/// \brief Sets parsedSegments for all playlists, specifying how many segments
/// have already been parsed. Additional segments can then be parsed as live data
void inputHLS::setParsedSegments(){
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin();
pListIt != listEntries.end(); pListIt++){
parsedSegments[pListIt->first] = pListIt->second.size();
INFO_MSG("Playlist %u already contains %li VOD segments", pListIt->first, parsedSegments[pListIt->first]);
}
}
/// Parses the main playlist, possibly containing variants.
bool inputHLS::initPlaylist(const std::string &uri, bool fullInit){
// Used to set zUTC, in case the first EXT-X-PROGRAM-DATE-TIME does not appear before the first segment
float timestampSum = 0;
bool isRegularPls = false;
plsInitCount = 0;
plsTotalCount = 0;
{
@ -1025,8 +1268,9 @@ namespace Mist{
std::string line;
bool ret = false;
startTime = Util::bootSecs();
std::string playlistLocation = uri;
HTTP::URL playlistRootPath(uri);
HTTP::URL playlistRootPath(playlistLocation);
// Convert custom http(s)-hls protocols into regular notation.
if (playlistRootPath.protocol == "http-hls"){playlistRootPath.protocol = "http";}
if (playlistRootPath.protocol == "https-hls"){playlistRootPath.protocol = "https";}
@ -1034,7 +1278,7 @@ namespace Mist{
std::istringstream urlSource;
std::ifstream fileSource;
bool isUrl = (uri.find("://") != std::string::npos);
bool isUrl = (playlistLocation.find("://") != std::string::npos);
if (isUrl){
INFO_MSG("Downloading main playlist file from '%s'", uri.c_str());
HTTP::Downloader plsDL;
@ -1047,16 +1291,16 @@ namespace Mist{
urlSource.str(plsDL.data());
}else{
// If we're not a URL and there is no / at the start, ensure we get the full absolute path.
if (uri[0] != '/'){
char *rp = realpath(uri.c_str(), 0);
if (playlistLocation[0] != '/'){
char *rp = realpath(playlistLocation.c_str(), 0);
if (rp){
playlistRootPath = HTTP::URL((std::string)rp);
free(rp);
}
}
fileSource.open(uri.c_str());
fileSource.open(playlistLocation.c_str());
if (!fileSource.good()){
FAIL_MSG("Could not open playlist (%s): %s", strerror(errno), uri.c_str());
FAIL_MSG("Could not open playlist (%s): %s", strerror(errno), playlistLocation.c_str());
}
}
@ -1069,6 +1313,9 @@ namespace Mist{
// skip empty lines in the playlist
continue;
}
if (line.compare(0, 26, "#EXT-X-PLAYLIST-TYPE:EVENT") == 0){
isLiveDVR = true;
}
if (line.compare(0, 17, "#EXT-X-STREAM-INF") == 0){
// this is a variant playlist file.. next line is an uri to a playlist
// file
@ -1133,15 +1380,30 @@ namespace Mist{
}
}else if (line.compare(0, 7, "#EXTINF") == 0){
// current file is not a variant playlist, but regular playlist.
ret = readPlaylist(playlistRootPath.getUrl(), fullInit);
break;
// Read as regular playlist after we are done checking for UTC timestamps
isRegularPls = true;
// Sum the duration to make sure we set zUTC time right
float f = atof(line.c_str() + 8);
timestampSum += f * 1000;
}else if (line.compare(0, 24, "#EXT-X-PROGRAM-DATE-TIME") == 0 && !zUTC){
// Init UTC variables used to rewrite packet timestamps
size_t pos = line.find(":");
std::string val = line.c_str() + pos + 1;
zUTC = ISO8601toUnixmillis(val) - uint64_t(timestampSum);
nUTC = zUTC;
INFO_MSG("Setting program unix start time to '%s' (%" PRIu64 ")", line.substr(pos + 1).c_str(), zUTC);
// store offset so that we can set it after reading the header
streamOffset = zUTC - (Util::unixMS() - Util::bootMS());
}else{
// ignore wrong lines
VERYHIGH_MSG("ignore wrong line: %s", line.c_str());
}
}
if (isRegularPls){
ret = readPlaylist(playlistRootPath.getUrl(), fullInit);
}
if (!isUrl){fileSource.close();}
uint32_t maxWait = 0;
@ -1164,7 +1426,15 @@ namespace Mist{
/// Function for reading every playlist.
bool inputHLS::readPlaylist(const HTTP::URL &uri, bool fullInit){
std::string urlBuffer = (fullInit ? "" : ";") + uri.getUrl();
std::string urlBuffer;
// Wildcard streams can have a ' ' in the name, which getUrl converts to a '+'
if (uri.isLocalPath()){
urlBuffer = (fullInit ? "" : ";") + uri.getFilePath();
}
else{
urlBuffer = (fullInit ? "" : ";") + uri.getUrl();
}
INFO_MSG("Adding playlist(s): %s", urlBuffer.c_str());
tthread::thread runList(playlistRunner, (void *)urlBuffer.data());
runList.detach(); // Abandon the thread, it's now running independently
uint32_t timeout = 0;
@ -1183,11 +1453,12 @@ namespace Mist{
{
tthread::lock_guard<tthread::mutex> guard(entryMutex);
std::deque<playListEntries> &curList = listEntries[currentPlaylist];
INSANE_MSG("Current playlist contains %li entries. Current index is %li in playlist %li", curList.size(), currentIndex, currentPlaylist);
if (!curList.size()){
WARN_MSG("no entries found in playlist: %" PRIu64 "!", currentPlaylist);
return false;
}
if (!streamIsLive){
if (!streamIsLive || isLiveDVR){
// VoD advances the index by one and attempts to read
// The playlist is not altered in this case, since we may need to seek back later
currentIndex++;
@ -1220,11 +1491,14 @@ namespace Mist{
ERROR_MSG("Could not download segment: %s", ntry.filename.c_str());
return readNextFile(); // Attempt to read another, if possible.
}
nUTC = ntry.mUTC;
// If we don't have a zero-time yet, guess an hour before this UTC time is probably fine
if (nUTC && !zUTC){zUTC = nUTC - 3600000;}
allowRemap = true;
allowSoftRemap = false;
// If we have an offset, load it
if (ntry.timeOffset){
plsTimeOffset[currentPlaylist] = ntry.timeOffset;
// Else allow of the offset to be set by getPacketTime
}else{
nUTC = ntry.mUTC;
allowRemap = true;
}
return true;
}

View file

@ -28,6 +28,7 @@ namespace Mist{
uint64_t mUTC; ///< UTC unix millis timestamp of first packet, if known
float duration;
uint64_t timestamp;
int64_t timeOffset;
uint64_t wait;
char ivec[16];
char keyAES[16];
@ -98,6 +99,7 @@ namespace Mist{
protected:
uint64_t zUTC; ///< Zero point in local millis, as UTC unix time millis
uint64_t nUTC; ///< Next packet timestamp in UTC unix time millis
int64_t streamOffset; ///< bootMsOffset we need to set once we have parsed the header
unsigned int startTime;
PlaylistType playlistType;
SegmentDownloader segDowner;
@ -107,10 +109,10 @@ namespace Mist{
uint64_t currentPlaylist;
bool allowRemap; ///< True if the next packet may remap the timestamps
bool allowSoftRemap; ///< True if the next packet may soft-remap the timestamps
std::map<uint64_t, uint64_t> pidMapping;
std::map<uint64_t, uint64_t> pidMappingR;
std::map<int, int64_t> plsTimeOffset;
std::map<int, int64_t> DVRTimeOffsets;
std::map<int, uint64_t> plsLastTime;
std::map<int, uint64_t> plsInterval;
@ -122,13 +124,27 @@ namespace Mist{
Socket::Connection conn;
TS::Packet tsBuf;
// Used to map packetId of packets in pidMapping
int pidCounter;
/// HLS live VoD stream, set if: #EXT-X-PLAYLIST-TYPE:EVENT
bool isLiveDVR;
// Override userLeadOut to buffer new data as live packets
void userLeadOut();
/// Tries to add as much live packets from a TS file at the given location
bool parseSegmentAsLive(uint64_t segmentIndex);
// Updates parsedSegmentIndex for all playlists
void setParsedSegments();
// index of last playlist entry finished parsing
long previousSegmentIndex;
size_t firstSegment();
void waitForNextSegment();
void readPMT();
bool checkArguments();
bool preSetup();
bool readHeader();
bool needHeader(){return true;}
bool readExistingHeader();
void getNext(size_t idx = INVALID_TRACK_ID);
void seek(uint64_t seekTime, size_t idx = INVALID_TRACK_ID);
FILE *inFile;
@ -144,6 +160,8 @@ namespace Mist{
uint32_t getMappedTrackId(uint64_t id);
uint32_t getMappedTrackPlaylist(uint64_t id);
uint64_t getOriginalTrackId(uint32_t playlistId, uint32_t id);
uint64_t getPacketTime(uint64_t packetTime, uint64_t tid, uint64_t currentPlaylist, uint64_t nUTC = 0);
uint64_t getPacketID(uint64_t currentPlaylist, uint64_t trackId);
size_t getEntryId(uint32_t playlistId, uint64_t bytePos);
};
}// namespace Mist