From d58e860a2c87b9e8e74ab201ce41ce0756b4a9e8 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sat, 16 Nov 2019 19:07:16 +0100 Subject: [PATCH] HLS: support for handling and syncing on ISO8601 timestamps in input --- src/input/input_hls.cpp | 380 +++++++++++++++++++++++++--------------- src/input/input_hls.h | 25 ++- 2 files changed, 260 insertions(+), 145 deletions(-) diff --git a/src/input/input_hls.cpp b/src/input/input_hls.cpp index 6fd26fce..5b39dad7 100644 --- a/src/input/input_hls.cpp +++ b/src/input/input_hls.cpp @@ -1,4 +1,5 @@ #include "input_hls.h" +#include "mbedtls/aes.h" #include #include #include @@ -19,17 +20,80 @@ #include #include #include -#include "mbedtls/aes.h" #define SEM_TS_CLAIM "/MstTSIN%s" +static uint64_t ISO8601toUnixmillis(const std::string &ts){ + // Format examples: + // 2019-12-05T09:41:16.765000+00:00 + // 2019-12-05T09:41:16.765Z + uint64_t unxTime = 0; + const size_t T = ts.find('T'); + if (T == std::string::npos){ + ERROR_MSG("Timestamp is date-only (no time marker): %s", ts.c_str()); + return 0; + } + const size_t Z = ts.find_first_of("Z+-", T); + const std::string date = ts.substr(0, T); + const std::string time = ts.substr(T + 1, Z - T - 1); + const std::string zone = ts.substr(Z); + unsigned long year, month, day; + if (sscanf(date.c_str(), "%lu-%lu-%lu", &year, &month, &day) != 3){ + ERROR_MSG("Could not parse date: %s", date.c_str()); + return 0; + } + unsigned int hour, minute; + double seconds; + if (sscanf(time.c_str(), "%u:%d:%lf", &hour, &minute, &seconds) != 3){ + ERROR_MSG("Could not parse time: %s", time.c_str()); + return 0; + } + // Fill the tm struct with the values we just read. + // We're ignoring time zone for now, and forcing seconds to zero since we add them in later with more precision + struct tm tParts; + tParts.tm_sec = 0; + tParts.tm_min = minute; + tParts.tm_hour = hour; + tParts.tm_mon = month - 1; + tParts.tm_year = year - 1900; + tParts.tm_mday = day; + tParts.tm_isdst = 0; + // convert to unix time, in seconds + unxTime = timegm(&tParts); + // convert to milliseconds + unxTime *= 1000; + // finally add the seconds (and milliseconds) + unxTime += (seconds * 1000); + + // Now, adjust for time zone if needed + if (zone.size() && zone[0] != 'Z'){ + bool sign = (zone[0] == '+'); + { + unsigned long hrs, mins; + if (sscanf(zone.c_str() + 1, "%lu:%lu", &hrs, &mins) == 2){ + unxTime += mins * 60000 + hrs * 3600000; + }else if (sscanf(zone.c_str() + 1, "%lu", &hrs) == 1){ + if (hrs > 100){ + unxTime += (hrs % 100) * 60000 + ((uint64_t)(hrs / 100)) * 3600000; + }else{ + unxTime += hrs * 3600000; + } + }else{ + WARN_MSG("Could not parse time zone '%s'; assuming UTC", zone.c_str()); + } + } + } + DONTEVEN_MSG("Time '%s' = %" PRIu64, ts.c_str(), unxTime); + return unxTime; +} + namespace Mist{ - ///Mutex for accesses to listEntries + /// Mutex for accesses to listEntries tthread::mutex entryMutex; - static unsigned int plsTotalCount = 0;///Total playlists active - static unsigned int plsInitCount = 0;///Count of playlists fully inited + static unsigned int plsTotalCount = 0; /// Total playlists active + static unsigned int plsInitCount = 0; /// Count of playlists fully inited bool streamIsLive; uint32_t globalWaitTime; @@ -53,15 +117,15 @@ namespace Mist{ /// Helper function that is used to run the playlist downloaders /// Expects character array with playlist URL as argument, sets the first byte of the pointer to zero when loaded. - void playlistRunner(void * ptr){ - if (!ptr){return;}//abort if we received a null pointer - something is seriously wrong + void playlistRunner(void *ptr){ + if (!ptr){return;}// abort if we received a null pointer - something is seriously wrong bool initOnly = false; - if (((char*)ptr)[0] == ';'){initOnly = true;} + if (((char *)ptr)[0] == ';'){initOnly = true;} - Playlist pls(initOnly?((char*)ptr)+1:(char*)ptr); + Playlist pls(initOnly ? ((char *)ptr) + 1 : (char *)ptr); plsTotalCount++; - //signal that we have now copied the URL and no longer need it - ((char*)ptr)[0] = 0; + // signal that we have now copied the URL and no longer need it + ((char *)ptr)[0] = 0; if (!pls.uri.size()){ FAIL_MSG("Variant playlist URL is empty, aborting update thread."); @@ -70,10 +134,10 @@ namespace Mist{ pls.reload(); plsInitCount++; - if (initOnly){return;}//Exit because init-only mode + if (initOnly){return;}// Exit because init-only mode while (self->config->is_active){ - //If the timer has not expired yet, sleep up to a second. Otherwise, reload. + // If the timer has not expired yet, sleep up to a second. Otherwise, reload. /// \TODO Sleep longer if that makes sense? if (pls.reloadNext > Util::bootSecs()){ Util::sleep(1000); @@ -81,7 +145,7 @@ namespace Mist{ pls.reload(); } } - INFO_MSG("Downloader thread for '%s' exiting", pls.uri.c_str()); + MEDIUM_MSG("Downloader thread for '%s' exiting", pls.uri.c_str()); } SegmentDownloader::SegmentDownloader(){ @@ -91,7 +155,8 @@ namespace Mist{ } Playlist::Playlist(const std::string &uriSrc){ - id = 0;//to be set later + nextUTC = 0; + id = 0; // to be set later INFO_MSG("Adding variant playlist: %s", uriSrc.c_str()); plsDL.dataTimeout = 15; plsDL.retryCount = 8; @@ -100,44 +165,42 @@ namespace Mist{ playlistEnd = false; noChangeCount = 0; lastTimestamp = 0; - uri = uriSrc; - root = HTTP::URL(uri); + root = HTTP::URL(uriSrc); + uri = root.getUrl(); memset(keyAES, 0, 16); startTime = Util::bootSecs(); reloadNext = 0; } - - void parseKey(std::string key, char * newKey, unsigned int len){ + + void parseKey(std::string key, char *newKey, unsigned int len){ memset(newKey, 0, len); for (size_t i = 0; i < key.size() && i < (len << 1); ++i){ char c = key[i]; - newKey[i>>1] |= ((c&15) + (((c&64)>>6) | ((c&64)>>3))) << ((~i&1) << 2); + newKey[i >> 1] |= ((c & 15) + (((c & 64) >> 6) | ((c & 64) >> 3))) << ((~i & 1) << 2); } } - void flipKey(char * d){ - for(size_t i = 0; i< 8; i++){ + void flipKey(char *d){ + for (size_t i = 0; i < 8; i++){ char tmp = d[i]; - d[i] = d[15-i]; - d[15-i]=tmp; + d[i] = d[15 - i]; + d[15 - i] = tmp; } - } -static std::string printhex(const char * data, size_t len) -{ - static const char* const lut = "0123456789ABCDEF"; + static std::string printhex(const char *data, size_t len){ + static const char *const lut = "0123456789ABCDEF"; std::string output; output.reserve(2 * len); - for (size_t i = 0; i < len; ++i) - { - const unsigned char c = data[i]; - output.push_back(lut[c >> 4]); - output.push_back(lut[c & 15]); + for (size_t i = 0; i < len; ++i){ + const unsigned char c = data[i]; + output.push_back(lut[c >> 4]); + output.push_back(lut[c & 15]); } return output; -} + } + /// Returns true if packetPtr is at the end of the current segment. bool SegmentDownloader::atEnd() const{ return (packetPtr - segDL.const_data().data() + 188) > segDL.const_data().size(); @@ -147,11 +210,12 @@ static std::string printhex(const char * data, size_t len) bool Playlist::isUrl() const{return root.protocol.size();} /// Loads the given segment URL into the segment buffer. - bool SegmentDownloader::loadSegment(const playListEntries & entry){ - std::string hexKey = printhex(entry.keyAES,16); + bool SegmentDownloader::loadSegment(const playListEntries &entry){ + std::string hexKey = printhex(entry.keyAES, 16); std::string hexIvec = printhex(entry.ivec, 16); - MEDIUM_MSG("Loading segment: %s, key: %s, ivec: %s", entry.filename.c_str(), hexKey.c_str(), hexIvec.c_str()); + MEDIUM_MSG("Loading segment: %s, key: %s, ivec: %s", entry.filename.c_str(), hexKey.c_str(), + hexIvec.c_str()); if (!segDL.get(entry.filename)){ FAIL_MSG("failed download: %s", entry.filename.c_str()); return false; @@ -171,32 +235,32 @@ static std::string printhex(const char * data, size_t len) } } - //If we have a non-null key, decrypt - if (entry.keyAES[ 0] != 0 || entry.keyAES[ 1] != 0 || entry.keyAES[ 2] != 0 || entry.keyAES[ 3] != 0 || \ - entry.keyAES[ 4] != 0 || entry.keyAES[ 5] != 0 || entry.keyAES[ 6] != 0 || entry.keyAES[ 7] != 0 || \ - entry.keyAES[ 8] != 0 || entry.keyAES[ 9] != 0 || entry.keyAES[10] != 0 || entry.keyAES[11] != 0 || \ + // If we have a non-null key, decrypt + if (entry.keyAES[0] != 0 || entry.keyAES[1] != 0 || entry.keyAES[2] != 0 || entry.keyAES[3] != 0 || + entry.keyAES[4] != 0 || entry.keyAES[5] != 0 || entry.keyAES[6] != 0 || entry.keyAES[7] != 0 || + entry.keyAES[8] != 0 || entry.keyAES[9] != 0 || entry.keyAES[10] != 0 || entry.keyAES[11] != 0 || entry.keyAES[12] != 0 || entry.keyAES[13] != 0 || entry.keyAES[14] != 0 || entry.keyAES[15] != 0){ - //Setup AES context + // Setup AES context mbedtls_aes_context aes; - //Load key for decryption - mbedtls_aes_setkey_dec(&aes, (const unsigned char*)entry.keyAES, 128); - //Allocate a pointer for writing the decrypted data to + // Load key for decryption + mbedtls_aes_setkey_dec(&aes, (const unsigned char *)entry.keyAES, 128); + // Allocate a pointer for writing the decrypted data to static Util::ResizeablePointer outdata; outdata.allocate(segDL.data().size()); - //Actually decrypt the data + // Actually decrypt the data unsigned char tmpIvec[16]; memcpy(tmpIvec, entry.ivec, 16); - - mbedtls_aes_crypt_cbc(&aes, MBEDTLS_AES_DECRYPT, segDL.data().size(), tmpIvec, (const unsigned char*)segDL.data().data(), (unsigned char*)(char*)outdata); - //Data is now still padded, the padding consists of X bytes of padding, all containing the raw value X. - //Since padding is mandatory, we can simply read the last byte and remove X bytes from the length. - if (segDL.data().size() <= outdata[segDL.data().size()-1]){ + mbedtls_aes_crypt_cbc(&aes, MBEDTLS_AES_DECRYPT, segDL.data().size(), tmpIvec, + (const unsigned char *)segDL.data().data(), (unsigned char *)(char *)outdata); + // Data is now still padded, the padding consists of X bytes of padding, all containing the raw value X. + // Since padding is mandatory, we can simply read the last byte and remove X bytes from the length. + if (segDL.data().size() <= outdata[segDL.data().size() - 1]){ FAIL_MSG("Encryption padding is >= entire segment. Considering download failed."); return false; } - size_t newSize = segDL.data().size() - outdata[segDL.data().size()-1]; - //Finally, overwrite the original data buffer with the new one + size_t newSize = segDL.data().size() - outdata[segDL.data().size() - 1]; + // Finally, overwrite the original data buffer with the new one segDL.data().assign(outdata, newSize); } @@ -223,11 +287,11 @@ static std::string printhex(const char * data, size_t len) return true; } - /// Handles both initial load and future reloads. /// Returns how many segments were added to the internal segment list. bool Playlist::reload(){ uint64_t fileNo = 0; + nextUTC = 0; // Make sure we don't use old timestamps std::string line; std::string key; std::string val; @@ -269,20 +333,20 @@ static std::string printhex(const char * data, size_t len) key = line.substr(7, pos - 7); val = line.c_str() + pos + 1; - if(key == "KEY" ){ + if (key == "KEY"){ size_t tmpPos = val.find("METHOD="); size_t tmpPos2 = val.substr(tmpPos).find(","); - keyMethod = val.substr(tmpPos +7, tmpPos2-tmpPos-7); + keyMethod = val.substr(tmpPos + 7, tmpPos2 - tmpPos - 7); tmpPos = val.find("URI=\""); - tmpPos2 = val.substr(tmpPos+5).find("\""); + tmpPos2 = val.substr(tmpPos + 5).find("\""); keyUri = val.substr(tmpPos + 5, tmpPos2); tmpPos = val.find("IV="); - keyIV = val.substr(tmpPos+5, 32); + keyIV = val.substr(tmpPos + 5, 32); - //when key not found, download and store it in the map - if (!keys.count(keyUri)) { + // when key not found, download and store it in the map + if (!keys.count(keyUri)){ HTTP::Downloader keyDL; if (!keyDL.get(root.link(keyUri)) || !keyDL.isOk()){ FAIL_MSG("Could not retrieve decryption key from '%s'", root.link(keyUri).getUrl().c_str()); @@ -298,6 +362,7 @@ static std::string printhex(const char * data, size_t len) } if (key == "MEDIA-SEQUENCE"){fileNo = atoll(val.c_str());} + if (key == "PROGRAM-DATE-TIME"){nextUTC = ISO8601toUnixmillis(val);} if (key == "PLAYLIST-TYPE"){ if (val == "VOD"){ @@ -331,10 +396,11 @@ static std::string printhex(const char * data, size_t len) filename = root.link(filename).getUrl(); char ivec[16]; parseKey(keyIV, ivec, 16); - addEntry(filename, f, totalBytes,keys[keyUri],std::string(ivec,16)); + addEntry(filename, f, totalBytes, keys[keyUri], std::string(ivec, 16)); lastFileIndex = fileNo + 1; ++count; } + nextUTC = 0; ++fileNo; } @@ -344,7 +410,7 @@ static std::string printhex(const char * data, size_t len) }else{ fileSource.close(); } - //Set the global live/vod bool to live if this playlist looks like a live playlist + // Set the global live/vod bool to live if this playlist looks like a live playlist if (playlistType == LIVE){streamIsLive = true;} if (globalWaitTime < waitTime){globalWaitTime = waitTime;} @@ -370,28 +436,27 @@ static std::string printhex(const char * data, size_t len) } /// Adds playlist segments to be processed - void Playlist::addEntry(const std::string &filename, float duration, uint64_t &totalBytes, const std::string &key, const std::string &iv){ - if (!isSupportedFile(filename)){ - WARN_MSG("Ignoring unsupported file: %s", filename.c_str()); - return; - } - + void Playlist::addEntry(const std::string &filename, float duration, uint64_t &totalBytes, + const std::string &key, const std::string &iv){ + // if (!isSupportedFile(filename)){ + // WARN_MSG("Ignoring unsupported file: %s", filename.c_str()); + // return; + //} playListEntries entry; entry.filename = filename; cleanLine(entry.filename); entry.bytePos = totalBytes; entry.duration = duration; + entry.mUTC = nextUTC; - if(key.size() && iv.size()){ + if (key.size() && iv.size()){ memcpy(entry.ivec, iv.data(), 16); memcpy(entry.keyAES, key.data(), 16); }else{ memset(entry.ivec, 0, 16); memset(entry.keyAES, 0, 16); } - - if (!isUrl()){ std::ifstream fileSource; @@ -405,17 +470,19 @@ static std::string printhex(const char * data, size_t len) lastTimestamp += duration; { tthread::lock_guard guard(entryMutex); - //Set a playlist ID if we haven't assigned one yet. - //Note: This method requires never removing playlists, only adding. - //The mutex assures we have a unique count/number. - if (!id){id = listEntries.size()+1;} + // Set a playlist ID if we haven't assigned one yet. + // Note: This method requires never removing playlists, only adding. + // The mutex assures we have a unique count/number. + if (!id){id = listEntries.size() + 1;} listEntries[id].push_back(entry); - MEDIUM_MSG("Added segment to variant %" PRIu32 " (#%d, now %d queued): %s", id, lastFileIndex, listEntries[id].size(), filename.c_str()); + MEDIUM_MSG("Added segment to variant %" PRIu32 " (#%d, now %d queued): %s", id, lastFileIndex, + listEntries[id].size(), filename.c_str()); } } /// Constructor of HLS Input inputHLS::inputHLS(Util::Config *cfg) : Input(cfg){ + zUTC = nUTC = 0; self = this; streamIsLive = false; globalWaitTime = 0; @@ -452,7 +519,9 @@ static std::string printhex(const char * data, size_t len) config->is_active = true; if (config->getString("input") == "-"){return false;} HTTP::URL mainPls(config->getString("input")); - if (mainPls.getExt().substr(0, 3) != "m3u" && mainPls.protocol.find("hls") == std::string::npos){return false;} + if (mainPls.getExt().substr(0, 3) != "m3u" && mainPls.protocol.find("hls") == std::string::npos){ + return false; + } if (!initPlaylist(config->getString("input"), false)){return false;} return true; } @@ -489,15 +558,15 @@ static std::string printhex(const char * data, size_t len) bool keepReading = false; tthread::lock_guard guard(entryMutex); - for (std::map >::iterator pListIt = listEntries.begin(); pListIt != listEntries.end(); - pListIt++){ - //Skip empty playlists + for (std::map >::iterator pListIt = listEntries.begin(); + pListIt != listEntries.end(); pListIt++){ + // Skip empty playlists if (!pListIt->second.size()){continue;} int preCounter = counter; tsStream.clear(); - - for (std::deque::iterator entryIt = pListIt->second.begin(); entryIt != pListIt->second.end(); ++entryIt){ + for (std::deque::iterator entryIt = pListIt->second.begin(); + entryIt != pListIt->second.end(); ++entryIt){ uint64_t lastBpos = entryIt->bytePos; nProxy.userClient.keepAlive(); if (!segDowner.loadSegment(*entryIt)){ @@ -510,7 +579,7 @@ static std::string printhex(const char * data, size_t len) if (!packet.FromPointer(segDowner.packetPtr)){ WARN_MSG("Could not load TS packet, aborting segment parse"); tsStream.clear(); - break;//Abort load + break; // Abort load } tsStream.parse(packet, lastBpos); segDowner.packetPtr += 188; @@ -526,7 +595,8 @@ static std::string printhex(const char * data, size_t len) pidMapping[(((uint64_t)pListIt->first) << 32) + headerPack.getTrackId()] = counter; pidMappingR[counter] = (((uint64_t)pListIt->first) << 32) + headerPack.getTrackId(); packetId = counter; - VERYHIGH_MSG("Added file %s, trackid: %d, mapped to: %d", entryIt->filename.c_str(), headerPack.getTrackId(), counter); + VERYHIGH_MSG("Added file %s, trackid: %d, mapped to: %d", entryIt->filename.c_str(), + headerPack.getTrackId(), counter); counter++; } @@ -537,16 +607,16 @@ static std::string printhex(const char * data, size_t len) myMeta.tracks[packetId].minKeepAway, packetId); } } - break;//we have all tracks discovered, next playlist! + break; // we have all tracks discovered, next playlist! } - }while(!segDowner.atEnd()); - if (preCounter < counter){break;}//We're done reading this playlist! + }while (!segDowner.atEnd()); + if (preCounter < counter){break;}// We're done reading this playlist! } } tsStream.clear(); currentPlaylist = 0; - segDowner.segDL.data().clear();//make sure we have nothing left over - INFO_MSG("header complete, beginning live ingest of %d tracks", counter-1); + segDowner.segDL.data().clear(); // make sure we have nothing left over + INFO_MSG("header complete, beginning live ingest of %d tracks", counter - 1); } bool inputHLS::readHeader(){ @@ -575,8 +645,8 @@ static std::string printhex(const char * data, size_t len) size_t dataLen; tthread::lock_guard guard(entryMutex); - for (std::map >::iterator pListIt = listEntries.begin(); pListIt != listEntries.end(); - pListIt++){ + for (std::map >::iterator pListIt = listEntries.begin(); + pListIt != listEntries.end(); pListIt++){ tsStream.clear(); uint32_t entId = 0; @@ -595,8 +665,8 @@ static std::string printhex(const char * data, size_t len) while (!endOfFile){ tsStream.parse(packet, lastBpos); - //if (pListIt->isUrl()){ - lastBpos = entryIt->bytePos + segDowner.segDL.data().size(); + // if (pListIt->isUrl()){ + lastBpos = entryIt->bytePos + segDowner.segDL.data().size(); //}else{ // lastBpos = entryIt->bytePos + in.tellg(); //} @@ -615,8 +685,7 @@ static std::string printhex(const char * data, size_t len) counter++; } - if (!hasHeader && - (!myMeta.tracks.count(packetId) || !myMeta.tracks[packetId].codec.size())){ + if (!hasHeader && (!myMeta.tracks.count(packetId) || !myMeta.tracks[packetId].codec.size())){ tsStream.initializeMetadata(myMeta, tmpTrackId, packetId); } @@ -625,21 +694,19 @@ static std::string printhex(const char * data, size_t len) uint64_t pBPos = headerPack.getInt("bpos"); // keyframe data exists, so always add 19 bytes keyframedata. - long long packOffset = - headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; - long long packSendSize = - 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11; + long long packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; + long long packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11; myMeta.update(headerPack.getTime(), packOffset, packetId, dataLen, entId, headerPack.hasMember("keyframe"), packSendSize); } } - //if (pListIt->isUrl()){ - endOfFile = segDowner.atEnd(); - if (!endOfFile){ - packet.FromPointer(segDowner.packetPtr); - segDowner.packetPtr += 188; - } + // if (pListIt->isUrl()){ + endOfFile = segDowner.atEnd(); + if (!endOfFile){ + packet.FromPointer(segDowner.packetPtr); + segDowner.packetPtr += 188; + } //}else{ // packet.FromStream(in); // endOfFile = in.eof(); @@ -657,14 +724,12 @@ static std::string printhex(const char * data, size_t len) pidMapping[(((uint64_t)pListIt->first) << 32) + headerPack.getTrackId()] = counter; pidMappingR[counter] = (((uint64_t)pListIt->first) << 32) + headerPack.getTrackId(); packetId = counter; - INFO_MSG("Added file %s, trackid: %d, mapped to: %d", - entryIt->filename.c_str(), + INFO_MSG("Added file %s, trackid: %d, mapped to: %d", entryIt->filename.c_str(), headerPack.getTrackId(), counter); counter++; } - if (!hasHeader && - (!myMeta.tracks.count(packetId) || !myMeta.tracks[packetId].codec.size())){ + if (!hasHeader && (!myMeta.tracks.count(packetId) || !myMeta.tracks[packetId].codec.size())){ tsStream.initializeMetadata(myMeta, tmpTrackId, packetId); } @@ -674,15 +739,14 @@ static std::string printhex(const char * data, size_t len) // keyframe data exists, so always add 19 bytes keyframedata. long long packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; - long long packSendSize = - 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11; + long long packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11; myMeta.update(headerPack.getTime(), packOffset, packetId, dataLen, entId, headerPack.hasMember("keyframe"), packSendSize); } tsStream.getEarliestPacket(headerPack); } - //if (!pListIt->isUrl()){in.close();} + // if (!pListIt->isUrl()){in.close();} if (hasHeader){break;} } @@ -699,9 +763,7 @@ static std::string printhex(const char * data, size_t len) return true; } - bool inputHLS::needsLock(){ - return !streamIsLive; - } + bool inputHLS::needsLock(){return !streamIsLive;} bool inputHLS::openStreamSource(){return true;} @@ -727,16 +789,63 @@ static std::string printhex(const char * data, size_t len) if (myMeta.live){ tsStream.getEarliestPacket(thisPacket); tid = getOriginalTrackId(currentPlaylist, thisPacket.getTrackId()); + if (!tid){ + INFO_MSG("Track %" PRIu64 " on PLS %u -> %" PRIu32, thisPacket.getTrackId(), currentPlaylist, tid); + continue; + } }else{ tsStream.getPacket(getMappedTrackId(tid), thisPacket); } if (!thisPacket){ FAIL_MSG("Could not getNext TS packet!"); - }else{ - DONTEVEN_MSG("Packet track %lu @ time %" PRIu64 " ms", tid, thisPacket.getTime()); - // overwrite trackId on success - Bit::htobl(thisPacket.getData() + 8, tid); + return; } + + uint64_t newTime = thisPacket.getTime(); + + // Apply offset if any was set + if (plsTimeOffset.count(currentPlaylist)){newTime += plsTimeOffset[currentPlaylist];} + + if (zUTC){ + //UTC based timestamp offsets + if (allowRemap && nUTC){ + allowRemap = false; + int64_t prevOffset = plsTimeOffset[currentPlaylist]; + plsTimeOffset[currentPlaylist] = (nUTC - zUTC) - thisPacket.getTime(); + newTime = thisPacket.getTime() + plsTimeOffset[currentPlaylist]; + INFO_MSG("[UTC; New offset: %" PRId64 " -> %" PRId64 "] Packet %lu@%" PRIu64 + "ms -> %" PRIu64 "ms", + prevOffset, plsTimeOffset[currentPlaylist], tid, thisPacket.getTime(), newTime); + } + }else{ + //Non-UTC based + if (plsLastTime.count(currentPlaylist)){ + if (plsInterval.count(currentPlaylist)){ + if (allowRemap && (newTime < plsLastTime[currentPlaylist] || newTime > plsLastTime[currentPlaylist] + plsInterval[currentPlaylist] * 60)){ + allowRemap = false; + // time difference too great, change offset to correct for it + int64_t prevOffset = plsTimeOffset[currentPlaylist]; + plsTimeOffset[currentPlaylist] += (int64_t)(plsLastTime[currentPlaylist] + plsInterval[currentPlaylist]) - (int64_t)newTime; + newTime = thisPacket.getTime() + plsTimeOffset[currentPlaylist]; + INFO_MSG("[Guess; New offset: %" PRId64 " -> %" PRId64 "] Packet %lu@%" PRIu64 + "ms -> %" PRIu64 "ms", + prevOffset, plsTimeOffset[currentPlaylist], tid, thisPacket.getTime(), newTime); + } + } + // check if time increased, and no increase yet or is less than current, set new interval + if (newTime > plsLastTime[currentPlaylist] && + (!plsInterval.count(currentPlaylist) || newTime - plsLastTime[currentPlaylist] < plsInterval[currentPlaylist])){ + plsInterval[currentPlaylist] = newTime - plsLastTime[currentPlaylist]; + } + } + // store last time for interval/offset calculations + plsLastTime[tid] = newTime; + } + + DONTEVEN_MSG("Packet %lu@%" PRIu64 "ms -> %ms" PRIu64, tid, thisPacket.getTime(), newTime); + // overwrite trackId on success + Bit::htobl(thisPacket.getData() + 8, tid); + Bit::htobll(thisPacket.getData() + 12, newTime); return; // Success! } @@ -772,7 +881,7 @@ static std::string printhex(const char * data, size_t len) if (readNextFile()){ MEDIUM_MSG("Next segment read successfully"); endOfFile = false; // no longer at end of file - continue; // Success! Continue regular parsing. + continue; // Success! Continue regular parsing. } // Nothing works! @@ -790,8 +899,7 @@ static std::string printhex(const char * data, size_t len) const char *tmpPtr = segDowner.segDL.data().data(); while (!tsStream.hasPacketOnEachTrack() && - (tmpPtr - segDowner.segDL.data().data() + 188 <= - segDowner.segDL.data().size())){ + (tmpPtr - segDowner.segDL.data().data() + 188 <= segDowner.segDL.data().size())){ tsBuffer.FromPointer(tmpPtr); tsStream.parse(tsBuffer, 0); tmpPtr += 188; @@ -805,8 +913,7 @@ static std::string printhex(const char * data, size_t len) int trackId = 0; unsigned long plistEntry = 0xFFFFFFFFull; - for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); - it++){ + for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ unsigned long thisBPos = 0; for (std::deque::iterator keyIt = myMeta.tracks[*it].keys.begin(); keyIt != myMeta.tracks[*it].keys.end(); keyIt++){ @@ -829,7 +936,7 @@ static std::string printhex(const char * data, size_t len) currentPlaylist = getMappedTrackPlaylist(trackId); - {//Lock mutex for listEntries + {// Lock mutex for listEntries tthread::lock_guard guard(entryMutex); std::deque &curPlaylist = listEntries[currentPlaylist]; playListEntries &entry = curPlaylist.at(currentIndex); @@ -990,7 +1097,7 @@ static std::string printhex(const char * data, size_t len) }else if (line.compare(0, 7, "#EXTINF") == 0){ // current file is not a variant playlist, but regular playlist. - ret = readPlaylist(uri, fullInit); + ret = readPlaylist(playlistRootPath.getUrl(), fullInit); break; }else{ // ignore wrong lines @@ -1020,16 +1127,12 @@ static std::string printhex(const char * data, size_t len) /// Function for reading every playlist. bool inputHLS::readPlaylist(const HTTP::URL &uri, bool fullInit){ - std::string urlBuffer = (fullInit?"":";")+uri.getUrl(); + std::string urlBuffer = (fullInit ? "" : ";") + uri.getUrl(); tthread::thread runList(playlistRunner, (void *)urlBuffer.data()); - runList.detach(); //Abandon the thread, it's now running independently + runList.detach(); // Abandon the thread, it's now running independently uint32_t timeout = 0; - while (urlBuffer.data()[0] && ++timeout < 100){ - Util::sleep(100); - } - if (timeout >= 100){ - WARN_MSG("Thread start timed out for: %s", urlBuffer.c_str()); - } + while (urlBuffer.data()[0] && ++timeout < 100){Util::sleep(100);} + if (timeout >= 100){WARN_MSG("Thread start timed out for: %s", urlBuffer.c_str());} return true; } @@ -1039,7 +1142,7 @@ static std::string printhex(const char * data, size_t len) tsStream.clear(); playListEntries ntry; - //This scope limiter prevents the recursion down below from deadlocking us + // This scope limiter prevents the recursion down below from deadlocking us { tthread::lock_guard guard(entryMutex); std::deque &curList = listEntries[currentPlaylist]; @@ -1055,6 +1158,10 @@ static std::string printhex(const char * data, size_t len) ERROR_MSG("Could not download segment: %s", ntry.filename.c_str()); return readNextFile(); // Attempt to read another, if possible. } + nUTC = ntry.mUTC; + //If we don't have a zero-time yet, guess an hour before this UTC time is probably fine + if (nUTC && !zUTC){zUTC = nUTC - 3600000;} + allowRemap = true; return true; } @@ -1069,8 +1176,8 @@ static std::string printhex(const char * data, size_t len) int segCount = 0; tthread::lock_guard guard(entryMutex); - for (std::map >::iterator pListIt = listEntries.begin(); pListIt != listEntries.end(); - pListIt++){ + for (std::map >::iterator pListIt = listEntries.begin(); + pListIt != listEntries.end(); pListIt++){ segCount += pListIt->second.size(); if (pListIt->second.size()){ if (pListIt->second.front().timestamp < firstTimeStamp || tmpId < 0){ @@ -1084,4 +1191,3 @@ static std::string printhex(const char * data, size_t len) } }// namespace Mist - diff --git a/src/input/input_hls.h b/src/input/input_hls.h index 6387d852..4c9773c0 100644 --- a/src/input/input_hls.h +++ b/src/input/input_hls.h @@ -17,14 +17,15 @@ namespace Mist{ enum PlaylistType{VOD, LIVE, EVENT}; - + extern bool streamIsLive; - extern uint32_t globalWaitTime;//largest waitTime for any playlist we're loading - used to update minKeepAway - void parseKey(std::string key, char * newKey, unsigned int len); + extern uint32_t globalWaitTime; // largest waitTime for any playlist we're loading - used to update minKeepAway + void parseKey(std::string key, char *newKey, unsigned int len); struct playListEntries{ std::string filename; uint64_t bytePos; + uint64_t mUTC; ///< UTC unix millis timestamp of first packet, if known float duration; unsigned int timestamp; unsigned int wait; @@ -40,7 +41,7 @@ namespace Mist{ SegmentDownloader(); HTTP::Downloader segDL; const char *packetPtr; - bool loadSegment(const playListEntries & entry); + bool loadSegment(const playListEntries &entry); bool atEnd() const; }; @@ -49,7 +50,8 @@ namespace Mist{ Playlist(const std::string &uriSrc = ""); bool isUrl() const; bool reload(); - void addEntry(const std::string &filename, float duration, uint64_t &totalBytes, const std::string &key, const std::string &keyIV); + void addEntry(const std::string &filename, float duration, uint64_t &totalBytes, + const std::string &key, const std::string &keyIV); bool isSupportedFile(const std::string filename); std::string uri; // link to the current playlistfile @@ -68,11 +70,12 @@ namespace Mist{ PlaylistType playlistType; unsigned int lastTimestamp; unsigned int startTime; + uint64_t nextUTC; ///< If non-zero, the UTC timestamp of the next segment on this playlist char keyAES[16]; std::map keys; }; - void playlistRunner(void * ptr); + void playlistRunner(void *ptr); class inputHLS : public Input{ public: @@ -81,7 +84,10 @@ namespace Mist{ bool needsLock(); bool openStreamSource(); bool callback(); + protected: + uint64_t zUTC; ///< Zero point in local millis, as UTC unix time millis + uint64_t nUTC; ///< Next packet timestamp in UTC unix time millis unsigned int startTime; PlaylistType playlistType; SegmentDownloader segDowner; @@ -89,9 +95,13 @@ namespace Mist{ int targetDuration; bool endPlaylist; int currentPlaylist; - + + bool allowRemap; ///< True if the next packet may remap the timestamps std::map pidMapping; std::map pidMappingR; + std::map plsTimeOffset; + std::map plsLastTime; + std::map plsInterval; int currentIndex; std::string currentFile; @@ -129,4 +139,3 @@ namespace Mist{ }// namespace Mist typedef Mist::inputHLS mistIn; -