From a7a09d22bd408c4f074584e57f01433fa0b57a9b Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sun, 20 Jan 2019 13:45:52 +0100 Subject: [PATCH] Fixed ts_stream class, improved HLS input, added https support to HLS analyser --- lib/ts_stream.cpp | 6 +++-- src/analysers/analyser_hls.cpp | 4 +-- src/input/input_hls.cpp | 47 ++++++++++++++-------------------- src/input/input_hls.h | 12 ++++----- src/io.cpp | 4 +++ 5 files changed, 35 insertions(+), 38 deletions(-) diff --git a/lib/ts_stream.cpp b/lib/ts_stream.cpp index d8984d50..b7678f49 100644 --- a/lib/ts_stream.cpp +++ b/lib/ts_stream.cpp @@ -302,7 +302,7 @@ namespace TS{ } std::deque &inStream = pesStreams[tid]; if (inStream.size() <= 1){ - FAIL_MSG("No PES packets to parse"); + if (!finished){FAIL_MSG("No PES packets to parse");} return; } // Find number of packets before unit Start @@ -842,11 +842,13 @@ namespace TS{ void Stream::initializeMetadata(DTSC::Meta &meta, size_t tid, size_t mappingId){ tthread::lock_guard guard(tMutex); + size_t mId = mappingId; + for (std::map::const_iterator it = pidToCodec.begin(); it != pidToCodec.end(); it++){ if (tid && it->first != tid){continue;} - size_t mId = it->first; + if (mId == 0){mId = it->first;} if (meta.tracks.count(mId) && meta.tracks[mId].codec.size()){continue;} diff --git a/src/analysers/analyser_hls.cpp b/src/analysers/analyser_hls.cpp index d06ecb1b..b9e331cc 100644 --- a/src/analysers/analyser_hls.cpp +++ b/src/analysers/analyser_hls.cpp @@ -67,8 +67,8 @@ void AnalyserHLS::stop(){ bool AnalyserHLS::open(const std::string &url){ root = HTTP::URL(url); - if (root.protocol != "http"){ - FAIL_MSG("Only http protocol is supported (%s not supported)", root.protocol.c_str()); + if (root.protocol != "http" && root.protocol != "https"){ + FAIL_MSG("Only http(s) protocol is supported (%s not supported)", root.protocol.c_str()); return false; } return true; diff --git a/src/input/input_hls.cpp b/src/input/input_hls.cpp index 58af4b02..aaf64b8f 100644 --- a/src/input/input_hls.cpp +++ b/src/input/input_hls.cpp @@ -299,12 +299,12 @@ namespace Mist{ } void inputHLS::parseStreamHeader(){ - bool hasHeader = false; - if (!hasHeader){myMeta = DTSC::Meta();} + myMeta = DTSC::Meta(); + myMeta.live = false; + myMeta.vod = true; VERYHIGH_MSG("parsestream"); TS::Packet packet; // to analyse and extract data int counter = 1; - int packetId = 0; char *data; unsigned int dataLen; @@ -320,7 +320,6 @@ namespace Mist{ if (pListIt->isUrl()){ bool ret = false; - continueNegotiate(); nProxy.userClient.keepAlive(); ret = pListIt->loadSegment(pListIt->root.link(entryIt->filename)); keepReading = packet.FromPointer(pListIt->packetPtr); @@ -347,11 +346,11 @@ namespace Mist{ DTSC::Packet headerPack; tsStream.getEarliestPacket(headerPack); int tmpTrackId = headerPack.getTrackId(); - packetId = pidMapping[(pListIt->id << 16) + tmpTrackId]; + uint64_t packetId = pidMapping[(((uint64_t)pListIt->id) << 32) + tmpTrackId]; if (packetId == 0){ - pidMapping[(pListIt->id << 16) + headerPack.getTrackId()] = counter; - pidMappingR[counter] = (pListIt->id << 16) + headerPack.getTrackId(); + pidMapping[(((uint64_t)pListIt->id) << 32) + headerPack.getTrackId()] = counter; + pidMappingR[counter] = (((uint64_t)pListIt->id) << 32) + headerPack.getTrackId(); packetId = counter; HIGH_MSG("Added file %s, trackid: %d, mapped to: %d", pListIt->root.link(entryIt->filename).getUrl().c_str(), @@ -359,11 +358,7 @@ namespace Mist{ counter++; } - myMeta.live = false; - myMeta.vod = true; - - if (!hasHeader && - (!myMeta.tracks.count(packetId) || !myMeta.tracks[packetId].codec.size())){ + if ((!myMeta.tracks.count(packetId) || !myMeta.tracks[packetId].codec.size())){ tsStream.initializeMetadata(myMeta, tmpTrackId, packetId); myMeta.tracks[packetId].minKeepAway = pListIt->waitTime * 2000; VERYHIGH_MSG("setting minKeepAway = %d for track: %d", @@ -385,8 +380,6 @@ namespace Mist{ in.close(); } tsStream.clear(); - - if (hasHeader){return;} in.close(); } @@ -411,7 +404,6 @@ namespace Mist{ TS::Packet packet; // to analyse and extract data int counter = 1; - int packetId = 0; char *data; size_t dataLen; @@ -459,11 +451,11 @@ namespace Mist{ tsStream.getEarliestPacket(headerPack); int tmpTrackId = headerPack.getTrackId(); - packetId = pidMapping[(pListIt->id << 16) + tmpTrackId]; + uint64_t packetId = pidMapping[(((uint64_t)pListIt->id) << 32) + tmpTrackId]; if (packetId == 0){ - pidMapping[(pListIt->id << 16) + headerPack.getTrackId()] = counter; - pidMappingR[counter] = (pListIt->id << 16) + headerPack.getTrackId(); + pidMapping[(((uint64_t)pListIt->id) << 32) + headerPack.getTrackId()] = counter; + pidMappingR[counter] = (((uint64_t)pListIt->id) << 32) + headerPack.getTrackId(); packetId = counter; counter++; } @@ -504,11 +496,11 @@ namespace Mist{ tsStream.getEarliestPacket(headerPack); while (headerPack){ int tmpTrackId = headerPack.getTrackId(); - packetId = pidMapping[(pListIt->id << 16) + tmpTrackId]; + uint64_t packetId = pidMapping[(((uint64_t)pListIt->id) << 32) + tmpTrackId]; if (packetId == 0){ - pidMapping[(pListIt->id << 16) + headerPack.getTrackId()] = counter; - pidMappingR[counter] = (pListIt->id << 16) + headerPack.getTrackId(); + pidMapping[(((uint64_t)pListIt->id) << 32) + headerPack.getTrackId()] = counter; + pidMappingR[counter] = (((uint64_t)pListIt->id) << 32) + headerPack.getTrackId(); packetId = counter; INFO_MSG("Added file %s, trackid: %d, mapped to: %d", pListIt->root.link(entryIt->filename).getUrl().c_str(), @@ -599,7 +591,6 @@ namespace Mist{ while (Util::bootSecs() < playlists[a].reloadNext && (needsLock() || nProxy.userClient.isAlive())){ Util::wait(1000); - continueNegotiate(); nProxy.userClient.keepAlive(); } MEDIUM_MSG("Reloading playlist %d", a); @@ -621,7 +612,7 @@ namespace Mist{ // Yes? Excellent! Read and return it. if (hasPacket){ // Read - if (playlists[currentPlaylist].playlistType == LIVE){ + if (myMeta.live){ tsStream.getEarliestPacket(thisPacket); tid = getOriginalTrackId(currentPlaylist, thisPacket.getTrackId()); }else{ @@ -630,6 +621,7 @@ namespace Mist{ if (!thisPacket){ FAIL_MSG("Could not getNext TS packet!"); }else{ + DONTEVEN_MSG("Packet track %lu @ time %" PRIu64 " ms", tid, thisPacket.getTime()); // overwrite trackId on success Bit::htobl(thisPacket.getData() + 8, tid); } @@ -680,7 +672,6 @@ namespace Mist{ endOfFile = false; // no longer at end of file // Prevent timeouts, we may have just finished a download after all. if (playlists[currentPlaylist].playlistType == LIVE){ - continueNegotiate(); nProxy.userClient.keepAlive(); } continue; // Success! Continue regular parsing. @@ -780,13 +771,13 @@ namespace Mist{ return playlists[playlistId].entries.size() - 1; } - int inputHLS::getOriginalTrackId(int playlistId, int id){ - return pidMapping[(playlistId << 16) + id]; + uint64_t inputHLS::getOriginalTrackId(uint32_t playlistId, uint32_t id){ + return pidMapping[(((uint64_t)playlistId) << 32) + id]; } - int inputHLS::getMappedTrackId(int id){return (pidMappingR[id] & 0xFFFF);} + uint32_t inputHLS::getMappedTrackId(uint64_t id){return (pidMappingR[id] & 0xFFFFFFFFull);} - int inputHLS::getMappedTrackPlaylist(int id){return (pidMappingR[id] >> 16);} + uint32_t inputHLS::getMappedTrackPlaylist(uint64_t id){return (pidMappingR[id] >> 32);} /// Parses the main playlist, possibly containing variants. bool inputHLS::initPlaylist(const std::string &uri){ diff --git a/src/input/input_hls.h b/src/input/input_hls.h index 7188bd5c..fcb88c92 100644 --- a/src/input/input_hls.h +++ b/src/input/input_hls.h @@ -45,7 +45,7 @@ namespace Mist{ const char *packetPtr; uint64_t reloadNext; - int id; + uint32_t id; bool playlistEnd; int noChangeCount; uint64_t lastFileIndex; @@ -84,8 +84,8 @@ namespace Mist{ // std::vector entries; std::vector playlists; // std::vector pidMapping; - std::map pidMapping; - std::map pidMappingR; + std::map pidMapping; + std::map pidMappingR; int currentIndex; std::string currentFile; @@ -118,9 +118,9 @@ namespace Mist{ void parseStreamHeader(); - int getMappedTrackId(int id); - int getMappedTrackPlaylist(int id); - int getOriginalTrackId(int playlistId, int id); + uint32_t getMappedTrackId(uint64_t id); + uint32_t getMappedTrackPlaylist(uint64_t id); + uint64_t getOriginalTrackId(uint32_t playlistId, uint32_t id); int getEntryId(int playlistId, uint64_t bytePos); }; }// namespace Mist diff --git a/src/io.cpp b/src/io.cpp index ece62c48..17eb4fc5 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -733,6 +733,10 @@ namespace Mist { tmpMeta.tracks[newTid] = myMeta.tracks[tid]; tmpMeta.tracks[newTid].trackID = newTid; JSON::Value tmpVal = tmpMeta.toJSON(); + if (!myMeta.tracks[tid].type.size() || !myMeta.tracks[tid].codec.size()){ + FAIL_MSG("Negotiating a track without metadata. This is a serious issue, please report this to the developers."); + BACKTRACE; + } std::string tmpStr = tmpVal.toNetPacked(); memcpy(metaPages[tid].mapped, tmpStr.data(), tmpStr.size()); HIGH_MSG("Temporary metadata written for incoming track %lu, handling as track %lu", tid, newTid);