From 31403f268547f7d8453011cdae9d8d658f18366f Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 9 May 2018 09:24:12 +0200 Subject: [PATCH] HLS input rewrite/optimize/clarify/fun-ify --- src/input/input_hls.cpp | 957 +++++++++++++++++----------------------- src/input/input_hls.h | 32 +- 2 files changed, 411 insertions(+), 578 deletions(-) diff --git a/src/input/input_hls.cpp b/src/input/input_hls.cpp index bc9e9b27..07011e07 100644 --- a/src/input/input_hls.cpp +++ b/src/input/input_hls.cpp @@ -23,199 +23,116 @@ #define SEM_TS_CLAIM "/MstTSIN%s" namespace Mist{ - // remove trailing \r for windows generated playlist files + // These are used in the HTTP::Downloader callback, to prevent timeouts when downloading + // segments/playlists. + inputHLS *self = 0; + bool callbackFunc(){return self->callback();} - inputHLS* self = 0; - - bool callbackFunc(){ - return self->callback(); + /// Called by the global callbackFunc, to prevent timeouts + bool inputHLS::callback(){ + if (nProxy.userClient.isAlive()){nProxy.userClient.keepAlive();} + return config->is_active; } + /// Helper function that removes trailing \r characters void cleanLine(std::string &s){ if (s.length() > 0 && s.at(s.length() - 1) == '\r'){s.erase(s.size() - 1);} } Playlist::Playlist(const std::string &uriSrc){ + INFO_MSG("Adding variant playlist: %s", uriSrc.c_str()); + segDL.progressCallback = callbackFunc; + segDL.dataTimeout = 5; + segDL.retryCount = 5; + plsDL.progressCallback = callbackFunc; + plsDL.dataTimeout = 15; + plsDL.retryCount = 8; lastFileIndex = 0; - entryCount = 0; waitTime = 2; playlistEnd = false; noChangeCount = 0; - initDone = false; lastTimestamp = 0; uri = uriSrc; + root = HTTP::URL(uri); startTime = Util::bootSecs(); - - - if (uri.size()){ - std::string line; - std::string key; - std::string val; - int count = 0; - uint64_t totalBytes = 0; - uri_root = uri.substr(0, uri.rfind("/") + 1); - root = HTTP::URL(uri_root); - - playlistType = LIVE; // Temporary value - INFO_MSG("Readplaylist: %s", uri.c_str()); - - std::istringstream urlSource; - std::ifstream fileSource; - - if (isUrl()){ - loadURL(uri); - urlSource.str(source); - }else{ - fileSource.open(uri.c_str()); - } - - std::istream &input = (isUrl() ? (std::istream &)urlSource : (std::istream &)fileSource); - std::getline(input, line); - - while (std::getline(input, line)){ - cleanLine(line); - - if (!line.empty()){ - if (line.compare(0, 7, "#EXT-X-") == 0){ - size_t pos = line.find(":"); - key = line.substr(7, pos - 7); - val = line.c_str() + pos + 1; - - if (key == "VERSION"){version = atoi(val.c_str());} - - if (key == "TARGETDURATION"){waitTime = atoi(val.c_str())/2 ;} - - if (key == "MEDIA-SEQUENCE"){ - media_sequence = atoi(val.c_str()); - lastFileIndex = media_sequence; - } - - if (key == "PLAYLIST-TYPE"){ - if (val == "VOD"){ - playlistType = VOD; - }else if (val == "LIVE"){ - playlistType = LIVE; - }else if (val == "EVENT"){ - playlistType = EVENT; - } - } - - if (key == "ENDLIST"){ - // end of playlist reached! - playlistEnd = true; - playlistType = VOD; - } - continue; - }else if (line.compare(0, 7, "#EXTINF") != 0){ - VERYHIGH_MSG("ignoring wrong line: %s.", line.c_str()); - continue; - } - float f = atof(line.c_str() + 8); - std::string filename; - std::getline(input, filename); - - DEBUG_MSG(DLVL_HIGH, "Adding entry %s", filename.c_str()); - addEntry(filename, f, totalBytes); - count++; - } - } - - if (isUrl()){ - playlistType = LIVE; // VOD over HTTP needs to be processed as LIVE. - fileSource.close(); - } - } - initDone = true; + if (uri.size()){reload();} } - bool Playlist::atEnd() const{return (packetPtr - source.data() + 188) > source.size();} - - bool Playlist::isUrl() const{ - return uri_root.find("://") != std::string::npos; + /// Returns true if packetPtr is at the end of the current segment. + bool Playlist::atEnd() const{ + return (packetPtr - segDL.const_data().data() + 188) > segDL.const_data().size(); } - bool inputHLS::callback(){ - if(nProxy.userClient.isAlive()){ - nProxy.userClient.keepAlive(); - } + /// Returns true if there is no protocol defined in the playlist root URL. + bool Playlist::isUrl() const{return root.protocol.size();} - return config->is_active; - } - - bool Playlist::loadURL(const std::string &loadUrl){ - HTTP::URL url(loadUrl); - - //root = HTTP::URL(loadUrl); - HTTP::URL root = HTTP::URL(loadUrl); - if (root.protocol != "http" && root.protocol != "https"){ - FAIL_MSG("Only http(s) protocols are supported (%s not supported)", root.protocol.c_str()); + /// Loads the given segment URL into the segment buffer. + bool Playlist::loadSegment(const HTTP::URL &uri){ + if (!segDL.get(uri)){ + FAIL_MSG("failed download: %s", uri.getUrl().c_str()); return false; } - DL.progressCallback = callbackFunc; - - if (DL.get(root)){ - if (DL.isOk()){ - DEBUG_MSG(DLVL_HIGH, "statusCode: %d, statusText: ", DL.getStatusCode()); - // DL.getStatusText().c_str()); - if (DL.getHeader("Content-Length") != ""){ - if (DL.data().size() != atoi(DL.getHeader("Content-Length").c_str())){ - FAIL_MSG("Expected %s bytes of data, but only received %lu.", - DL.getHeader("Content-Length").c_str(), DL.data().size()); - return false; - } - } - - //check first byte = 0x47. begin of ts file, then check if it is a multiple of 188bytes - if(DL.data().data()[0] == 0x47) { - if (DL.data().size() % 188){ - FAIL_MSG("Expected a multiple of 188 bytes, received %d bytes. url: %s", DL.data().size(), loadUrl.c_str()); - } - }else if(DL.data().data()[5] == 0x47){ - if (DL.data().size() % 192){ - FAIL_MSG("Expected a multiple of 192 bytes, received %d bytes. url: %s", DL.data().size(), loadUrl.c_str()); - } - } - - }else{ - FAIL_MSG("HTTP response not OK!. statuscode: %d, statustext: %s", DL.getStatusCode(), DL.getStatusText().c_str()); - } - }else{ - FAIL_MSG("failed url get()"); + if (!segDL.isOk()){ + FAIL_MSG("HTTP response not OK!. statuscode: %d, statustext: %s", segDL.getStatusCode(), + segDL.getStatusText().c_str()); + return false; } + if (segDL.getHeader("Content-Length") != ""){ + if (segDL.data().size() != atoi(segDL.getHeader("Content-Length").c_str())){ + FAIL_MSG("Expected %s bytes of data, but only received %lu.", + segDL.getHeader("Content-Length").c_str(), segDL.data().size()); + return false; + } + } - source.clear(); - source = DL.data(); + // check first byte = 0x47. begin of ts file, then check if it is a multiple of 188bytes + if (segDL.data().data()[0] == 0x47){ + if (segDL.data().size() % 188){ + FAIL_MSG("Expected a multiple of 188 bytes, received %d bytes. url: %s", + segDL.data().size(), uri.getUrl().c_str()); + return false; + } + }else if (segDL.data().data()[5] == 0x47){ + if (segDL.data().size() % 192){ + FAIL_MSG("Expected a multiple of 192 bytes, received %d bytes. url: %s", + segDL.data().size(), uri.getUrl().c_str()); + return false; + } + } - packetPtr = 0; - packetPtr = source.data(); - - return DL.isOk(); + packetPtr = segDL.data().data(); + return true; } - /// Function for reloading the playlist in case of live streams. + /// Handles both initial load and future reloads. + /// Returns how many segments were added to the internal segment list. bool Playlist::reload(){ - int skip = lastFileIndex - media_sequence; - bool ret = false; + uint64_t fileNo = 0; std::string line; std::string key; std::string val; int count = 0; - - std::string all; - uint64_t totalBytes = 0; + playlistType = LIVE; // Temporary value + std::istringstream urlSource; std::ifstream fileSource; if (isUrl()){ - loadURL(uri.c_str()); // get size only! - urlSource.str(source); + if (!plsDL.get(uri) || !plsDL.isOk()){ + FAIL_MSG("Could not download playlist, aborting."); + return false; + } + urlSource.str(plsDL.data()); }else{ fileSource.open(uri.c_str()); + if (!fileSource.good()){ + FAIL_MSG("Could not open playlist (%s): %s", strerror(errno), uri.c_str()); + } } std::istream &input = (isUrl() ? (std::istream &)urlSource : (std::istream &)fileSource); @@ -223,58 +140,71 @@ namespace Mist{ while (std::getline(input, line)){ cleanLine(line); - all.append(line); - if (line.compare(0, 21, "#EXT-X-MEDIA-SEQUENCE") == 0){ - media_sequence = atoi(line.c_str() + line.find(":") + 1); - skip = (lastFileIndex - media_sequence); - INFO_MSG("media seq: %d, skip: %d", media_sequence, skip); + if (line.empty()){continue;}// skip empty lines + + if (line.compare(0, 7, "#EXT-X-") == 0){ + size_t pos = line.find(":"); + key = line.substr(7, pos - 7); + val = line.c_str() + pos + 1; + + if (key == "TARGETDURATION"){ + waitTime = atoi(val.c_str()) / 2; + if (waitTime < 2){waitTime = 2;} + } + + if (key == "MEDIA-SEQUENCE"){fileNo = atoll(val.c_str());} + + if (key == "PLAYLIST-TYPE"){ + if (val == "VOD"){ + playlistType = VOD; + }else if (val == "LIVE"){ + playlistType = LIVE; + }else if (val == "EVENT"){ + playlistType = EVENT; + } + } + + if (key == "ENDLIST"){ + // end of playlist reached! + playlistEnd = true; + playlistType = VOD; + } continue; } - if (line.compare(0, 7, "#EXTINF") != 0){continue;} + if (line.compare(0, 7, "#EXTINF") != 0){ + VERYHIGH_MSG("ignoring line: %s.", line.c_str()); + continue; + } + float f = atof(line.c_str() + 8); - // next line belongs to this item std::string filename; std::getline(input, filename); - all.append(line); // check for already added segments - if (skip){ - //INFO_MSG("skipping file: %s", filename.c_str()); - skip--; - }else{ - - if (count == 0){ - // clear the whole buffer with entries, and insert only new segments - // INFO_MSG("clear entries"); - // entries.clear(); - } - + if (fileNo >= lastFileIndex){ cleanLine(filename); - DEBUG_MSG(DLVL_HIGH, "Adding segment %s", filename.c_str()); addEntry(filename, f, totalBytes); - count++; + lastFileIndex = fileNo + 1; + ++count; } + ++fileNo; } - if (!isUrl()){fileSource.close();} - - ret = (count > 0); - - if (ret){ - noChangeCount = 0; + // VOD over HTTP needs to be processed as LIVE. + if (isUrl()){ + playlistType = LIVE; }else{ - INFO_MSG("no changes.."); - ++noChangeCount; - if (noChangeCount > 3){VERYHIGH_MSG("enough!");} + fileSource.close(); } - return ret; + reloadNext = Util::bootSecs() + waitTime; + return (count > 0); } bool Playlist::isSupportedFile(const std::string filename){ - //only ts files - if(filename.find_last_of(".") != std::string::npos){ - std::string ext = filename.substr(filename.find_last_of(".")+1); + // only ts files + if (filename.find_last_of(".") != std::string::npos){ + std::string ext = filename.substr(filename.find_last_of(".") + 1); if (ext.compare(0, 2, "ts") == 0){ return true; @@ -283,72 +213,56 @@ namespace Mist{ return false; } } - //No extension. We assume it's fine. + // No extension. We assume it's fine. return true; } - /// function for adding segments to the playlist to be processed. used for VOD - /// and live + /// Adds playlist segments to be processed void Playlist::addEntry(const std::string &filename, float duration, uint64_t &totalBytes){ - if(!isSupportedFile(filename)){ + if (!isSupportedFile(filename)){ WARN_MSG("Ignoring unsupported file: %s", filename.c_str()); return; } + MEDIUM_MSG("Adding segment (%d): %s", lastFileIndex, filename.c_str()); + playListEntries entry; entry.filename = filename; cleanLine(entry.filename); - std::string test = uri_root + entry.filename; - - std::istringstream urlSource; - std::ifstream fileSource; - - if (isUrl()){ - urlSource.str(source); - }else{ - fileSource.open(test.c_str(), std::ios::ate | std::ios::binary); - if ((fileSource.rdstate() & std::ifstream::failbit) != 0){ - WARN_MSG("file: %s, error: %s", test.c_str(), strerror(errno)); - } - } - entry.bytePos = totalBytes; entry.duration = duration; - if (!isUrl()){totalBytes += fileSource.tellg();} - if (initDone || (entryCount > 2)){ - lastTimestamp += duration; - entry.timestamp = lastTimestamp + startTime ; - }else{ - INFO_MSG("set timestamp ZERO, load immediately!"); - entry.timestamp = 0; // read all segments immediatly at the beginning, then use delays - //FAIL_MSG("e timestamp %llu", entry.timestamp); + if (!isUrl()){ + std::ifstream fileSource; + std::string test = root.link(entry.filename).getFilePath(); + fileSource.open(test.c_str(), std::ios::ate | std::ios::binary); + if (!fileSource.good()){WARN_MSG("file: %s, error: %s", test.c_str(), strerror(errno));} + totalBytes += fileSource.tellg(); } - ++entryCount; + + entry.timestamp = lastTimestamp + startTime; + lastTimestamp += duration; entries.push_back(entry); - ++lastFileIndex; } /// Constructor of HLS Input inputHLS::inputHLS(Util::Config *cfg) : Input(cfg){ self = this; currentPlaylist = 0; - + capa["name"] = "HLS"; - capa["desc"] = "This input allows you to both play Video on Demand and live HLS streams stored on the filesystem, as well as pull live HLS streams over HTTP and HTTPS."; + capa["desc"] = "This input allows you to both play Video on Demand and live HLS streams stored " + "on the filesystem, as well as pull live HLS streams over HTTP and HTTPS."; capa["source_match"].append("/*.m3u8"); capa["source_match"].append("/*.m3u"); capa["source_match"].append("http://*.m3u8"); capa["source_match"].append("http://*.m3u"); capa["source_match"].append("https://*.m3u8"); capa["source_match"].append("https://*.m3u"); - // These two can/may be set to always-on mode - capa["always_match"].append("/*.m3u8"); - capa["always_match"].append("/*.m3u"); - capa["always_match"].append("http://*.m3u8"); - capa["always_match"].append("http://*.m3u"); - capa["always_match"].append("https://*.m3u8"); - capa["always_match"].append("https://*.m3u"); + capa["source_match"].append("https-hls://*"); + capa["source_match"].append("http-hls://*"); + // All URLs can be set to always-on mode. + capa["always_match"] = capa["source_match"]; capa["priority"] = 9ll; capa["codecs"][0u][0u].append("H264"); @@ -366,18 +280,7 @@ namespace Mist{ bool inputHLS::checkArguments(){ config->is_active = true; if (config->getString("input") == "-"){return false;} - if (!initPlaylist(config->getString("input"))){return false;} - - if (Util::Config::printDebugLevel >= DLVL_HIGH){ - for (std::vector::iterator pListIt = playlists.begin(); pListIt != playlists.end(); - pListIt++){ - int j = 0; - for (std::deque::iterator entryIt = pListIt->entries.begin(); - entryIt != pListIt->entries.end(); entryIt++){ - } - } - } return true; } @@ -398,7 +301,7 @@ namespace Mist{ void inputHLS::parseStreamHeader(){ bool hasHeader = false; if (!hasHeader){myMeta = DTSC::Meta();} -INFO_MSG("parsestream"); + VERYHIGH_MSG("parsestream"); TS::Packet packet; // to analyse and extract data int counter = 1; int packetId = 0; @@ -417,24 +320,25 @@ INFO_MSG("parsestream"); if (pListIt->isUrl()){ bool ret = false; - continueNegotiate(); - nProxy.userClient.keepAlive(); -//FAIL_MSG("parsestreamheader url: root: %s,uri_root: %s, entry: %s", pListIt->root.getUrl().c_str(), pListIt->uri_root.c_str(), entryIt->filename.c_str()); - ret = pListIt->loadURL(pListIt->root.link(entryIt->filename).getUrl().c_str()); - //ret = pListIt->loadURL(pListIt->uri_root + entryIt->filename); - + continueNegotiate(); + nProxy.userClient.keepAlive(); + ret = pListIt->loadSegment(pListIt->root.link(entryIt->filename)); keepReading = packet.FromPointer(pListIt->packetPtr); pListIt->packetPtr += 188; }else{ - in.open((pListIt->uri_root + entryIt->filename).c_str()); + in.open(pListIt->root.link(entryIt->filename).getUrl().c_str()); + if (!in.good()){ + FAIL_MSG("Could not open segment (%s): %s", strerror(errno), + pListIt->root.link(entryIt->filename).getFilePath().c_str()); + continue; // skip to the next one + } keepReading = packet.FromStream(in); } while (keepReading){ tsStream.parse(packet, lastBpos); if (pListIt->isUrl()){ - lastBpos = entryIt->bytePos + pListIt->source.size(); - ///\todo get size... + lastBpos = entryIt->bytePos + pListIt->segDL.data().size(); }else{ lastBpos = entryIt->bytePos + in.tellg(); } @@ -450,17 +354,11 @@ INFO_MSG("parsestream"); pidMappingR[counter] = (pListIt->id << 16) + headerPack.getTrackId(); packetId = counter; HIGH_MSG("Added file %s, trackid: %d, mapped to: %d", - (pListIt->uri_root + entryIt->filename).c_str(), headerPack.getTrackId(), - counter); + pListIt->root.link(entryIt->filename).getUrl().c_str(), + headerPack.getTrackId(), counter); counter++; } - myMeta.live = (playlists.size() && playlists[0].playlistType == LIVE); - myMeta.vod = !myMeta.live; - - // myMeta.live = true; - // myMeta.vod = false; - myMeta.live = false; myMeta.vod = true; @@ -468,7 +366,8 @@ INFO_MSG("parsestream"); (!myMeta.tracks.count(packetId) || !myMeta.tracks[packetId].codec.size())){ tsStream.initializeMetadata(myMeta, tmpTrackId, packetId); myMeta.tracks[packetId].minKeepAway = pListIt->waitTime * 2000; - FAIL_MSG("setting minKeepAway = %d for track: %d", myMeta.tracks[packetId].minKeepAway, packetId); + VERYHIGH_MSG("setting minKeepAway = %d for track: %d", + myMeta.tracks[packetId].minKeepAway, packetId); } } @@ -524,20 +423,22 @@ INFO_MSG("parsestream"); for (std::deque::iterator entryIt = pListIt->entries.begin(); entryIt != pListIt->entries.end(); entryIt++){ - // WORK tsStream.partialClear(); endOfFile = false; if (pListIt->isUrl()){ - pListIt->loadURL(pListIt->uri_root + entryIt->filename); - urlSource.str(pListIt->source); - + pListIt->loadSegment(pListIt->root.link(entryIt->filename)); endOfFile = !pListIt->atEnd(); if (!endOfFile){packet.FromPointer(pListIt->packetPtr);} pListIt->packetPtr += 188; }else{ in.close(); - in.open((pListIt->uri_root + entryIt->filename).c_str()); + in.open(pListIt->root.link(entryIt->filename).getFilePath().c_str()); + if (!in.good()){ + FAIL_MSG("Could not open segment (%s): %s", strerror(errno), + pListIt->root.link(entryIt->filename).getFilePath().c_str()); + continue; // skip to the next one + } packet.FromStream(in); endOfFile = in.eof(); } @@ -548,7 +449,7 @@ INFO_MSG("parsestream"); tsStream.parse(packet, lastBpos); if (pListIt->isUrl()){ - lastBpos = entryIt->bytePos + pListIt->source.size(); + lastBpos = entryIt->bytePos + pListIt->segDL.data().size(); }else{ lastBpos = entryIt->bytePos + in.tellg(); } @@ -564,9 +465,6 @@ INFO_MSG("parsestream"); pidMapping[(pListIt->id << 16) + headerPack.getTrackId()] = counter; pidMappingR[counter] = (pListIt->id << 16) + headerPack.getTrackId(); packetId = counter; - INFO_MSG("Added file %s, trackid: %d, mapped to: %d", - (pListIt->uri_root + entryIt->filename).c_str(), headerPack.getTrackId(), - counter); counter++; } @@ -613,8 +511,8 @@ INFO_MSG("parsestream"); pidMappingR[counter] = (pListIt->id << 16) + headerPack.getTrackId(); packetId = counter; INFO_MSG("Added file %s, trackid: %d, mapped to: %d", - (pListIt->uri_root + entryIt->filename).c_str(), headerPack.getTrackId(), - counter); + pListIt->root.link(entryIt->filename).getUrl().c_str(), + headerPack.getTrackId(), counter); counter++; } @@ -664,186 +562,148 @@ INFO_MSG("parsestream"); bool inputHLS::openStreamSource(){return true;} int inputHLS::getFirstPlaylistToReload(){ - // at this point, we need to check which playlist we need to reload, and keep - // reading from that - // playlist until EndOfPlaylist - std::vector::iterator result = std::min_element(reloadNext.begin(), reloadNext.end()); - return std::distance(reloadNext.begin(), result); + int plsNum = 0; + int bestPls = 0; + uint64_t earRld = 0; + for (std::vector::iterator it = playlists.begin(); it != playlists.end(); ++it){ + if (!plsNum || it->reloadNext < earRld){ + bestPls = plsNum; + earRld = it->reloadNext; + } + ++plsNum; + } + return bestPls; } void inputHLS::getNext(bool smart){ + currentPlaylist = firstSegment(); INSANE_MSG("Getting next"); - uint32_t tid; - bool hasPacket = false; - bool keepReading = false; + uint32_t tid = 0; bool endOfFile = false; - bool doReload = false; - + if (selectedTracks.size()){tid = *selectedTracks.begin();} thisPacket.null(); + while (config->is_active && (needsLock() || nProxy.userClient.isAlive())){ + int oldPlaylist = currentPlaylist; + currentPlaylist = firstSegment(); - while (!hasPacket && config->is_active && (needsLock() || nProxy.userClient.isAlive())){ - - if (playlists[currentPlaylist].isUrl()){ - - endOfFile = playlists[currentPlaylist].atEnd(); - if (!endOfFile){ - tsBuf.FromPointer(playlists[currentPlaylist].packetPtr); - playlists[currentPlaylist].packetPtr += 188; - } - - }else{ - tsBuf.FromStream(in); - endOfFile = in.eof(); + // If we have a new playlist, print that info. + if (currentPlaylist >= 0 && oldPlaylist != currentPlaylist){ + MEDIUM_MSG("Switched to playlist %d", currentPlaylist); } - // eof flag is set after unsuccesful read, so check again - if (endOfFile){tsStream.finish();} + // No segments? Wait until next playlist reloading time. + if (currentPlaylist < 0){ + int a = getFirstPlaylistToReload(); + MEDIUM_MSG("Waiting for %d seconds until next playlist reload...", + playlists[a].reloadNext - Util::bootSecs()); + while (Util::bootSecs() < playlists[a].reloadNext && + (needsLock() || nProxy.userClient.isAlive())){ + Util::wait(1000); + continueNegotiate(); + nProxy.userClient.keepAlive(); + } + MEDIUM_MSG("Reloading playlist %d", a); + playlists[a].reload(); + currentPlaylist = firstSegment(); + // Continue regular parsing, in case we need another reload. + currentPlaylist = oldPlaylist; + continue; + } + // Check if we have a packet + bool hasPacket = false; if (playlists[currentPlaylist].playlistType == LIVE){ hasPacket = tsStream.hasPacketOnEachTrack() || (endOfFile && tsStream.hasPacket()); }else{ - - if (!selectedTracks.size()){return;} - - tid = *selectedTracks.begin(); hasPacket = tsStream.hasPacket(getMappedTrackId(tid)); } - if (endOfFile && !hasPacket){ - //INFO_MSG("endoffile and no packet"); + // Yes? Excellent! Read and return it. + if (hasPacket){ + // Read if (playlists[currentPlaylist].playlistType == LIVE){ - int a = getFirstPlaylistToReload(); - int segmentTime = 30; - //INFO_MSG("need to reload playlist %d, time: %d", a, reloadNext[a] - Util::bootSecs()); - - int f = firstSegment(); - if (f >= 0){segmentTime = playlists[f].entries.front().timestamp - Util::bootSecs();} - - int playlistTime = reloadNext.at(currentPlaylist) - Util::bootSecs() - 1; - - playlistTime = reloadNext[a] - Util::bootSecs(); - - if (playlistTime < 1 && playlists[currentPlaylist].playlistEnd == false){ - // update reloadTime before reading the playlist - reloadNext.at(playlists[a].id) = Util::bootSecs() + playlists[a].waitTime; - playlists[a].reload(); - } - -waitForNextSegment(); - - if (f < 0){ - while (Util::bootSecs() < reloadNext[a] && - (needsLock() || nProxy.userClient.isAlive())){ - Util::wait(1000); - continueNegotiate(); - nProxy.userClient.keepAlive(); - } - INFO_MSG("reloading playlist"); - - reloadNext.at(playlists[a].id) = Util::bootSecs() + playlists[a].waitTime; - - if(playlists[a].playlistEnd && playlists[a].entries.size() == 0) - { - INFO_MSG("No more entries, stop reloading"); - thisPacket.null(); - return; - - } - playlists[a].reload(); - } - } - - int b = Util::bootSecs(); - - if (!readNextFile()){ -continueNegotiate(); - nProxy.userClient.keepAlive(); - if (playlists[currentPlaylist].playlistType != LIVE){return;} - // need to reload all available playlists. update the map with the - // amount of ms to wait - // before the next check. - - // set specific elements with the correct bootsecs() - reloadNext.at(currentPlaylist) = b + playlists[currentPlaylist].waitTime; - - int timeToWait = reloadNext.at(currentPlaylist) - Util::bootSecs(); -//INFO_MSG("readnextfile if"); - // at this point, we need to check which playlist we need to reload, and - // keep reading from - // that playlist until EndOfPlaylist - std::vector::iterator result = - std::min_element(reloadNext.begin(), reloadNext.end()); - int playlistToReload = std::distance(reloadNext.begin(), result); - currentPlaylist = playlistToReload; - - // dont wait the first time. - if (timeToWait > 0 && playlists[currentPlaylist].initDone && - playlists[currentPlaylist].noChangeCount > 0){ - if (timeToWait > playlists[currentPlaylist].waitTime){ - WARN_MSG("something is not right..."); - return; - } - - if (playlists[currentPlaylist].noChangeCount < 2){ - timeToWait /= 2; // wait half of the segment size when no segments are found. - } - } - - if (playlists[currentPlaylist].playlistEnd && playlists[currentPlaylist].playlistType != LIVE){ - INFO_MSG("Playlist %d has reached his end!"); - thisPacket.null(); - return; - } - } - - if (playlists[currentPlaylist].isUrl()){ - endOfFile = playlists[currentPlaylist].atEnd(); - if (!endOfFile){ - tsBuf.FromPointer(playlists[currentPlaylist].packetPtr); - playlists[currentPlaylist].packetPtr += 188; - } + tsStream.getEarliestPacket(thisPacket); + tid = getOriginalTrackId(currentPlaylist, thisPacket.getTrackId()); }else{ + tsStream.getPacket(getMappedTrackId(tid), thisPacket); + } + if (!thisPacket){ + FAIL_MSG("Could not getNext TS packet!"); + }else{ + // overwrite trackId on success + Bit::htobl(thisPacket.getData() + 8, tid); + } + return; // Success! + } + + // No? Let's read some more data and check again. + if (playlists[currentPlaylist].isUrl()){ + if (!playlists[currentPlaylist].atEnd()){ + tsBuf.FromPointer(playlists[currentPlaylist].packetPtr); + playlists[currentPlaylist].packetPtr += 188; + tsStream.parse(tsBuf, 0); + continue; // check again + } + }else{ + if (in.good()){ tsBuf.FromStream(in); - endOfFile = in.eof(); + tsStream.parse(tsBuf, 0); + continue; // check again } } + // Okay, reading more is not possible. Let's call finish() and check again. if (!endOfFile){ - tsStream.parse(tsBuf, 0); - if (playlists[currentPlaylist].playlistType == LIVE){ - hasPacket = tsStream.hasPacketOnEachTrack() || (endOfFile && tsStream.hasPacket()); - }else{ - hasPacket = tsStream.hasPacket(getMappedTrackId(tid)); + endOfFile = true; // we reached the end of file + tsStream.finish(); + MEDIUM_MSG("Finishing reading TS segment"); + continue; // Check again! + } + + // No? Then we try to read the next file. + + // First we handle live playlist reloads, if needed + if (playlists[currentPlaylist].playlistType == LIVE){ + // Reload the first playlist that needs it, if the time is right + int a = getFirstPlaylistToReload(); + if (playlists[a].reloadNext <= Util::bootSecs()){ + MEDIUM_MSG("Reloading playlist %d", a); + playlists[a].reload(); + continue; } } - } - if (playlists[currentPlaylist].playlistType == LIVE){ - tsStream.getEarliestPacket(thisPacket); - tid = getOriginalTrackId(currentPlaylist, thisPacket.getTrackId()); - }else{ - tsStream.getPacket(getMappedTrackId(tid), thisPacket); - } + // Now that we know our playlist is up-to-date, actually try to read the file. + MEDIUM_MSG("Moving on to next TS segment"); + if (readNextFile()){ + MEDIUM_MSG("Next segment read successfully"); + endOfFile = false; // no longer at end of file + // Prevent timeouts, we may have just finished a download after all. + if (playlists[currentPlaylist].playlistType == LIVE){ + continueNegotiate(); + nProxy.userClient.keepAlive(); + } + continue; // Success! Continue regular parsing. + } - if (!thisPacket){ - FAIL_MSG("Could not getNExt TS packet!"); + // Nothing works! + // HLS input will now quit trying to prevent severe mental depression. + INFO_MSG("No packets can be read - exhausted all playlists"); + thisPacket.null(); return; } - - // overwrite trackId - Bit::htobl(thisPacket.getData() + 8, tid); } void inputHLS::readPMT(){ - INFO_MSG("readPMT()"); + HIGH_MSG("readPMT()"); if (playlists[currentPlaylist].isUrl()){ size_t bpos; TS::Packet tsBuffer; - const char *tmpPtr = playlists[currentPlaylist].source.data(); + const char *tmpPtr = playlists[currentPlaylist].segDL.data().data(); while (!tsStream.hasPacketOnEachTrack() && - (tmpPtr - playlists[currentPlaylist].source.c_str() + 188 <= - playlists[currentPlaylist].source.size())){ + (tmpPtr - playlists[currentPlaylist].segDL.data().data() + 188 <= + playlists[currentPlaylist].segDL.data().size())){ tsBuffer.FromPointer(tmpPtr); tsStream.parse(tsBuffer, 0); tmpPtr += 188; @@ -857,11 +717,8 @@ continueNegotiate(); while (!tsStream.hasPacketOnEachTrack() && tsBuffer.FromStream(in)){ tsStream.parse(tsBuffer, 0); } - - // tsStream.clear(); - tsStream.partialClear(); //?? partialclear gebruiken?, input raakt hierdoor - // inconsistent.. - + tsStream.partialClear(); + in.clear(); in.seekg(bpos, in.beg); } } @@ -869,7 +726,6 @@ continueNegotiate(); // Note: bpos is overloaded here for playlist entry! void inputHLS::seek(int seekTime){ tsStream.clear(); - readPMT(); int trackId = 0; unsigned long plistEntry = 0xFFFFFFFFull; @@ -893,16 +749,25 @@ continueNegotiate(); } currentIndex = plistEntry - 1; + INFO_MSG("Current index = %d", currentIndex); + currentPlaylist = getMappedTrackPlaylist(trackId); Playlist &curPlaylist = playlists[currentPlaylist]; playListEntries &entry = curPlaylist.entries.at(currentIndex); if (curPlaylist.isUrl()){ - curPlaylist.loadURL(curPlaylist.uri_root + entry.filename); + curPlaylist.loadSegment(curPlaylist.root.link(entry.filename)); }else{ in.close(); - in.open((curPlaylist.uri_root + entry.filename).c_str()); + in.open(curPlaylist.root.link(entry.filename).getFilePath().c_str()); + MEDIUM_MSG("Opening segment: %s", + curPlaylist.root.link(entry.filename).getFilePath().c_str()); + if (!in.good()){ + FAIL_MSG("Could not open segment (%s): %s", strerror(errno), + curPlaylist.root.link(entry.filename).getUrl().c_str()); + } } + readPMT(); } int inputHLS::getEntryId(int playlistId, uint64_t bytePos){ @@ -923,117 +788,124 @@ continueNegotiate(); int inputHLS::getMappedTrackPlaylist(int id){return (pidMappingR[id] >> 16);} - /// Very first function to be called on a regular playlist or variant playlist. + /// Parses the main playlist, possibly containing variants. bool inputHLS::initPlaylist(const std::string &uri){ std::string line; bool ret = false; startTime = Util::bootSecs(); - std::string init_source; - -// std::string playlistRootPath = uri.substr(0, uri.rfind("/") + 1); HTTP::URL playlistRootPath(uri); + // Convert custom http(s)-hls protocols into regular notation. + if (playlistRootPath.protocol == "http-hls"){playlistRootPath.protocol = "http";} + if (playlistRootPath.protocol == "https-hls"){playlistRootPath.protocol = "https";} std::istringstream urlSource; std::ifstream fileSource; - bool isUrl = false; - if (uri.find("://") != std::string::npos){ - isUrl = true; - Playlist p; - p.loadURL(uri); - init_source = p.source; - urlSource.str(init_source); + bool isUrl = (uri.find("://") != std::string::npos); + if (isUrl){ + HTTP::Downloader plsDL; + plsDL.dataTimeout = 15; + plsDL.retryCount = 8; + if (!plsDL.get(playlistRootPath) || !plsDL.isOk()){ + FAIL_MSG("Could not download main playlist, aborting."); + return false; + } + urlSource.str(plsDL.data()); }else{ + // If we're not a URL and there is no / at the start, ensure we get the full absolute path. + if (uri[0] != '/'){ + char *rp = realpath(uri.c_str(), 0); + if (rp){ + playlistRootPath = HTTP::URL((std::string)rp); + free(rp); + } + } fileSource.open(uri.c_str()); + if (!fileSource.good()){ + FAIL_MSG("Could not open playlist (%s): %s", strerror(errno), uri.c_str()); + } } std::istream &input = (isUrl ? (std::istream &)urlSource : (std::istream &)fileSource); std::getline(input, line); while (std::getline(input, line)){ - cleanLine(line); -// INFO_MSG("processing line: %s", line.c_str()); - if (!line.empty()){// skip empty lines in the playlist - if (line.compare(0, 17, "#EXT-X-STREAM-INF") == 0){ - // this is a variant playlist file.. next line is an uri to a playlist - // file - size_t pos; - bool codecSupported = false; + cleanLine(line); + if (line.empty()){ + // skip empty lines in the playlist + continue; + } + if (line.compare(0, 17, "#EXT-X-STREAM-INF") == 0){ + // this is a variant playlist file.. next line is an uri to a playlist + // file + size_t pos; + bool codecSupported = false; - pos = line.find("CODECS=\""); - if(pos != std::string::npos){ - std::string codecs = line.substr(pos + 8); - transform(codecs.begin(), codecs.end(), codecs.begin(),::tolower); + pos = line.find("CODECS=\""); + if (pos != std::string::npos){ + std::string codecs = line.substr(pos + 8); + transform(codecs.begin(), codecs.end(), codecs.begin(), ::tolower); - pos = codecs.find("\""); + pos = codecs.find("\""); - if(pos != std::string::npos){ - codecs = codecs.substr(0, pos); - codecs.append(","); + if (pos != std::string::npos){ + codecs = codecs.substr(0, pos); + codecs.append(","); - std::string codec; - while ((pos = codecs.find(",")) != std::string::npos){ - codec = codecs.substr(0,pos); - codecs = codecs.substr(pos+1); - if((codec.compare(0, 4, "mp4a") == 0 ) || - (codec.compare(0, 4, "avc1") == 0 )|| - (codec.compare(0, 4, "h264") == 0 )|| - (codec.compare(0, 4, "mp3") == 0 )|| - (codec.compare(0, 4, "aac") == 0 )|| - (codec.compare(0, 4, "ac3") == 0 )){ - codecSupported = true; - }else{ - FAIL_MSG("codec: %s not supported!", codec.c_str()); - } + std::string codec; + while ((pos = codecs.find(",")) != std::string::npos){ + codec = codecs.substr(0, pos); + codecs = codecs.substr(pos + 1); + if ((codec.compare(0, 4, "mp4a") == 0) || (codec.compare(0, 4, "avc1") == 0) || + (codec.compare(0, 4, "h264") == 0) || (codec.compare(0, 4, "mp3") == 0) || + (codec.compare(0, 4, "aac") == 0) || (codec.compare(0, 4, "ac3") == 0)){ + codecSupported = true; + }else{ + FAIL_MSG("codec: %s not supported!", codec.c_str()); } - }else{ - codecSupported = true; } }else{ codecSupported = true; } - -// std::getline(input, line); - while(std::getline(input, line)){ - cleanLine(line); - if(!line.empty()){ - break; - } - } - - if(codecSupported){ - - ret = readPlaylist(playlistRootPath.link(line).getUrl()); - INFO_MSG("read variant playlist: %s",playlistRootPath.link(line).getUrl().c_str()); - }else{ - INFO_MSG("skipping variant playlist %s, none of the codecs are supported",playlistRootPath.link(line).getUrl().c_str()); - } - - }else if (line.compare(0, 12, "#EXT-X-MEDIA") == 0){ - // this is also a variant playlist, but streams need to be processed - // another way - - std::string mediafile; - if (line.compare(18, 5, "AUDIO") == 0){ - // find URI attribute - int pos = line.find("URI"); - if (pos != std::string::npos){ - mediafile = line.substr(pos + 5, line.length() - pos - 6); - ret = readPlaylist(playlistRootPath.link(mediafile).getUrl()); - } - } - - }else if (line.compare(0, 7, "#EXTINF") == 0){ - // current file is not a variant playlist, but regular playlist. - - DEBUG_MSG(DLVL_HIGH, "Read regular playlist: %s", uri.c_str()); - ret = readPlaylist(uri); - break; }else{ - // ignore wrong lines - VERYHIGH_MSG("ignore wrong line: %s", line.c_str()); + codecSupported = true; } + + while (std::getline(input, line)){ + cleanLine(line); + if (!line.empty()){break;} + } + + if (codecSupported){ + + ret = readPlaylist(playlistRootPath.link(line)); + }else{ + INFO_MSG("skipping variant playlist %s, none of the codecs are supported", + playlistRootPath.link(line).getUrl().c_str()); + } + + }else if (line.compare(0, 12, "#EXT-X-MEDIA") == 0){ + // this is also a variant playlist, but streams need to be processed + // another way + + std::string mediafile; + if (line.compare(18, 5, "AUDIO") == 0){ + // find URI attribute + int pos = line.find("URI"); + if (pos != std::string::npos){ + mediafile = line.substr(pos + 5, line.length() - pos - 6); + ret = readPlaylist(playlistRootPath.link(mediafile)); + } + } + + }else if (line.compare(0, 7, "#EXTINF") == 0){ + // current file is not a variant playlist, but regular playlist. + ret = readPlaylist(uri); + break; + }else{ + // ignore wrong lines + VERYHIGH_MSG("ignore wrong line: %s", line.c_str()); } } @@ -1043,15 +915,11 @@ continueNegotiate(); } /// Function for reading every playlist. - bool inputHLS::readPlaylist(const std::string &uri){ - Playlist p(uri); + bool inputHLS::readPlaylist(const HTTP::URL &uri){ + Playlist p(uri.protocol.size() ? uri.getUrl() : uri.getFilePath()); p.id = playlists.size(); // set size of reloadNext to playlist count with default value 0 playlists.push_back(p); - - if (reloadNext.size() < playlists.size()){reloadNext.resize(playlists.size());} - - reloadNext.at(p.id) = Util::bootSecs() + p.waitTime; return true; } @@ -1062,57 +930,58 @@ continueNegotiate(); Playlist &curList = playlists[currentPlaylist]; if (!curList.entries.size()){ - VERYHIGH_MSG("no entries found in playlist: %d!", currentPlaylist); + WARN_MSG("no entries found in playlist: %d!", currentPlaylist); return false; } -// std::string url = (curList.uri_root + curList.entries.front().filename).c_str(); - std::string url = curList.root.link(curList.entries.front().filename).getUrl(); //use link - -// std::string url = curList.root.getUrl().c_str(); - -//FAIL_MSG("url: %s, root: %s",url.c_str(), curList.root.getUrl().c_str()); + // URL-based if (curList.isUrl()){ - if(curList.loadURL(url)){ - curList.entries.pop_front(); // remove the item which is opened for reading. - }else{ - if(curList.DL.getStatusCode() == 404){ - curList.entries.pop_front(); //remove files on 404 errors. - WARN_MSG("removing non existing file from the queue: %s", url.c_str()); - } + if (!curList.loadSegment(curList.root.link(curList.entries.front().filename))){ + ERROR_MSG("Could not download segment: %s", + curList.root.link(curList.entries.front().filename).getUrl().c_str()); + curList.entries.pop_front(); + return readNextFile(); // Attempt to read another, if possible. } + curList.entries.pop_front(); + return true; } + // file-based, live if (curList.playlistType == LIVE){ in.close(); - in.open(url.c_str()); - - if (in.good()){ - curList.entries.pop_front(); // remove the item which is opened for reading. - return true; - } - return false; + std::string filepath = + curList.root.link(curList.entries.at(currentIndex).filename).getFilePath(); + curList.entries.pop_front(); // remove the item from the playlist + in.open(filepath.c_str()); + if (in.good()){return true;} + FAIL_MSG("Could not open segment (%s): %s", strerror(errno), filepath.c_str()); + return readNextFile(); // Attempt to read another, if possible. } + + // file-based, VoD ++currentIndex; if (curList.entries.size() <= currentIndex){ - INFO_MSG("end of playlist reached!"); + HIGH_MSG("end of playlist reached (%u of %u)!", currentIndex, curList.entries.size()); return false; } in.close(); - - - - url = curList.uri_root + curList.entries.at(currentIndex).filename; - url = curList.root.link(curList.entries.at(currentIndex).filename).getUrl(); //use link - - in.open(url.c_str()); - return true; + std::string filepath = + curList.root.link(curList.entries.at(currentIndex).filename).getFilePath(); + in.open(filepath.c_str()); + if (in.good()){ + readPMT(); + return true; + } + FAIL_MSG("Could not open segment (%s): %s", strerror(errno), filepath.c_str()); + return readNextFile(); } /// return the playlist id from which we need to read the first upcoming segment /// by timestamp. /// this will keep the playlists in sync while reading segments. int inputHLS::firstSegment(){ + // Only one selected? Immediately return the right playlist. + if (selectedTracks.size() == 1){return getMappedTrackPlaylist(*selectedTracks.begin());} uint64_t firstTimeStamp = 0; int tmpId = -1; @@ -1128,29 +997,5 @@ continueNegotiate(); return tmpId; } - // read the next segment - void inputHLS::waitForNextSegment(){ - uint32_t pListId = firstSegment(); - if (pListId == -1){ - FAIL_MSG("no segments"); - VERYHIGH_MSG("no segments found!"); - return; - } - int segmentTime = playlists[pListId].entries.front().timestamp - Util::bootSecs(); - DEBUG_MSG(DLVL_HIGH, "segmenttime: %d, %llu, bootsecs: %llu",segmentTime, playlists[pListId].entries.front().timestamp , Util::bootSecs()); - if (segmentTime){ - while (segmentTime > 0 && (needsLock() || nProxy.userClient.isAlive())){ - //INFO_MSG("waiting for segment..."); - Util::wait(1000); - --segmentTime; - continueNegotiate(); - nProxy.userClient.keepAlive(); - } - } - } - - - - -} +}// namespace Mist diff --git a/src/input/input_hls.h b/src/input/input_hls.h index 9fe745ef..7188bd5c 100644 --- a/src/input/input_hls.h +++ b/src/input/input_hls.h @@ -9,11 +9,8 @@ #include #include //#include -#include #include - - - +#include #define BUFFERTIME 10 @@ -36,32 +33,26 @@ namespace Mist{ bool isUrl() const; bool reload(); void addEntry(const std::string &filename, float duration, uint64_t &totalBytes); - bool loadURL(const std::string &loadUrl); + bool loadSegment(const HTTP::URL &uri); bool isSupportedFile(const std::string filename); - std::string uri; //link to the current playlistfile - std::string uri_root; - + std::string uri; // link to the current playlistfile HTTP::URL root; - HTTP::Downloader DL; + HTTP::Downloader segDL; + HTTP::Downloader plsDL; - - std::string source; const char *packetPtr; + uint64_t reloadNext; int id; - bool initDone; bool playlistEnd; int noChangeCount; - int version; - uint64_t media_sequence; - int lastFileIndex; + uint64_t lastFileIndex; int waitTime; PlaylistType playlistType; std::deque entries; - int entryCount; unsigned int lastTimestamp; unsigned int startTime; }; @@ -87,7 +78,6 @@ namespace Mist{ PlaylistType playlistType; int version; int targetDuration; - int media_sequence; bool endPlaylist; int currentPlaylist; @@ -97,13 +87,11 @@ namespace Mist{ std::map pidMapping; std::map pidMappingR; - std::vector reloadNext; - int currentIndex; std::string currentFile; std::ifstream in; - TS::Stream tsStream; ///