From 9d1b3cfe98be40bd018183de9327f095cf8f15c9 Mon Sep 17 00:00:00 2001 From: Ramkoemar Date: Thu, 31 Oct 2019 15:11:35 +0100 Subject: [PATCH] Support for clearkey encrypted HLS input and multithreaded HLS input playlist updating --- src/input/input_hls.cpp | 655 ++++++++++++++++++++++------------------ src/input/input_hls.h | 49 +-- 2 files changed, 389 insertions(+), 315 deletions(-) diff --git a/src/input/input_hls.cpp b/src/input/input_hls.cpp index d09be69c..6fd26fce 100644 --- a/src/input/input_hls.cpp +++ b/src/input/input_hls.cpp @@ -19,11 +19,22 @@ #include #include #include +#include "mbedtls/aes.h" #define SEM_TS_CLAIM "/MstTSIN%s" namespace Mist{ + ///Mutex for accesses to listEntries + tthread::mutex entryMutex; + + static unsigned int plsTotalCount = 0;///Total playlists active + static unsigned int plsInitCount = 0;///Count of playlists fully inited + + bool streamIsLive; + uint32_t globalWaitTime; + std::map > listEntries; + // These are used in the HTTP::Downloader callback, to prevent timeouts when downloading // segments/playlists. inputHLS *self = 0; @@ -40,12 +51,48 @@ namespace Mist{ if (s.length() > 0 && s.at(s.length() - 1) == '\r'){s.erase(s.size() - 1);} } - Playlist::Playlist(const std::string &uriSrc){ - INFO_MSG("Adding variant playlist: %s", uriSrc.c_str()); + /// Helper function that is used to run the playlist downloaders + /// Expects character array with playlist URL as argument, sets the first byte of the pointer to zero when loaded. + void playlistRunner(void * ptr){ + if (!ptr){return;}//abort if we received a null pointer - something is seriously wrong + bool initOnly = false; + if (((char*)ptr)[0] == ';'){initOnly = true;} + + Playlist pls(initOnly?((char*)ptr)+1:(char*)ptr); + plsTotalCount++; + //signal that we have now copied the URL and no longer need it + ((char*)ptr)[0] = 0; + + if (!pls.uri.size()){ + FAIL_MSG("Variant playlist URL is empty, aborting update thread."); + return; + } + + pls.reload(); + plsInitCount++; + if (initOnly){return;}//Exit because init-only mode + + while (self->config->is_active){ + //If the timer has not expired yet, sleep up to a second. Otherwise, reload. + /// \TODO Sleep longer if that makes sense? + if (pls.reloadNext > Util::bootSecs()){ + Util::sleep(1000); + }else{ + pls.reload(); + } + } + INFO_MSG("Downloader thread for '%s' exiting", pls.uri.c_str()); + } + + SegmentDownloader::SegmentDownloader(){ segDL.progressCallback = callbackFunc; segDL.dataTimeout = 5; segDL.retryCount = 5; - plsDL.progressCallback = callbackFunc; + } + + Playlist::Playlist(const std::string &uriSrc){ + id = 0;//to be set later + INFO_MSG("Adding variant playlist: %s", uriSrc.c_str()); plsDL.dataTimeout = 15; plsDL.retryCount = 8; lastFileIndex = 0; @@ -55,12 +102,44 @@ namespace Mist{ lastTimestamp = 0; uri = uriSrc; root = HTTP::URL(uri); + memset(keyAES, 0, 16); startTime = Util::bootSecs(); - if (uri.size()){reload();} + reloadNext = 0; + } + + void parseKey(std::string key, char * newKey, unsigned int len){ + memset(newKey, 0, len); + for (size_t i = 0; i < key.size() && i < (len << 1); ++i){ + char c = key[i]; + newKey[i>>1] |= ((c&15) + (((c&64)>>6) | ((c&64)>>3))) << ((~i&1) << 2); + } } + void flipKey(char * d){ + for(size_t i = 0; i< 8; i++){ + char tmp = d[i]; + d[i] = d[15-i]; + d[15-i]=tmp; + } + + } + +static std::string printhex(const char * data, size_t len) +{ + static const char* const lut = "0123456789ABCDEF"; + + std::string output; + output.reserve(2 * len); + for (size_t i = 0; i < len; ++i) + { + const unsigned char c = data[i]; + output.push_back(lut[c >> 4]); + output.push_back(lut[c & 15]); + } + return output; +} /// Returns true if packetPtr is at the end of the current segment. - bool Playlist::atEnd() const{ + bool SegmentDownloader::atEnd() const{ return (packetPtr - segDL.const_data().data() + 188) > segDL.const_data().size(); } @@ -68,9 +147,13 @@ namespace Mist{ bool Playlist::isUrl() const{return root.protocol.size();} /// Loads the given segment URL into the segment buffer. - bool Playlist::loadSegment(const HTTP::URL &uri){ - if (!segDL.get(uri)){ - FAIL_MSG("failed download: %s", uri.getUrl().c_str()); + bool SegmentDownloader::loadSegment(const playListEntries & entry){ + std::string hexKey = printhex(entry.keyAES,16); + std::string hexIvec = printhex(entry.ivec, 16); + + MEDIUM_MSG("Loading segment: %s, key: %s, ivec: %s", entry.filename.c_str(), hexKey.c_str(), hexIvec.c_str()); + if (!segDL.get(entry.filename)){ + FAIL_MSG("failed download: %s", entry.filename.c_str()); return false; } @@ -88,25 +171,59 @@ namespace Mist{ } } + //If we have a non-null key, decrypt + if (entry.keyAES[ 0] != 0 || entry.keyAES[ 1] != 0 || entry.keyAES[ 2] != 0 || entry.keyAES[ 3] != 0 || \ + entry.keyAES[ 4] != 0 || entry.keyAES[ 5] != 0 || entry.keyAES[ 6] != 0 || entry.keyAES[ 7] != 0 || \ + entry.keyAES[ 8] != 0 || entry.keyAES[ 9] != 0 || entry.keyAES[10] != 0 || entry.keyAES[11] != 0 || \ + entry.keyAES[12] != 0 || entry.keyAES[13] != 0 || entry.keyAES[14] != 0 || entry.keyAES[15] != 0){ + //Setup AES context + mbedtls_aes_context aes; + //Load key for decryption + mbedtls_aes_setkey_dec(&aes, (const unsigned char*)entry.keyAES, 128); + //Allocate a pointer for writing the decrypted data to + static Util::ResizeablePointer outdata; + outdata.allocate(segDL.data().size()); + //Actually decrypt the data + unsigned char tmpIvec[16]; + memcpy(tmpIvec, entry.ivec, 16); + + + mbedtls_aes_crypt_cbc(&aes, MBEDTLS_AES_DECRYPT, segDL.data().size(), tmpIvec, (const unsigned char*)segDL.data().data(), (unsigned char*)(char*)outdata); + //Data is now still padded, the padding consists of X bytes of padding, all containing the raw value X. + //Since padding is mandatory, we can simply read the last byte and remove X bytes from the length. + if (segDL.data().size() <= outdata[segDL.data().size()-1]){ + FAIL_MSG("Encryption padding is >= entire segment. Considering download failed."); + return false; + } + size_t newSize = segDL.data().size() - outdata[segDL.data().size()-1]; + //Finally, overwrite the original data buffer with the new one + segDL.data().assign(outdata, newSize); + } + // check first byte = 0x47. begin of ts file, then check if it is a multiple of 188bytes if (segDL.data().data()[0] == 0x47){ if (segDL.data().size() % 188){ FAIL_MSG("Expected a multiple of 188 bytes, received %d bytes. url: %s", - segDL.data().size(), uri.getUrl().c_str()); + segDL.data().size(), entry.filename.c_str()); return false; } }else if (segDL.data().data()[5] == 0x47){ if (segDL.data().size() % 192){ FAIL_MSG("Expected a multiple of 192 bytes, received %d bytes. url: %s", - segDL.data().size(), uri.getUrl().c_str()); + segDL.data().size(), entry.filename.c_str()); return false; } + }else{ + FAIL_MSG("Segment does not appear to contain TS data. Considering download failed."); + return false; } packetPtr = segDL.data().data(); + HIGH_MSG("Segment download complete and passed sanity checks"); return true; } + /// Handles both initial load and future reloads. /// Returns how many segments were added to the internal segment list. bool Playlist::reload(){ @@ -114,6 +231,11 @@ namespace Mist{ std::string line; std::string key; std::string val; + + std::string keyMethod; + std::string keyUri; + std::string keyIV; + int count = 0; uint64_t totalBytes = 0; @@ -147,6 +269,29 @@ namespace Mist{ key = line.substr(7, pos - 7); val = line.c_str() + pos + 1; + if(key == "KEY" ){ + size_t tmpPos = val.find("METHOD="); + size_t tmpPos2 = val.substr(tmpPos).find(","); + keyMethod = val.substr(tmpPos +7, tmpPos2-tmpPos-7); + + tmpPos = val.find("URI=\""); + tmpPos2 = val.substr(tmpPos+5).find("\""); + keyUri = val.substr(tmpPos + 5, tmpPos2); + + tmpPos = val.find("IV="); + keyIV = val.substr(tmpPos+5, 32); + + //when key not found, download and store it in the map + if (!keys.count(keyUri)) { + HTTP::Downloader keyDL; + if (!keyDL.get(root.link(keyUri)) || !keyDL.isOk()){ + FAIL_MSG("Could not retrieve decryption key from '%s'", root.link(keyUri).getUrl().c_str()); + continue; + } + keys.insert(std::pair(keyUri, keyDL.data())); + } + } + if (key == "TARGETDURATION"){ waitTime = atoi(val.c_str()) / 2; if (waitTime < 5){waitTime = 5;} @@ -183,7 +328,10 @@ namespace Mist{ // check for already added segments if (fileNo >= lastFileIndex){ cleanLine(filename); - addEntry(filename, f, totalBytes); + filename = root.link(filename).getUrl(); + char ivec[16]; + parseKey(keyIV, ivec, 16); + addEntry(filename, f, totalBytes,keys[keyUri],std::string(ivec,16)); lastFileIndex = fileNo + 1; ++count; } @@ -196,6 +344,10 @@ namespace Mist{ }else{ fileSource.close(); } + //Set the global live/vod bool to live if this playlist looks like a live playlist + if (playlistType == LIVE){streamIsLive = true;} + + if (globalWaitTime < waitTime){globalWaitTime = waitTime;} reloadNext = Util::bootSecs() + waitTime; return (count > 0); @@ -218,13 +370,12 @@ namespace Mist{ } /// Adds playlist segments to be processed - void Playlist::addEntry(const std::string &filename, float duration, uint64_t &totalBytes){ + void Playlist::addEntry(const std::string &filename, float duration, uint64_t &totalBytes, const std::string &key, const std::string &iv){ if (!isSupportedFile(filename)){ WARN_MSG("Ignoring unsupported file: %s", filename.c_str()); return; } - MEDIUM_MSG("Adding segment (%d): %s", lastFileIndex, filename.c_str()); playListEntries entry; entry.filename = filename; @@ -232,6 +383,16 @@ namespace Mist{ entry.bytePos = totalBytes; entry.duration = duration; + if(key.size() && iv.size()){ + memcpy(entry.ivec, iv.data(), 16); + memcpy(entry.keyAES, key.data(), 16); + }else{ + memset(entry.ivec, 0, 16); + memset(entry.keyAES, 0, 16); + } + + + if (!isUrl()){ std::ifstream fileSource; std::string test = root.link(entry.filename).getFilePath(); @@ -242,12 +403,22 @@ namespace Mist{ entry.timestamp = lastTimestamp + startTime; lastTimestamp += duration; - entries.push_back(entry); + { + tthread::lock_guard guard(entryMutex); + //Set a playlist ID if we haven't assigned one yet. + //Note: This method requires never removing playlists, only adding. + //The mutex assures we have a unique count/number. + if (!id){id = listEntries.size()+1;} + listEntries[id].push_back(entry); + MEDIUM_MSG("Added segment to variant %" PRIu32 " (#%d, now %d queued): %s", id, lastFileIndex, listEntries[id].size(), filename.c_str()); + } } /// Constructor of HLS Input inputHLS::inputHLS(Util::Config *cfg) : Input(cfg){ self = this; + streamIsLive = false; + globalWaitTime = 0; currentPlaylist = 0; capa["name"] = "HLS"; @@ -280,7 +451,9 @@ namespace Mist{ bool inputHLS::checkArguments(){ config->is_active = true; if (config->getString("input") == "-"){return false;} - if (!initPlaylist(config->getString("input"))){return false;} + HTTP::URL mainPls(config->getString("input")); + if (mainPls.getExt().substr(0, 3) != "m3u" && mainPls.protocol.find("hls") == std::string::npos){return false;} + if (!initPlaylist(config->getString("input"), false)){return false;} return true; } @@ -299,10 +472,15 @@ namespace Mist{ } void inputHLS::parseStreamHeader(){ + if (!initPlaylist(config->getString("input"))){ + FAIL_MSG("Failed to load HLS playlist, aborting"); + myMeta = DTSC::Meta(); + return; + } myMeta = DTSC::Meta(); myMeta.live = false; myMeta.vod = true; - VERYHIGH_MSG("parsestream"); + INFO_MSG("Parsing live stream to create header..."); TS::Packet packet; // to analyse and extract data int counter = 1; @@ -310,89 +488,69 @@ namespace Mist{ unsigned int dataLen; bool keepReading = false; - for (std::vector::iterator pListIt = playlists.begin(); pListIt != playlists.end(); + tthread::lock_guard guard(entryMutex); + for (std::map >::iterator pListIt = listEntries.begin(); pListIt != listEntries.end(); pListIt++){ - if (!pListIt->entries.size()){continue;} + //Skip empty playlists + if (!pListIt->second.size()){continue;} int preCounter = counter; tsStream.clear(); - std::deque::iterator entryIt = pListIt->entries.begin(); - while (true){ + for (std::deque::iterator entryIt = pListIt->second.begin(); entryIt != pListIt->second.end(); ++entryIt){ uint64_t lastBpos = entryIt->bytePos; - - if (pListIt->isUrl()){ - bool ret = false; - nProxy.userClient.keepAlive(); - ret = pListIt->loadSegment(pListIt->root.link(entryIt->filename)); - keepReading = packet.FromPointer(pListIt->packetPtr); - pListIt->packetPtr += 188; - }else{ - in.open(pListIt->root.link(entryIt->filename).getUrl().c_str()); - if (!in.good()){ - FAIL_MSG("Could not open segment (%s): %s", strerror(errno), - pListIt->root.link(entryIt->filename).getFilePath().c_str()); - continue; // skip to the next one - } - keepReading = packet.FromStream(in); + nProxy.userClient.keepAlive(); + if (!segDowner.loadSegment(*entryIt)){ + WARN_MSG("Skipping segment that could not be loaded in an attempt to recover"); + tsStream.clear(); + continue; } - while (keepReading){ + do{ + if (!packet.FromPointer(segDowner.packetPtr)){ + WARN_MSG("Could not load TS packet, aborting segment parse"); + tsStream.clear(); + break;//Abort load + } tsStream.parse(packet, lastBpos); - if (pListIt->isUrl()){ - lastBpos = entryIt->bytePos + pListIt->segDL.data().size(); - }else{ - lastBpos = entryIt->bytePos + in.tellg(); - } + segDowner.packetPtr += 188; - while (tsStream.hasPacketOnEachTrack()){ - DTSC::Packet headerPack; - tsStream.getEarliestPacket(headerPack); - int tmpTrackId = headerPack.getTrackId(); - uint64_t packetId = pidMapping[(((uint64_t)pListIt->id) << 32) + tmpTrackId]; + if (tsStream.hasPacketOnEachTrack()){ + while (tsStream.hasPacket()){ + DTSC::Packet headerPack; + tsStream.getEarliestPacket(headerPack); + int tmpTrackId = headerPack.getTrackId(); + uint64_t packetId = pidMapping[(((uint64_t)pListIt->first) << 32) + tmpTrackId]; - if (packetId == 0){ - pidMapping[(((uint64_t)pListIt->id) << 32) + headerPack.getTrackId()] = counter; - pidMappingR[counter] = (((uint64_t)pListIt->id) << 32) + headerPack.getTrackId(); - packetId = counter; - HIGH_MSG("Added file %s, trackid: %d, mapped to: %d", - pListIt->root.link(entryIt->filename).getUrl().c_str(), - headerPack.getTrackId(), counter); - counter++; - } - - if ((!myMeta.tracks.count(packetId) || !myMeta.tracks[packetId].codec.size())){ - tsStream.initializeMetadata(myMeta, tmpTrackId, packetId); - myMeta.tracks[packetId].minKeepAway = pListIt->waitTime * 2000; - VERYHIGH_MSG("setting minKeepAway = %d for track: %d", - myMeta.tracks[packetId].minKeepAway, packetId); + if (packetId == 0){ + pidMapping[(((uint64_t)pListIt->first) << 32) + headerPack.getTrackId()] = counter; + pidMappingR[counter] = (((uint64_t)pListIt->first) << 32) + headerPack.getTrackId(); + packetId = counter; + VERYHIGH_MSG("Added file %s, trackid: %d, mapped to: %d", entryIt->filename.c_str(), headerPack.getTrackId(), counter); + counter++; + } + + if ((!myMeta.tracks.count(packetId) || !myMeta.tracks[packetId].codec.size())){ + tsStream.initializeMetadata(myMeta, tmpTrackId, packetId); + myMeta.tracks[packetId].minKeepAway = globalWaitTime * 2000; + VERYHIGH_MSG("setting minKeepAway = %d for track: %d", + myMeta.tracks[packetId].minKeepAway, packetId); + } } + break;//we have all tracks discovered, next playlist! } - - if (pListIt->isUrl()){ - keepReading = !pListIt->atEnd(); - if (keepReading){ - packet.FromPointer(pListIt->packetPtr); - pListIt->packetPtr += 188; - } - }else{ - keepReading = packet.FromStream(in); - } - } - - in.close(); - - //Go to next segment, abort if we found at least one track or ran out of segments. - entryIt++; - if (counter != preCounter || entryIt == pListIt->entries.end()){break;} + }while(!segDowner.atEnd()); + if (preCounter < counter){break;}//We're done reading this playlist! } } tsStream.clear(); - in.close(); + currentPlaylist = 0; + segDowner.segDL.data().clear();//make sure we have nothing left over + INFO_MSG("header complete, beginning live ingest of %d tracks", counter-1); } bool inputHLS::readHeader(){ - if (playlists.size() && playlists[0].playlistType == LIVE){return true;} + if (streamIsLive){return true;} std::istringstream urlSource; std::ifstream fileSource; @@ -416,54 +574,43 @@ namespace Mist{ char *data; size_t dataLen; - for (std::vector::iterator pListIt = playlists.begin(); pListIt != playlists.end(); + tthread::lock_guard guard(entryMutex); + for (std::map >::iterator pListIt = listEntries.begin(); pListIt != listEntries.end(); pListIt++){ tsStream.clear(); uint32_t entId = 0; - for (std::deque::iterator entryIt = pListIt->entries.begin(); - entryIt != pListIt->entries.end(); entryIt++){ + for (std::deque::iterator entryIt = pListIt->second.begin(); + entryIt != pListIt->second.end(); entryIt++){ tsStream.partialClear(); endOfFile = false; - if (pListIt->isUrl()){ - pListIt->loadSegment(pListIt->root.link(entryIt->filename)); - endOfFile = !pListIt->atEnd(); - if (!endOfFile){packet.FromPointer(pListIt->packetPtr);} - pListIt->packetPtr += 188; - }else{ - in.close(); - in.open(pListIt->root.link(entryIt->filename).getFilePath().c_str()); - if (!in.good()){ - FAIL_MSG("Could not open segment (%s): %s", strerror(errno), - pListIt->root.link(entryIt->filename).getFilePath().c_str()); - continue; // skip to the next one - } - packet.FromStream(in); - endOfFile = in.eof(); - } + segDowner.loadSegment(*entryIt); + endOfFile = !segDowner.atEnd(); + if (!endOfFile){packet.FromPointer(segDowner.packetPtr);} + segDowner.packetPtr += 188; entId++; uint64_t lastBpos = entryIt->bytePos; while (!endOfFile){ tsStream.parse(packet, lastBpos); - if (pListIt->isUrl()){ - lastBpos = entryIt->bytePos + pListIt->segDL.data().size(); - }else{ - lastBpos = entryIt->bytePos + in.tellg(); - } + //if (pListIt->isUrl()){ + lastBpos = entryIt->bytePos + segDowner.segDL.data().size(); + //}else{ + // lastBpos = entryIt->bytePos + in.tellg(); + //} while (tsStream.hasPacketOnEachTrack()){ DTSC::Packet headerPack; tsStream.getEarliestPacket(headerPack); int tmpTrackId = headerPack.getTrackId(); - uint64_t packetId = pidMapping[(((uint64_t)pListIt->id) << 32) + tmpTrackId]; + uint64_t packetId = pidMapping[(((uint64_t)pListIt->first) << 32) + tmpTrackId]; if (packetId == 0){ - pidMapping[(((uint64_t)pListIt->id) << 32) + headerPack.getTrackId()] = counter; - pidMappingR[counter] = (((uint64_t)pListIt->id) << 32) + headerPack.getTrackId(); + pidMapping[(((uint64_t)pListIt->first) << 32) + headerPack.getTrackId()] = counter; + pidMappingR[counter] = (((uint64_t)pListIt->first) << 32) + headerPack.getTrackId(); packetId = counter; counter++; } @@ -487,16 +634,16 @@ namespace Mist{ } } - if (pListIt->isUrl()){ - endOfFile = pListIt->atEnd(); + //if (pListIt->isUrl()){ + endOfFile = segDowner.atEnd(); if (!endOfFile){ - packet.FromPointer(pListIt->packetPtr); - pListIt->packetPtr += 188; + packet.FromPointer(segDowner.packetPtr); + segDowner.packetPtr += 188; } - }else{ - packet.FromStream(in); - endOfFile = in.eof(); - } + //}else{ + // packet.FromStream(in); + // endOfFile = in.eof(); + //} } // get last packets tsStream.finish(); @@ -504,14 +651,14 @@ namespace Mist{ tsStream.getEarliestPacket(headerPack); while (headerPack){ int tmpTrackId = headerPack.getTrackId(); - uint64_t packetId = pidMapping[(((uint64_t)pListIt->id) << 32) + tmpTrackId]; + uint64_t packetId = pidMapping[(((uint64_t)pListIt->first) << 32) + tmpTrackId]; if (packetId == 0){ - pidMapping[(((uint64_t)pListIt->id) << 32) + headerPack.getTrackId()] = counter; - pidMappingR[counter] = (((uint64_t)pListIt->id) << 32) + headerPack.getTrackId(); + pidMapping[(((uint64_t)pListIt->first) << 32) + headerPack.getTrackId()] = counter; + pidMappingR[counter] = (((uint64_t)pListIt->first) << 32) + headerPack.getTrackId(); packetId = counter; INFO_MSG("Added file %s, trackid: %d, mapped to: %d", - pListIt->root.link(entryIt->filename).getUrl().c_str(), + entryIt->filename.c_str(), headerPack.getTrackId(), counter); counter++; } @@ -535,83 +682,40 @@ namespace Mist{ tsStream.getEarliestPacket(headerPack); } - if (!pListIt->isUrl()){in.close();} + //if (!pListIt->isUrl()){in.close();} if (hasHeader){break;} } } - if (hasHeader || (playlists.size() && playlists[0].isUrl())){return true;} + if (streamIsLive){return true;} INFO_MSG("write header file..."); std::ofstream oFile((config->getString("input") + ".dtsh").c_str()); oFile << myMeta.toJSON().toNetPacked(); oFile.close(); - in.close(); return true; } bool inputHLS::needsLock(){ - if (playlists.size() && playlists[0].isUrl()){return false;} - return (playlists.size() <= currentPlaylist) || - !(playlists[currentPlaylist].playlistType == LIVE); + return !streamIsLive; } bool inputHLS::openStreamSource(){return true;} - int inputHLS::getFirstPlaylistToReload(){ - int plsNum = 0; - int bestPls = 0; - uint64_t earRld = 0; - for (std::vector::iterator it = playlists.begin(); it != playlists.end(); ++it){ - if (!plsNum || it->reloadNext < earRld){ - bestPls = plsNum; - earRld = it->reloadNext; - } - ++plsNum; - } - return bestPls; - } - void inputHLS::getNext(bool smart){ - currentPlaylist = firstSegment(); INSANE_MSG("Getting next"); uint32_t tid = 0; - bool endOfFile = false; + static bool endOfFile = false; if (selectedTracks.size()){tid = *selectedTracks.begin();} thisPacket.null(); while (config->is_active && (needsLock() || nProxy.userClient.isAlive())){ - int oldPlaylist = currentPlaylist; - currentPlaylist = firstSegment(); - - // If we have a new playlist, print that info. - if (currentPlaylist >= 0 && oldPlaylist != currentPlaylist){ - MEDIUM_MSG("Switched to playlist %d", currentPlaylist); - } - - // No segments? Wait until next playlist reloading time. - if (currentPlaylist < 0){ - int a = getFirstPlaylistToReload(); - MEDIUM_MSG("Waiting for %d seconds until next playlist reload...", - playlists[a].reloadNext - Util::bootSecs()); - while (Util::bootSecs() < playlists[a].reloadNext && - (needsLock() || nProxy.userClient.isAlive())){ - Util::wait(1000); - nProxy.userClient.keepAlive(); - } - MEDIUM_MSG("Reloading playlist %d", a); - playlists[a].reload(); - currentPlaylist = firstSegment(); - // Continue regular parsing, in case we need another reload. - currentPlaylist = oldPlaylist; - continue; - } // Check if we have a packet bool hasPacket = false; - if (playlists[currentPlaylist].playlistType == LIVE){ + if (streamIsLive){ hasPacket = tsStream.hasPacketOnEachTrack() || (endOfFile && tsStream.hasPacket()); }else{ hasPacket = tsStream.hasPacket(getMappedTrackId(tid)); @@ -637,51 +741,37 @@ namespace Mist{ } // No? Let's read some more data and check again. - if (playlists[currentPlaylist].isUrl()){ - if (!playlists[currentPlaylist].atEnd()){ - tsBuf.FromPointer(playlists[currentPlaylist].packetPtr); - playlists[currentPlaylist].packetPtr += 188; - tsStream.parse(tsBuf, 0); - continue; // check again - } - }else{ - if (in.good()){ - tsBuf.FromStream(in); - tsStream.parse(tsBuf, 0); - continue; // check again - } + if (!segDowner.atEnd()){ + tsBuf.FromPointer(segDowner.packetPtr); + segDowner.packetPtr += 188; + tsStream.parse(tsBuf, 0); + continue; // check again } // Okay, reading more is not possible. Let's call finish() and check again. if (!endOfFile){ endOfFile = true; // we reached the end of file tsStream.finish(); - MEDIUM_MSG("Finishing reading TS segment"); + VERYHIGH_MSG("Finishing reading TS segment"); continue; // Check again! } // No? Then we try to read the next file. - - // First we handle live playlist reloads, if needed - if (playlists[currentPlaylist].playlistType == LIVE){ - // Reload the first playlist that needs it, if the time is right - int a = getFirstPlaylistToReload(); - if (playlists[a].reloadNext <= Util::bootSecs()){ - MEDIUM_MSG("Reloading playlist %d", a); - playlists[a].reload(); - continue; - } + // + currentPlaylist = firstSegment(); + // No segments? Wait until next playlist reloading time. + if (currentPlaylist < 0){ + VERYHIGH_MSG("Waiting for segments..."); + if (nProxy.userClient.isAlive()){nProxy.userClient.keepAlive();} + Util::wait(500); + continue; } // Now that we know our playlist is up-to-date, actually try to read the file. - MEDIUM_MSG("Moving on to next TS segment"); + VERYHIGH_MSG("Moving on to next TS segment (variant %u)", currentPlaylist); if (readNextFile()){ MEDIUM_MSG("Next segment read successfully"); endOfFile = false; // no longer at end of file - // Prevent timeouts, we may have just finished a download after all. - if (playlists[currentPlaylist].playlistType == LIVE){ - nProxy.userClient.keepAlive(); - } continue; // Success! Continue regular parsing. } @@ -695,31 +785,18 @@ namespace Mist{ void inputHLS::readPMT(){ HIGH_MSG("readPMT()"); - if (playlists[currentPlaylist].isUrl()){ - size_t bpos; - TS::Packet tsBuffer; - const char *tmpPtr = playlists[currentPlaylist].segDL.data().data(); + size_t bpos; + TS::Packet tsBuffer; + const char *tmpPtr = segDowner.segDL.data().data(); - while (!tsStream.hasPacketOnEachTrack() && - (tmpPtr - playlists[currentPlaylist].segDL.data().data() + 188 <= - playlists[currentPlaylist].segDL.data().size())){ - tsBuffer.FromPointer(tmpPtr); - tsStream.parse(tsBuffer, 0); - tmpPtr += 188; - } - tsStream.partialClear(); - - }else{ - size_t bpos = in.tellg(); - in.seekg(0, in.beg); - TS::Packet tsBuffer; - while (!tsStream.hasPacketOnEachTrack() && tsBuffer.FromStream(in)){ - tsStream.parse(tsBuffer, 0); - } - tsStream.partialClear(); - in.clear(); - in.seekg(bpos, in.beg); + while (!tsStream.hasPacketOnEachTrack() && + (tmpPtr - segDowner.segDL.data().data() + 188 <= + segDowner.segDL.data().size())){ + tsBuffer.FromPointer(tmpPtr); + tsStream.parse(tsBuffer, 0); + tmpPtr += 188; } + tsStream.partialClear(); } // Note: bpos is overloaded here for playlist entry! @@ -752,31 +829,22 @@ namespace Mist{ currentPlaylist = getMappedTrackPlaylist(trackId); - Playlist &curPlaylist = playlists[currentPlaylist]; - playListEntries &entry = curPlaylist.entries.at(currentIndex); - if (curPlaylist.isUrl()){ - curPlaylist.loadSegment(curPlaylist.root.link(entry.filename)); - }else{ - in.close(); - in.open(curPlaylist.root.link(entry.filename).getFilePath().c_str()); - MEDIUM_MSG("Opening segment: %s", - curPlaylist.root.link(entry.filename).getFilePath().c_str()); - if (!in.good()){ - FAIL_MSG("Could not open segment (%s): %s", strerror(errno), - curPlaylist.root.link(entry.filename).getUrl().c_str()); - } + {//Lock mutex for listEntries + tthread::lock_guard guard(entryMutex); + std::deque &curPlaylist = listEntries[currentPlaylist]; + playListEntries &entry = curPlaylist.at(currentIndex); + segDowner.loadSegment(entry); } readPMT(); } int inputHLS::getEntryId(int playlistId, uint64_t bytePos){ if (bytePos == 0){return 0;} - - for (int i = 0; i < playlists[playlistId].entries.size(); i++){ - if (playlists[playlistId].entries.at(i).bytePos > bytePos){return i - 1;} + tthread::lock_guard guard(entryMutex); + for (int i = 0; i < listEntries[playlistId].size(); i++){ + if (listEntries[playlistId].at(i).bytePos > bytePos){return i - 1;} } - - return playlists[playlistId].entries.size() - 1; + return listEntries[playlistId].size() - 1; } uint64_t inputHLS::getOriginalTrackId(uint32_t playlistId, uint32_t id){ @@ -804,7 +872,13 @@ namespace Mist{ } /// Parses the main playlist, possibly containing variants. - bool inputHLS::initPlaylist(const std::string &uri){ + bool inputHLS::initPlaylist(const std::string &uri, bool fullInit){ + plsInitCount = 0; + plsTotalCount = 0; + { + tthread::lock_guard guard(entryMutex); + listEntries.clear(); + } std::string line; bool ret = false; startTime = Util::bootSecs(); @@ -894,7 +968,7 @@ namespace Mist{ if (codecSupported){ - ret = readPlaylist(playlistRootPath.link(line)); + ret = readPlaylist(playlistRootPath.link(line), fullInit); }else{ INFO_MSG("skipping variant playlist %s, none of the codecs are supported", playlistRootPath.link(line).getUrl().c_str()); @@ -910,13 +984,13 @@ namespace Mist{ int pos = line.find("URI"); if (pos != std::string::npos){ mediafile = line.substr(pos + 5, line.length() - pos - 6); - ret = readPlaylist(playlistRootPath.link(mediafile)); + ret = readPlaylist(playlistRootPath.link(mediafile), fullInit); } } }else if (line.compare(0, 7, "#EXTINF") == 0){ // current file is not a variant playlist, but regular playlist. - ret = readPlaylist(uri); + ret = readPlaylist(uri, fullInit); break; }else{ // ignore wrong lines @@ -926,15 +1000,36 @@ namespace Mist{ if (!isUrl){fileSource.close();} + uint32_t maxWait = 0; + unsigned int lastCount = 9999; + while (plsTotalCount != plsInitCount && ++maxWait < 50){ + if (plsInitCount != lastCount){ + lastCount = plsInitCount; + INFO_MSG("Waiting for variant playlists to load... %u/%u", lastCount, plsTotalCount); + } + Util::sleep(1000); + } + if (maxWait >= 50){ + WARN_MSG("Timeout waiting for variant playlists (%u/%u)", plsInitCount, plsTotalCount); + } + plsInitCount = 0; + plsTotalCount = 0; + return ret; } /// Function for reading every playlist. - bool inputHLS::readPlaylist(const HTTP::URL &uri){ - Playlist p(uri.protocol.size() ? uri.getUrl() : uri.getFilePath()); - p.id = playlists.size(); - // set size of reloadNext to playlist count with default value 0 - playlists.push_back(p); + bool inputHLS::readPlaylist(const HTTP::URL &uri, bool fullInit){ + std::string urlBuffer = (fullInit?"":";")+uri.getUrl(); + tthread::thread runList(playlistRunner, (void *)urlBuffer.data()); + runList.detach(); //Abandon the thread, it's now running independently + uint32_t timeout = 0; + while (urlBuffer.data()[0] && ++timeout < 100){ + Util::sleep(100); + } + if (timeout >= 100){ + WARN_MSG("Thread start timed out for: %s", urlBuffer.c_str()); + } return true; } @@ -942,53 +1037,25 @@ namespace Mist{ /// to be processed) bool inputHLS::readNextFile(){ tsStream.clear(); - Playlist &curList = playlists[currentPlaylist]; - if (!curList.entries.size()){ - WARN_MSG("no entries found in playlist: %d!", currentPlaylist); - return false; - } - - // URL-based - if (curList.isUrl()){ - if (!curList.loadSegment(curList.root.link(curList.entries.front().filename))){ - ERROR_MSG("Could not download segment: %s", - curList.root.link(curList.entries.front().filename).getUrl().c_str()); - curList.entries.pop_front(); - return readNextFile(); // Attempt to read another, if possible. + playListEntries ntry; + //This scope limiter prevents the recursion down below from deadlocking us + { + tthread::lock_guard guard(entryMutex); + std::deque &curList = listEntries[currentPlaylist]; + if (!curList.size()){ + WARN_MSG("no entries found in playlist: %d!", currentPlaylist); + return false; } - curList.entries.pop_front(); - return true; + ntry = curList.front(); + curList.pop_front(); } - // file-based, live - if (curList.playlistType == LIVE){ - in.close(); - std::string filepath = - curList.root.link(curList.entries.at(currentIndex).filename).getFilePath(); - curList.entries.pop_front(); // remove the item from the playlist - in.open(filepath.c_str()); - if (in.good()){return true;} - FAIL_MSG("Could not open segment (%s): %s", strerror(errno), filepath.c_str()); + if (!segDowner.loadSegment(ntry)){ + ERROR_MSG("Could not download segment: %s", ntry.filename.c_str()); return readNextFile(); // Attempt to read another, if possible. } - - // file-based, VoD - ++currentIndex; - if (curList.entries.size() <= currentIndex){ - HIGH_MSG("end of playlist reached (%u of %u)!", currentIndex, curList.entries.size()); - return false; - } - in.close(); - std::string filepath = - curList.root.link(curList.entries.at(currentIndex).filename).getFilePath(); - in.open(filepath.c_str()); - if (in.good()){ - readPMT(); - return true; - } - FAIL_MSG("Could not open segment (%s): %s", strerror(errno), filepath.c_str()); - return readNextFile(); + return true; } /// return the playlist id from which we need to read the first upcoming segment @@ -999,16 +1066,20 @@ namespace Mist{ if (selectedTracks.size() == 1){return getMappedTrackPlaylist(*selectedTracks.begin());} uint64_t firstTimeStamp = 0; int tmpId = -1; + int segCount = 0; - for (std::vector::iterator pListIt = playlists.begin(); pListIt != playlists.end(); + tthread::lock_guard guard(entryMutex); + for (std::map >::iterator pListIt = listEntries.begin(); pListIt != listEntries.end(); pListIt++){ - if (pListIt->entries.size()){ - if (pListIt->entries.front().timestamp < firstTimeStamp || tmpId < 0){ - firstTimeStamp = pListIt->entries.front().timestamp; - tmpId = pListIt->id; + segCount += pListIt->second.size(); + if (pListIt->second.size()){ + if (pListIt->second.front().timestamp < firstTimeStamp || tmpId < 0){ + firstTimeStamp = pListIt->second.front().timestamp; + tmpId = pListIt->first; } } } + MEDIUM_MSG("Active playlist: %d (%d segments total in queue)", tmpId, segCount); return tmpId; } diff --git a/src/input/input_hls.h b/src/input/input_hls.h index fcb88c92..6387d852 100644 --- a/src/input/input_hls.h +++ b/src/input/input_hls.h @@ -17,6 +17,10 @@ namespace Mist{ enum PlaylistType{VOD, LIVE, EVENT}; + + extern bool streamIsLive; + extern uint32_t globalWaitTime;//largest waitTime for any playlist we're loading - used to update minKeepAway + void parseKey(std::string key, char * newKey, unsigned int len); struct playListEntries{ std::string filename; @@ -24,25 +28,35 @@ namespace Mist{ float duration; unsigned int timestamp; unsigned int wait; + char ivec[16]; + char keyAES[16]; + }; + + /// Keeps the segment entry list by playlist ID + extern std::map > listEntries; + + class SegmentDownloader{ + public: + SegmentDownloader(); + HTTP::Downloader segDL; + const char *packetPtr; + bool loadSegment(const playListEntries & entry); + bool atEnd() const; }; class Playlist{ public: Playlist(const std::string &uriSrc = ""); - bool atEnd() const; bool isUrl() const; bool reload(); - void addEntry(const std::string &filename, float duration, uint64_t &totalBytes); - bool loadSegment(const HTTP::URL &uri); + void addEntry(const std::string &filename, float duration, uint64_t &totalBytes, const std::string &key, const std::string &keyIV); bool isSupportedFile(const std::string filename); std::string uri; // link to the current playlistfile HTTP::URL root; - HTTP::Downloader segDL; HTTP::Downloader plsDL; - const char *packetPtr; uint64_t reloadNext; uint32_t id; @@ -52,16 +66,13 @@ namespace Mist{ int waitTime; PlaylistType playlistType; - std::deque entries; unsigned int lastTimestamp; unsigned int startTime; + char keyAES[16]; + std::map keys; }; - struct entryBuffer{ - int timestamp; - playListEntries entry; - int playlistIndex; - }; + void playlistRunner(void * ptr); class inputHLS : public Input{ public: @@ -70,34 +81,26 @@ namespace Mist{ bool needsLock(); bool openStreamSource(); bool callback(); - protected: - // Private Functions - unsigned int startTime; PlaylistType playlistType; + SegmentDownloader segDowner; int version; int targetDuration; bool endPlaylist; int currentPlaylist; - - // std::vector entries; - std::vector playlists; - // std::vector pidMapping; + std::map pidMapping; std::map pidMappingR; int currentIndex; std::string currentFile; - std::ifstream in; TS::Stream tsStream; ///< Used for parsing the incoming ts stream Socket::Connection conn; TS::Packet tsBuf; - int getFirstPlaylistToReload(); - int firstSegment(); void waitForNextSegment(); void readPMT(); @@ -112,8 +115,8 @@ namespace Mist{ FILE *tsFile; bool readIndex(); - bool initPlaylist(const std::string &uri); - bool readPlaylist(const HTTP::URL &uri); + bool initPlaylist(const std::string &uri, bool fullInit = true); + bool readPlaylist(const HTTP::URL &uri, bool fullInit = true); bool readNextFile(); void parseStreamHeader();