bugfixes hls input

This commit is contained in:
Ramkoemar 2017-07-25 14:46:03 +02:00 committed by Thulinma
parent 7ac6388dbb
commit 0f92664bb4
2 changed files with 267 additions and 82 deletions

View file

@ -24,6 +24,14 @@
namespace Mist{
// remove trailing \r for windows generated playlist files
inputHLS* self = 0;
bool callbackFunc(){
return self->callback();
}
int cleanLine(std::string &s){
if (s.length() > 0 && s.at(s.length() - 1) == '\r'){s.erase(s.size() - 1);}
}
@ -39,6 +47,7 @@ namespace Mist{
uri = uriSrc;
startTime = Util::bootSecs();
if (uri.size()){
std::string line;
std::string key;
@ -46,8 +55,10 @@ namespace Mist{
int count = 0;
uint64_t totalBytes = 0;
uri_root = uri.substr(0, uri.rfind("/") + 1);
root = HTTP::URL(uri_root);
playlistType = LIVE; // Temporary value
INFO_MSG("readplaylist: %s", uri.c_str());
INFO_MSG("Readplaylist: %s", uri.c_str());
std::istringstream urlSource;
std::ifstream fileSource;
@ -73,7 +84,7 @@ namespace Mist{
if (key == "VERSION"){version = atoi(val.c_str());}
if (key == "TARGETDURATION"){waitTime = atoi(val.c_str());}
if (key == "TARGETDURATION"){waitTime = atoi(val.c_str())/2 ;}
if (key == "MEDIA-SEQUENCE"){
media_sequence = atoi(val.c_str());
@ -103,6 +114,8 @@ namespace Mist{
float f = atof(line.c_str() + 8);
std::string filename;
std::getline(input, filename);
DEBUG_MSG(DLVL_HIGH, "Adding entry %s", filename.c_str());
addEntry(filename, f, totalBytes);
count++;
}
@ -122,47 +135,64 @@ namespace Mist{
return (uri_root.size() ? uri_root.find("http://") == 0 : uri.find("http://") == 0);
}
bool inputHLS::callback(){
if(nProxy.userClient.isAlive()){
nProxy.userClient.keepAlive();
}
return config->is_active;
}
bool Playlist::loadURL(const std::string &loadUrl){
HIGH_MSG("opening URL: %s", loadUrl.c_str());
HTTP::URL url(loadUrl);
if (url.protocol != "http"){
FAIL_MSG("Protocol %s is not supported", url.protocol.c_str());
//root = HTTP::URL(loadUrl);
HTTP::URL root = HTTP::URL(loadUrl);
if (root.protocol != "http"){
FAIL_MSG("Only http protocol is supported (%s not supported)", root.protocol.c_str());
return false;
}
Socket::Connection conn(url.host, url.getPort(), false);
if (!conn){
FAIL_MSG("Failed to reach %s on port %lu", url.host.c_str(), url.getPort());
return false;
}
DL.progressCallback = callbackFunc;
HTTP::Parser http;
http.url = "/" + url.path;
http.method = "GET";
http.SetHeader("Host", url.host);
http.SetHeader("X-MistServer", PACKAGE_VERSION);
conn.SendNow(http.BuildRequest());
http.Clean();
uint64_t startTime = Util::epoch();
source.clear();
packetPtr = 0;
while ((Util::epoch() - startTime < 10) && (conn || conn.Received().size())){
if (conn.spool() || conn.Received().size()){
if (http.Read(conn)){
source = http.body;
packetPtr = source.data();
conn.close();
return true;
if (DL.get(root)){
if (DL.isOk()){
DEBUG_MSG(DLVL_HIGH, "statusCode: %d, statusText: ", DL.getStatusCode());
// DL.getStatusText().c_str());
if (DL.getHeader("Content-Length") != ""){
if (DL.data().size() != atoi(DL.getHeader("Content-Length").c_str())){
FAIL_MSG("Expected %s bytes of data, but only received %lu.",
DL.getHeader("Content-Length").c_str(), DL.data().size());
return false;
}
}
//check first byte = 0x47. begin of ts file, then check if it is a multiple of 188bytes
if(DL.data().data()[0] == 0x47) {
if (DL.data().size() % 188){
FAIL_MSG("Expected a multiple of 188 bytes, received %d bytes. url: %s", DL.data().size(), loadUrl.c_str());
}
}else if(DL.data().data()[5] == 0x47){
if (DL.data().size() % 192){
FAIL_MSG("Expected a multiple of 192 bytes, received %d bytes. url: %s", DL.data().size(), loadUrl.c_str());
}
}
}else{
FAIL_MSG("HTTP response not OK!. statuscode: %d, statustext: %s", DL.getStatusCode(), DL.getStatusText().c_str());
}
}else{
FAIL_MSG("failed url get()");
}
FAIL_MSG("Failed to load %s: %s", loadUrl.c_str(), conn ? "timeout" : "connection closed");
if (conn){conn.close();}
return false;
source.clear();
source = DL.data();
packetPtr = 0;
packetPtr = source.data();
return DL.isOk();
}
/// Function for reloading the playlist in case of live streams.
@ -174,6 +204,8 @@ namespace Mist{
std::string val;
int count = 0;
std::string all;
uint64_t totalBytes = 0;
std::istringstream urlSource;
@ -191,9 +223,11 @@ namespace Mist{
while (std::getline(input, line)){
cleanLine(line);
all.append(line);
if (line.compare(0, 21, "#EXT-X-MEDIA-SEQUENCE") == 0){
media_sequence = atoi(line.c_str() + line.find(":") + 1);
skip = (lastFileIndex - media_sequence);
INFO_MSG("media seq: %d, skip: %d", media_sequence, skip);
continue;
}
if (line.compare(0, 7, "#EXTINF") != 0){continue;}
@ -202,11 +236,21 @@ namespace Mist{
std::string filename;
std::getline(input, filename);
all.append(line);
// check for already added segments
if (skip){
//INFO_MSG("skipping file: %s", filename.c_str());
skip--;
}else{
if (count == 0){
// clear the whole buffer with entries, and insert only new segments
// INFO_MSG("clear entries");
// entries.clear();
}
cleanLine(filename);
DEBUG_MSG(DLVL_HIGH, "Adding segment %s", filename.c_str());
addEntry(filename, f, totalBytes);
count++;
}
@ -219,6 +263,7 @@ namespace Mist{
if (ret){
noChangeCount = 0;
}else{
INFO_MSG("no changes..");
++noChangeCount;
if (noChangeCount > 3){VERYHIGH_MSG("enough!");}
}
@ -226,8 +271,28 @@ namespace Mist{
return ret;
}
/// function for adding segments to the playlist to be processed. used for VOD and live
bool Playlist::isSupportedFile(const std::string filename){
//only ts files
if(filename.find_last_of(".") != std::string::npos){
std::string ext = filename.substr(filename.find_last_of(".")+1);
if (ext.compare(0, 2, "ts") == 0){
return true;
}else{
DEBUG_MSG(DLVL_HIGH, "Not supported extension: %s", ext.c_str());
return false;
}
}
}
/// function for adding segments to the playlist to be processed. used for VOD
/// and live
void Playlist::addEntry(const std::string &filename, float duration, uint64_t &totalBytes){
if(!isSupportedFile(filename)){
WARN_MSG("Ignoring unsupported file: %s", filename.c_str());
return;
}
playListEntries entry;
entry.filename = filename;
cleanLine(entry.filename);
@ -249,12 +314,13 @@ namespace Mist{
entry.duration = duration;
if (!isUrl()){totalBytes += fileSource.tellg();}
if (initDone){
if (initDone || (entryCount > 2)){
lastTimestamp += duration;
entry.timestamp = lastTimestamp + startTime;
entry.wait = entryCount * duration;
entry.timestamp = lastTimestamp + startTime ;
}else{
INFO_MSG("set timestamp ZERO, load immediatly!");
entry.timestamp = 0; // read all segments immediatly at the beginning, then use delays
//FAIL_MSG("e timestamp %llu", entry.timestamp);
}
++entryCount;
entries.push_back(entry);
@ -263,13 +329,14 @@ namespace Mist{
/// Constructor of HLS Input
inputHLS::inputHLS(Util::Config *cfg) : Input(cfg){
self = this;
currentPlaylist = 0;
capa["name"] = "HLS";
capa["decs"] = "Enables HLS Input";
capa["source_match"].append("/*.m3u8");
capa["source_match"].append("http://*.m3u8");
//These two can/may be set to always-on mode
// These two can/may be set to always-on mode
capa["always_match"].append("/*.m3u8");
capa["always_match"].append("http://*.m3u8");
@ -287,6 +354,7 @@ namespace Mist{
}
bool inputHLS::checkArguments(){
config->is_active = true;
if (config->getString("input") == "-"){return false;}
if (!initPlaylist(config->getString("input"))){return false;}
@ -323,7 +391,7 @@ namespace Mist{
void inputHLS::parseStreamHeader(){
bool hasHeader = false;
if (!hasHeader){myMeta = DTSC::Meta();}
INFO_MSG("parsestream");
TS::Packet packet; // to analyse and extract data
int counter = 1;
int packetId = 0;
@ -341,8 +409,13 @@ namespace Mist{
uint64_t lastBpos = entryIt->bytePos;
if (pListIt->isUrl()){
pListIt->loadURL(pListIt->uri_root + entryIt->filename);
bool ret = false;
continueNegotiate();
nProxy.userClient.keepAlive();
//FAIL_MSG("parsestreamheader url: root: %s,uri_root: %s, entry: %s", pListIt->root.getUrl().c_str(), pListIt->uri_root.c_str(), entryIt->filename.c_str());
ret = pListIt->loadURL(pListIt->root.link(entryIt->filename).getUrl().c_str());
//ret = pListIt->loadURL(pListIt->uri_root + entryIt->filename);
keepReading = packet.FromPointer(pListIt->packetPtr);
pListIt->packetPtr += 188;
}else{
@ -405,11 +478,7 @@ namespace Mist{
}
tsStream.clear();
INFO_MSG("end stream header tracks: %d", myMeta.tracks.size());
if (hasHeader){return;}
// myMeta.live = true;
// myMeta.vod = false;
in.close();
}
@ -586,7 +655,8 @@ namespace Mist{
bool inputHLS::openStreamSource(){return true;}
int inputHLS::getFirstPlaylistToReload(){
// at this point, we need to check which playlist we need to reload, and keep reading from that
// at this point, we need to check which playlist we need to reload, and keep
// reading from that
// playlist until EndOfPlaylist
std::vector<int>::iterator result = std::min_element(reloadNext.begin(), reloadNext.end());
return std::distance(reloadNext.begin(), result);
@ -603,6 +673,7 @@ namespace Mist{
thisPacket.null();
while (!hasPacket && config->is_active && (needsLock() || nProxy.userClient.isAlive())){
if (playlists[currentPlaylist].isUrl()){
endOfFile = playlists[currentPlaylist].atEnd();
@ -630,46 +701,66 @@ namespace Mist{
}
if (endOfFile && !hasPacket){
//INFO_MSG("endoffile and no packet");
if (playlists[currentPlaylist].playlistType == LIVE){
int a = getFirstPlaylistToReload();
int segmentTime = 30;
HIGH_MSG("need to reload playlist %d, time: %d", a, reloadNext[a] - Util::bootSecs());
//INFO_MSG("need to reload playlist %d, time: %d", a, reloadNext[a] - Util::bootSecs());
int f = firstSegment();
if (f >= 0){segmentTime = playlists[f].entries.front().timestamp - Util::bootSecs();}
int playlistTime = reloadNext.at(currentPlaylist) - Util::bootSecs() - 1;
if (playlistTime < segmentTime){
while (playlistTime > 0 && (needsLock() || nProxy.userClient.isAlive())){
Util::wait(900);
nProxy.userClient.keepAlive();
playlistTime--;
}
playlistTime = reloadNext[a] - Util::bootSecs();
if (playlistTime < 1 && playlists[currentPlaylist].playlistEnd == false){
// update reloadTime before reading the playlist
reloadNext.at(playlists[a].id) = Util::bootSecs() + playlists[a].waitTime;
playlists[a].reload();
}
waitForNextSegment();
}
waitForNextSegment();
if (f < 0){
while (Util::bootSecs() < reloadNext[a] &&
(needsLock() || nProxy.userClient.isAlive())){
Util::wait(1000);
continueNegotiate();
nProxy.userClient.keepAlive();
}
INFO_MSG("reloading playlist");
reloadNext.at(playlists[a].id) = Util::bootSecs() + playlists[a].waitTime;
if(playlists[a].playlistEnd && playlists[a].entries.size() == 0)
{
INFO_MSG("No more entries, stop reloading");
thisPacket.null();
return;
}
playlists[a].reload();
}
}
int b = Util::bootSecs();
if (!readNextFile()){
continueNegotiate();
nProxy.userClient.keepAlive();
if (playlists[currentPlaylist].playlistType != LIVE){return;}
// need to reload all available playlists. update the map with the amount of ms to wait
// need to reload all available playlists. update the map with the
// amount of ms to wait
// before the next check.
// set specific elements with the correct bootsecs()
reloadNext.at(currentPlaylist) = b + playlists[currentPlaylist].waitTime;
int timeToWait = reloadNext.at(currentPlaylist) - Util::bootSecs();
// at this point, we need to check which playlist we need to reload, and keep reading from
//INFO_MSG("readnextfile if");
// at this point, we need to check which playlist we need to reload, and
// keep reading from
// that playlist until EndOfPlaylist
std::vector<int>::iterator result =
std::min_element(reloadNext.begin(), reloadNext.end());
@ -689,7 +780,7 @@ namespace Mist{
}
}
if (playlists[currentPlaylist].playlistEnd){
if (playlists[currentPlaylist].playlistEnd && playlists[currentPlaylist].playlistType != LIVE){
INFO_MSG("Playlist %d has reached his end!");
thisPacket.null();
return;
@ -735,6 +826,7 @@ namespace Mist{
}
void inputHLS::readPMT(){
INFO_MSG("readPMT()");
if (playlists[currentPlaylist].isUrl()){
size_t bpos;
TS::Packet tsBuffer;
@ -758,7 +850,8 @@ namespace Mist{
}
// tsStream.clear();
tsStream.partialClear(); //?? partialclear gebruiken?, input raakt hierdoor inconsistent..
tsStream.partialClear(); //?? partialclear gebruiken?, input raakt hierdoor
// inconsistent..
in.seekg(bpos, in.beg);
}
@ -766,7 +859,6 @@ namespace Mist{
// Note: bpos is overloaded here for playlist entry!
void inputHLS::seek(int seekTime){
INFO_MSG("SEEK");
tsStream.clear();
readPMT();
int trackId = 0;
@ -829,7 +921,9 @@ namespace Mist{
startTime = Util::bootSecs();
std::string init_source;
std::string playlistRootPath = uri.substr(0, uri.rfind("/") + 1);
// std::string playlistRootPath = uri.substr(0, uri.rfind("/") + 1);
HTTP::URL playlistRootPath(uri);
std::istringstream urlSource;
std::ifstream fileSource;
@ -849,31 +943,87 @@ namespace Mist{
std::getline(input, line);
while (std::getline(input, line)){
cleanLine(line);
// INFO_MSG("processing line: %s", line.c_str());
if (!line.empty()){// skip empty lines in the playlist
if (line.compare(0, 17, "#EXT-X-STREAM-INF") == 0){
// this is a variant playlist file.. next line is an uri to a playlist file
std::getline(input, line);
ret = readPlaylist(playlistRootPath + line);
// this is a variant playlist file.. next line is an uri to a playlist
// file
size_t pos;
bool codecSupported = false;
pos = line.find("CODECS=\"");
if(pos != std::string::npos){
std::string codecs = line.substr(pos + 8);
transform(codecs.begin(), codecs.end(), codecs.begin(),::tolower);
pos = codecs.find("\"");
if(pos != std::string::npos){
codecs = codecs.substr(0, pos);
codecs.append(",");
std::string codec;
while ((pos = codecs.find(",")) != std::string::npos){
codec = codecs.substr(0,pos);
codecs = codecs.substr(pos+1);
if((codec.compare(0, 4, "mp4a") == 0 ) ||
(codec.compare(0, 4, "avc1") == 0 )||
(codec.compare(0, 4, "h264") == 0 )||
(codec.compare(0, 4, "mp3") == 0 )||
(codec.compare(0, 4, "aac") == 0 )||
(codec.compare(0, 4, "ac3") == 0 )){
codecSupported = true;
}else{
FAIL_MSG("codec: %s not supported!", codec.c_str());
}
}
}else{
codecSupported = true;
}
}else{
codecSupported = true;
}
// std::getline(input, line);
while(std::getline(input, line)){
cleanLine(line);
if(!line.empty()){
break;
}
}
if(codecSupported){
ret = readPlaylist(playlistRootPath.link(line).getUrl());
INFO_MSG("read variant playlist: %s",playlistRootPath.link(line).getUrl().c_str());
}else{
INFO_MSG("skipping variant playlist %s, none of the codecs are supported",playlistRootPath.link(line).getUrl().c_str());
}
}else if (line.compare(0, 12, "#EXT-X-MEDIA") == 0){
// this is also a variant playlist, but streams need to be processed another way
// this is also a variant playlist, but streams need to be processed
// another way
std::string mediafile;
if (line.compare(18, 5, "AUDIO") == 0){
// find URI attribute
int pos = line.find("URI");
if (pos != std::string::npos){
mediafile = line.substr(pos + 5, line.length() - pos - 6);
ret = readPlaylist(playlistRootPath + mediafile);
mediafile = line.substr(pos + 5, line.length() - pos - 6);
ret = readPlaylist(playlistRootPath.link(mediafile).getUrl());
}
}
}else if (line.compare(0, 7, "#EXTINF") == 0){
// current file is not a variant playlist, but regular playlist.
DEBUG_MSG(DLVL_HIGH, "Read regular playlist: %s", uri.c_str());
ret = readPlaylist(uri);
break;
}else{
// ignore wrong lines
WARN_MSG("ignore wrong line: %s", line.c_str());
VERYHIGH_MSG("ignore wrong line: %s", line.c_str());
}
}
}
@ -896,7 +1046,8 @@ namespace Mist{
return true;
}
/// Read next .ts file from the playlist. (from the list of entries which needs to be processed)
/// Read next .ts file from the playlist. (from the list of entries which needs
/// to be processed)
bool inputHLS::readNextFile(){
tsStream.clear();
Playlist &curList = playlists[currentPlaylist];
@ -906,10 +1057,21 @@ namespace Mist{
return false;
}
std::string url = (curList.uri_root + curList.entries.front().filename).c_str();
// std::string url = (curList.uri_root + curList.entries.front().filename).c_str();
std::string url = curList.root.link(curList.entries.front().filename).getUrl(); //use link
if (curList.isUrl() && curList.loadURL(url)){
curList.entries.pop_front(); // remove the item which is opened for reading.
// std::string url = curList.root.getUrl().c_str();
//FAIL_MSG("url: %s, root: %s",url.c_str(), curList.root.getUrl().c_str());
if (curList.isUrl()){
if(curList.loadURL(url)){
curList.entries.pop_front(); // remove the item which is opened for reading.
}else{
if(curList.DL.getStatusCode() == 404){
curList.entries.pop_front(); //remove files on 404 errors.
WARN_MSG("removing non existing file from the queue: %s", url.c_str());
}
}
}
if (curList.playlistType == LIVE){
@ -928,13 +1090,18 @@ namespace Mist{
return false;
}
in.close();
url = curList.uri_root + curList.entries.at(currentIndex).filename;
url = curList.uri_root + curList.entries.at(currentIndex).filename;
url = curList.root.link(curList.entries.at(currentIndex).filename).getUrl(); //use link
in.open(url.c_str());
return true;
}
/// return the playlist id from which we need to read the first upcoming segment by timestamp.
/// return the playlist id from which we need to read the first upcoming segment
/// by timestamp.
/// this will keep the playlists in sync while reading segments.
int inputHLS::firstSegment(){
uint64_t firstTimeStamp = 0;
@ -956,13 +1123,15 @@ namespace Mist{
void inputHLS::waitForNextSegment(){
uint32_t pListId = firstSegment();
if (pListId == -1){
FAIL_MSG("no segments");
VERYHIGH_MSG("no segments found!");
return;
}
int segmentTime = playlists[pListId].entries.front().timestamp - Util::bootSecs();
DEBUG_MSG(DLVL_HIGH, "segmenttime: %d, %llu, bootsecs: %llu",segmentTime, playlists[pListId].entries.front().timestamp , Util::bootSecs());
if (segmentTime){
--segmentTime;
while (segmentTime > 1 && (needsLock() || nProxy.userClient.isAlive())){
while (segmentTime > 0 && (needsLock() || nProxy.userClient.isAlive())){
//INFO_MSG("waiting for segment...");
Util::wait(1000);
--segmentTime;
continueNegotiate();
@ -970,5 +1139,9 @@ namespace Mist{
}
}
}
}

View file

@ -9,6 +9,11 @@
#include <string>
#include <vector>
//#include <stdint.h>
#include <mist/http_parser.h>
#include <mist/downloader.h>
#define BUFFERTIME 10
@ -32,9 +37,15 @@ namespace Mist{
bool reload();
void addEntry(const std::string &filename, float duration, uint64_t &totalBytes);
bool loadURL(const std::string &loadUrl);
bool isSupportedFile(const std::string filename);
std::string uri; //link to the current playlistfile
std::string uri_root;
HTTP::URL root;
HTTP::Downloader DL;
std::string uri;
std::string uri_root;
std::string source;
const char *packetPtr;
@ -67,6 +78,7 @@ namespace Mist{
~inputHLS();
bool needsLock();
bool openStreamSource();
bool callback();
protected:
// Private Functions