diff --git a/src/input/input_hls.cpp b/src/input/input_hls.cpp index 8a6fd001..b0494557 100644 --- a/src/input/input_hls.cpp +++ b/src/input/input_hls.cpp @@ -24,6 +24,14 @@ namespace Mist{ // remove trailing \r for windows generated playlist files + + + inputHLS* self = 0; + + bool callbackFunc(){ + return self->callback(); + } + int cleanLine(std::string &s){ if (s.length() > 0 && s.at(s.length() - 1) == '\r'){s.erase(s.size() - 1);} } @@ -39,6 +47,7 @@ namespace Mist{ uri = uriSrc; startTime = Util::bootSecs(); + if (uri.size()){ std::string line; std::string key; @@ -46,8 +55,10 @@ namespace Mist{ int count = 0; uint64_t totalBytes = 0; uri_root = uri.substr(0, uri.rfind("/") + 1); + root = HTTP::URL(uri_root); + playlistType = LIVE; // Temporary value - INFO_MSG("readplaylist: %s", uri.c_str()); + INFO_MSG("Readplaylist: %s", uri.c_str()); std::istringstream urlSource; std::ifstream fileSource; @@ -73,7 +84,7 @@ namespace Mist{ if (key == "VERSION"){version = atoi(val.c_str());} - if (key == "TARGETDURATION"){waitTime = atoi(val.c_str());} + if (key == "TARGETDURATION"){waitTime = atoi(val.c_str())/2 ;} if (key == "MEDIA-SEQUENCE"){ media_sequence = atoi(val.c_str()); @@ -103,6 +114,8 @@ namespace Mist{ float f = atof(line.c_str() + 8); std::string filename; std::getline(input, filename); + + DEBUG_MSG(DLVL_HIGH, "Adding entry %s", filename.c_str()); addEntry(filename, f, totalBytes); count++; } @@ -122,47 +135,64 @@ namespace Mist{ return (uri_root.size() ? uri_root.find("http://") == 0 : uri.find("http://") == 0); } + bool inputHLS::callback(){ + if(nProxy.userClient.isAlive()){ + nProxy.userClient.keepAlive(); + } + + return config->is_active; + } + bool Playlist::loadURL(const std::string &loadUrl){ - HIGH_MSG("opening URL: %s", loadUrl.c_str()); - HTTP::URL url(loadUrl); - if (url.protocol != "http"){ - FAIL_MSG("Protocol %s is not supported", url.protocol.c_str()); + + //root = HTTP::URL(loadUrl); + HTTP::URL root = HTTP::URL(loadUrl); + if (root.protocol != "http"){ + FAIL_MSG("Only http protocol is supported (%s not supported)", root.protocol.c_str()); return false; } - Socket::Connection conn(url.host, url.getPort(), false); - if (!conn){ - FAIL_MSG("Failed to reach %s on port %lu", url.host.c_str(), url.getPort()); - return false; - } + DL.progressCallback = callbackFunc; - HTTP::Parser http; - http.url = "/" + url.path; - http.method = "GET"; - http.SetHeader("Host", url.host); - http.SetHeader("X-MistServer", PACKAGE_VERSION); - - conn.SendNow(http.BuildRequest()); - http.Clean(); - - uint64_t startTime = Util::epoch(); - source.clear(); - packetPtr = 0; - while ((Util::epoch() - startTime < 10) && (conn || conn.Received().size())){ - if (conn.spool() || conn.Received().size()){ - if (http.Read(conn)){ - source = http.body; - packetPtr = source.data(); - conn.close(); - return true; + if (DL.get(root)){ + if (DL.isOk()){ + DEBUG_MSG(DLVL_HIGH, "statusCode: %d, statusText: ", DL.getStatusCode()); + // DL.getStatusText().c_str()); + if (DL.getHeader("Content-Length") != ""){ + if (DL.data().size() != atoi(DL.getHeader("Content-Length").c_str())){ + FAIL_MSG("Expected %s bytes of data, but only received %lu.", + DL.getHeader("Content-Length").c_str(), DL.data().size()); + return false; + } } + + //check first byte = 0x47. begin of ts file, then check if it is a multiple of 188bytes + if(DL.data().data()[0] == 0x47) { + if (DL.data().size() % 188){ + FAIL_MSG("Expected a multiple of 188 bytes, received %d bytes. url: %s", DL.data().size(), loadUrl.c_str()); + } + }else if(DL.data().data()[5] == 0x47){ + if (DL.data().size() % 192){ + FAIL_MSG("Expected a multiple of 192 bytes, received %d bytes. url: %s", DL.data().size(), loadUrl.c_str()); + } + } + + }else{ + FAIL_MSG("HTTP response not OK!. statuscode: %d, statustext: %s", DL.getStatusCode(), DL.getStatusText().c_str()); } + }else{ + FAIL_MSG("failed url get()"); } - FAIL_MSG("Failed to load %s: %s", loadUrl.c_str(), conn ? "timeout" : "connection closed"); - if (conn){conn.close();} - return false; + + source.clear(); + source = DL.data(); + + packetPtr = 0; + packetPtr = source.data(); + + return DL.isOk(); } /// Function for reloading the playlist in case of live streams. @@ -174,6 +204,8 @@ namespace Mist{ std::string val; int count = 0; + std::string all; + uint64_t totalBytes = 0; std::istringstream urlSource; @@ -191,9 +223,11 @@ namespace Mist{ while (std::getline(input, line)){ cleanLine(line); + all.append(line); if (line.compare(0, 21, "#EXT-X-MEDIA-SEQUENCE") == 0){ media_sequence = atoi(line.c_str() + line.find(":") + 1); skip = (lastFileIndex - media_sequence); + INFO_MSG("media seq: %d, skip: %d", media_sequence, skip); continue; } if (line.compare(0, 7, "#EXTINF") != 0){continue;} @@ -202,11 +236,21 @@ namespace Mist{ std::string filename; std::getline(input, filename); + all.append(line); // check for already added segments if (skip){ + //INFO_MSG("skipping file: %s", filename.c_str()); skip--; }else{ + + if (count == 0){ + // clear the whole buffer with entries, and insert only new segments + // INFO_MSG("clear entries"); + // entries.clear(); + } + cleanLine(filename); + DEBUG_MSG(DLVL_HIGH, "Adding segment %s", filename.c_str()); addEntry(filename, f, totalBytes); count++; } @@ -219,6 +263,7 @@ namespace Mist{ if (ret){ noChangeCount = 0; }else{ + INFO_MSG("no changes.."); ++noChangeCount; if (noChangeCount > 3){VERYHIGH_MSG("enough!");} } @@ -226,8 +271,28 @@ namespace Mist{ return ret; } - /// function for adding segments to the playlist to be processed. used for VOD and live + bool Playlist::isSupportedFile(const std::string filename){ + //only ts files + if(filename.find_last_of(".") != std::string::npos){ + std::string ext = filename.substr(filename.find_last_of(".")+1); + + if (ext.compare(0, 2, "ts") == 0){ + return true; + }else{ + DEBUG_MSG(DLVL_HIGH, "Not supported extension: %s", ext.c_str()); + return false; + } + } + } + + /// function for adding segments to the playlist to be processed. used for VOD + /// and live void Playlist::addEntry(const std::string &filename, float duration, uint64_t &totalBytes){ + if(!isSupportedFile(filename)){ + WARN_MSG("Ignoring unsupported file: %s", filename.c_str()); + return; + } + playListEntries entry; entry.filename = filename; cleanLine(entry.filename); @@ -249,12 +314,13 @@ namespace Mist{ entry.duration = duration; if (!isUrl()){totalBytes += fileSource.tellg();} - if (initDone){ + if (initDone || (entryCount > 2)){ lastTimestamp += duration; - entry.timestamp = lastTimestamp + startTime; - entry.wait = entryCount * duration; + entry.timestamp = lastTimestamp + startTime ; }else{ + INFO_MSG("set timestamp ZERO, load immediatly!"); entry.timestamp = 0; // read all segments immediatly at the beginning, then use delays + //FAIL_MSG("e timestamp %llu", entry.timestamp); } ++entryCount; entries.push_back(entry); @@ -263,13 +329,14 @@ namespace Mist{ /// Constructor of HLS Input inputHLS::inputHLS(Util::Config *cfg) : Input(cfg){ + self = this; currentPlaylist = 0; - + capa["name"] = "HLS"; capa["decs"] = "Enables HLS Input"; capa["source_match"].append("/*.m3u8"); capa["source_match"].append("http://*.m3u8"); - //These two can/may be set to always-on mode + // These two can/may be set to always-on mode capa["always_match"].append("/*.m3u8"); capa["always_match"].append("http://*.m3u8"); @@ -287,6 +354,7 @@ namespace Mist{ } bool inputHLS::checkArguments(){ + config->is_active = true; if (config->getString("input") == "-"){return false;} if (!initPlaylist(config->getString("input"))){return false;} @@ -323,7 +391,7 @@ namespace Mist{ void inputHLS::parseStreamHeader(){ bool hasHeader = false; if (!hasHeader){myMeta = DTSC::Meta();} - +INFO_MSG("parsestream"); TS::Packet packet; // to analyse and extract data int counter = 1; int packetId = 0; @@ -341,8 +409,13 @@ namespace Mist{ uint64_t lastBpos = entryIt->bytePos; if (pListIt->isUrl()){ - pListIt->loadURL(pListIt->uri_root + entryIt->filename); - + bool ret = false; + continueNegotiate(); + nProxy.userClient.keepAlive(); +//FAIL_MSG("parsestreamheader url: root: %s,uri_root: %s, entry: %s", pListIt->root.getUrl().c_str(), pListIt->uri_root.c_str(), entryIt->filename.c_str()); + ret = pListIt->loadURL(pListIt->root.link(entryIt->filename).getUrl().c_str()); + //ret = pListIt->loadURL(pListIt->uri_root + entryIt->filename); + keepReading = packet.FromPointer(pListIt->packetPtr); pListIt->packetPtr += 188; }else{ @@ -405,11 +478,7 @@ namespace Mist{ } tsStream.clear(); - INFO_MSG("end stream header tracks: %d", myMeta.tracks.size()); if (hasHeader){return;} - - // myMeta.live = true; - // myMeta.vod = false; in.close(); } @@ -586,7 +655,8 @@ namespace Mist{ bool inputHLS::openStreamSource(){return true;} int inputHLS::getFirstPlaylistToReload(){ - // at this point, we need to check which playlist we need to reload, and keep reading from that + // at this point, we need to check which playlist we need to reload, and keep + // reading from that // playlist until EndOfPlaylist std::vector::iterator result = std::min_element(reloadNext.begin(), reloadNext.end()); return std::distance(reloadNext.begin(), result); @@ -603,6 +673,7 @@ namespace Mist{ thisPacket.null(); while (!hasPacket && config->is_active && (needsLock() || nProxy.userClient.isAlive())){ + if (playlists[currentPlaylist].isUrl()){ endOfFile = playlists[currentPlaylist].atEnd(); @@ -630,46 +701,66 @@ namespace Mist{ } if (endOfFile && !hasPacket){ + //INFO_MSG("endoffile and no packet"); if (playlists[currentPlaylist].playlistType == LIVE){ - int a = getFirstPlaylistToReload(); int segmentTime = 30; - HIGH_MSG("need to reload playlist %d, time: %d", a, reloadNext[a] - Util::bootSecs()); + //INFO_MSG("need to reload playlist %d, time: %d", a, reloadNext[a] - Util::bootSecs()); int f = firstSegment(); if (f >= 0){segmentTime = playlists[f].entries.front().timestamp - Util::bootSecs();} int playlistTime = reloadNext.at(currentPlaylist) - Util::bootSecs() - 1; - if (playlistTime < segmentTime){ - while (playlistTime > 0 && (needsLock() || nProxy.userClient.isAlive())){ - Util::wait(900); - nProxy.userClient.keepAlive(); - playlistTime--; - } + playlistTime = reloadNext[a] - Util::bootSecs(); + if (playlistTime < 1 && playlists[currentPlaylist].playlistEnd == false){ // update reloadTime before reading the playlist reloadNext.at(playlists[a].id) = Util::bootSecs() + playlists[a].waitTime; playlists[a].reload(); } - waitForNextSegment(); - } +waitForNextSegment(); + if (f < 0){ + while (Util::bootSecs() < reloadNext[a] && + (needsLock() || nProxy.userClient.isAlive())){ + Util::wait(1000); + continueNegotiate(); + nProxy.userClient.keepAlive(); + } + INFO_MSG("reloading playlist"); + + reloadNext.at(playlists[a].id) = Util::bootSecs() + playlists[a].waitTime; + + if(playlists[a].playlistEnd && playlists[a].entries.size() == 0) + { + INFO_MSG("No more entries, stop reloading"); + thisPacket.null(); + return; + + } + playlists[a].reload(); + } + } + int b = Util::bootSecs(); if (!readNextFile()){ - +continueNegotiate(); + nProxy.userClient.keepAlive(); if (playlists[currentPlaylist].playlistType != LIVE){return;} - // need to reload all available playlists. update the map with the amount of ms to wait + // need to reload all available playlists. update the map with the + // amount of ms to wait // before the next check. // set specific elements with the correct bootsecs() reloadNext.at(currentPlaylist) = b + playlists[currentPlaylist].waitTime; int timeToWait = reloadNext.at(currentPlaylist) - Util::bootSecs(); - - // at this point, we need to check which playlist we need to reload, and keep reading from +//INFO_MSG("readnextfile if"); + // at this point, we need to check which playlist we need to reload, and + // keep reading from // that playlist until EndOfPlaylist std::vector::iterator result = std::min_element(reloadNext.begin(), reloadNext.end()); @@ -689,7 +780,7 @@ namespace Mist{ } } - if (playlists[currentPlaylist].playlistEnd){ + if (playlists[currentPlaylist].playlistEnd && playlists[currentPlaylist].playlistType != LIVE){ INFO_MSG("Playlist %d has reached his end!"); thisPacket.null(); return; @@ -735,6 +826,7 @@ namespace Mist{ } void inputHLS::readPMT(){ + INFO_MSG("readPMT()"); if (playlists[currentPlaylist].isUrl()){ size_t bpos; TS::Packet tsBuffer; @@ -758,7 +850,8 @@ namespace Mist{ } // tsStream.clear(); - tsStream.partialClear(); //?? partialclear gebruiken?, input raakt hierdoor inconsistent.. + tsStream.partialClear(); //?? partialclear gebruiken?, input raakt hierdoor + // inconsistent.. in.seekg(bpos, in.beg); } @@ -766,7 +859,6 @@ namespace Mist{ // Note: bpos is overloaded here for playlist entry! void inputHLS::seek(int seekTime){ - INFO_MSG("SEEK"); tsStream.clear(); readPMT(); int trackId = 0; @@ -829,7 +921,9 @@ namespace Mist{ startTime = Util::bootSecs(); std::string init_source; - std::string playlistRootPath = uri.substr(0, uri.rfind("/") + 1); +// std::string playlistRootPath = uri.substr(0, uri.rfind("/") + 1); + + HTTP::URL playlistRootPath(uri); std::istringstream urlSource; std::ifstream fileSource; @@ -849,31 +943,87 @@ namespace Mist{ std::getline(input, line); while (std::getline(input, line)){ + cleanLine(line); +// INFO_MSG("processing line: %s", line.c_str()); if (!line.empty()){// skip empty lines in the playlist if (line.compare(0, 17, "#EXT-X-STREAM-INF") == 0){ - // this is a variant playlist file.. next line is an uri to a playlist file - std::getline(input, line); - ret = readPlaylist(playlistRootPath + line); + // this is a variant playlist file.. next line is an uri to a playlist + // file + size_t pos; + bool codecSupported = false; + + pos = line.find("CODECS=\""); + if(pos != std::string::npos){ + std::string codecs = line.substr(pos + 8); + transform(codecs.begin(), codecs.end(), codecs.begin(),::tolower); + + pos = codecs.find("\""); + + if(pos != std::string::npos){ + codecs = codecs.substr(0, pos); + codecs.append(","); + + std::string codec; + while ((pos = codecs.find(",")) != std::string::npos){ + codec = codecs.substr(0,pos); + codecs = codecs.substr(pos+1); + if((codec.compare(0, 4, "mp4a") == 0 ) || + (codec.compare(0, 4, "avc1") == 0 )|| + (codec.compare(0, 4, "h264") == 0 )|| + (codec.compare(0, 4, "mp3") == 0 )|| + (codec.compare(0, 4, "aac") == 0 )|| + (codec.compare(0, 4, "ac3") == 0 )){ + codecSupported = true; + }else{ + FAIL_MSG("codec: %s not supported!", codec.c_str()); + } + } + }else{ + codecSupported = true; + } + }else{ + codecSupported = true; + } + +// std::getline(input, line); + while(std::getline(input, line)){ + cleanLine(line); + if(!line.empty()){ + break; + } + } + + if(codecSupported){ + + ret = readPlaylist(playlistRootPath.link(line).getUrl()); + INFO_MSG("read variant playlist: %s",playlistRootPath.link(line).getUrl().c_str()); + }else{ + INFO_MSG("skipping variant playlist %s, none of the codecs are supported",playlistRootPath.link(line).getUrl().c_str()); + } + }else if (line.compare(0, 12, "#EXT-X-MEDIA") == 0){ - // this is also a variant playlist, but streams need to be processed another way + // this is also a variant playlist, but streams need to be processed + // another way std::string mediafile; if (line.compare(18, 5, "AUDIO") == 0){ // find URI attribute int pos = line.find("URI"); if (pos != std::string::npos){ - mediafile = line.substr(pos + 5, line.length() - pos - 6); - ret = readPlaylist(playlistRootPath + mediafile); + mediafile = line.substr(pos + 5, line.length() - pos - 6); + ret = readPlaylist(playlistRootPath.link(mediafile).getUrl()); } } }else if (line.compare(0, 7, "#EXTINF") == 0){ // current file is not a variant playlist, but regular playlist. + + DEBUG_MSG(DLVL_HIGH, "Read regular playlist: %s", uri.c_str()); ret = readPlaylist(uri); break; }else{ // ignore wrong lines - WARN_MSG("ignore wrong line: %s", line.c_str()); + VERYHIGH_MSG("ignore wrong line: %s", line.c_str()); } } } @@ -896,7 +1046,8 @@ namespace Mist{ return true; } - /// Read next .ts file from the playlist. (from the list of entries which needs to be processed) + /// Read next .ts file from the playlist. (from the list of entries which needs + /// to be processed) bool inputHLS::readNextFile(){ tsStream.clear(); Playlist &curList = playlists[currentPlaylist]; @@ -906,10 +1057,21 @@ namespace Mist{ return false; } - std::string url = (curList.uri_root + curList.entries.front().filename).c_str(); +// std::string url = (curList.uri_root + curList.entries.front().filename).c_str(); + std::string url = curList.root.link(curList.entries.front().filename).getUrl(); //use link - if (curList.isUrl() && curList.loadURL(url)){ - curList.entries.pop_front(); // remove the item which is opened for reading. +// std::string url = curList.root.getUrl().c_str(); + +//FAIL_MSG("url: %s, root: %s",url.c_str(), curList.root.getUrl().c_str()); + if (curList.isUrl()){ + if(curList.loadURL(url)){ + curList.entries.pop_front(); // remove the item which is opened for reading. + }else{ + if(curList.DL.getStatusCode() == 404){ + curList.entries.pop_front(); //remove files on 404 errors. + WARN_MSG("removing non existing file from the queue: %s", url.c_str()); + } + } } if (curList.playlistType == LIVE){ @@ -928,13 +1090,18 @@ namespace Mist{ return false; } in.close(); - url = curList.uri_root + curList.entries.at(currentIndex).filename; + + + url = curList.uri_root + curList.entries.at(currentIndex).filename; + url = curList.root.link(curList.entries.at(currentIndex).filename).getUrl(); //use link + in.open(url.c_str()); return true; } - /// return the playlist id from which we need to read the first upcoming segment by timestamp. + /// return the playlist id from which we need to read the first upcoming segment + /// by timestamp. /// this will keep the playlists in sync while reading segments. int inputHLS::firstSegment(){ uint64_t firstTimeStamp = 0; @@ -956,13 +1123,15 @@ namespace Mist{ void inputHLS::waitForNextSegment(){ uint32_t pListId = firstSegment(); if (pListId == -1){ + FAIL_MSG("no segments"); VERYHIGH_MSG("no segments found!"); return; } int segmentTime = playlists[pListId].entries.front().timestamp - Util::bootSecs(); + DEBUG_MSG(DLVL_HIGH, "segmenttime: %d, %llu, bootsecs: %llu",segmentTime, playlists[pListId].entries.front().timestamp , Util::bootSecs()); if (segmentTime){ - --segmentTime; - while (segmentTime > 1 && (needsLock() || nProxy.userClient.isAlive())){ + while (segmentTime > 0 && (needsLock() || nProxy.userClient.isAlive())){ + //INFO_MSG("waiting for segment..."); Util::wait(1000); --segmentTime; continueNegotiate(); @@ -970,5 +1139,9 @@ namespace Mist{ } } } + + + + } diff --git a/src/input/input_hls.h b/src/input/input_hls.h index 52c875f9..ef5d2f57 100644 --- a/src/input/input_hls.h +++ b/src/input/input_hls.h @@ -9,6 +9,11 @@ #include #include //#include +#include +#include + + + #define BUFFERTIME 10 @@ -32,9 +37,15 @@ namespace Mist{ bool reload(); void addEntry(const std::string &filename, float duration, uint64_t &totalBytes); bool loadURL(const std::string &loadUrl); + bool isSupportedFile(const std::string filename); + + std::string uri; //link to the current playlistfile + std::string uri_root; + + HTTP::URL root; + + HTTP::Downloader DL; - std::string uri; - std::string uri_root; std::string source; const char *packetPtr; @@ -67,6 +78,7 @@ namespace Mist{ ~inputHLS(); bool needsLock(); bool openStreamSource(); + bool callback(); protected: // Private Functions