Support for clearkey encrypted HLS input and multithreaded HLS input playlist updating
This commit is contained in:
parent
c64160f4d8
commit
9d1b3cfe98
2 changed files with 389 additions and 315 deletions
|
@ -19,11 +19,22 @@
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
#include "mbedtls/aes.h"
|
||||||
|
|
||||||
#define SEM_TS_CLAIM "/MstTSIN%s"
|
#define SEM_TS_CLAIM "/MstTSIN%s"
|
||||||
|
|
||||||
namespace Mist{
|
namespace Mist{
|
||||||
|
|
||||||
|
///Mutex for accesses to listEntries
|
||||||
|
tthread::mutex entryMutex;
|
||||||
|
|
||||||
|
static unsigned int plsTotalCount = 0;///Total playlists active
|
||||||
|
static unsigned int plsInitCount = 0;///Count of playlists fully inited
|
||||||
|
|
||||||
|
bool streamIsLive;
|
||||||
|
uint32_t globalWaitTime;
|
||||||
|
std::map<uint32_t, std::deque<playListEntries> > listEntries;
|
||||||
|
|
||||||
// These are used in the HTTP::Downloader callback, to prevent timeouts when downloading
|
// These are used in the HTTP::Downloader callback, to prevent timeouts when downloading
|
||||||
// segments/playlists.
|
// segments/playlists.
|
||||||
inputHLS *self = 0;
|
inputHLS *self = 0;
|
||||||
|
@ -40,12 +51,48 @@ namespace Mist{
|
||||||
if (s.length() > 0 && s.at(s.length() - 1) == '\r'){s.erase(s.size() - 1);}
|
if (s.length() > 0 && s.at(s.length() - 1) == '\r'){s.erase(s.size() - 1);}
|
||||||
}
|
}
|
||||||
|
|
||||||
Playlist::Playlist(const std::string &uriSrc){
|
/// Helper function that is used to run the playlist downloaders
|
||||||
INFO_MSG("Adding variant playlist: %s", uriSrc.c_str());
|
/// Expects character array with playlist URL as argument, sets the first byte of the pointer to zero when loaded.
|
||||||
|
void playlistRunner(void * ptr){
|
||||||
|
if (!ptr){return;}//abort if we received a null pointer - something is seriously wrong
|
||||||
|
bool initOnly = false;
|
||||||
|
if (((char*)ptr)[0] == ';'){initOnly = true;}
|
||||||
|
|
||||||
|
Playlist pls(initOnly?((char*)ptr)+1:(char*)ptr);
|
||||||
|
plsTotalCount++;
|
||||||
|
//signal that we have now copied the URL and no longer need it
|
||||||
|
((char*)ptr)[0] = 0;
|
||||||
|
|
||||||
|
if (!pls.uri.size()){
|
||||||
|
FAIL_MSG("Variant playlist URL is empty, aborting update thread.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pls.reload();
|
||||||
|
plsInitCount++;
|
||||||
|
if (initOnly){return;}//Exit because init-only mode
|
||||||
|
|
||||||
|
while (self->config->is_active){
|
||||||
|
//If the timer has not expired yet, sleep up to a second. Otherwise, reload.
|
||||||
|
/// \TODO Sleep longer if that makes sense?
|
||||||
|
if (pls.reloadNext > Util::bootSecs()){
|
||||||
|
Util::sleep(1000);
|
||||||
|
}else{
|
||||||
|
pls.reload();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
INFO_MSG("Downloader thread for '%s' exiting", pls.uri.c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
SegmentDownloader::SegmentDownloader(){
|
||||||
segDL.progressCallback = callbackFunc;
|
segDL.progressCallback = callbackFunc;
|
||||||
segDL.dataTimeout = 5;
|
segDL.dataTimeout = 5;
|
||||||
segDL.retryCount = 5;
|
segDL.retryCount = 5;
|
||||||
plsDL.progressCallback = callbackFunc;
|
}
|
||||||
|
|
||||||
|
Playlist::Playlist(const std::string &uriSrc){
|
||||||
|
id = 0;//to be set later
|
||||||
|
INFO_MSG("Adding variant playlist: %s", uriSrc.c_str());
|
||||||
plsDL.dataTimeout = 15;
|
plsDL.dataTimeout = 15;
|
||||||
plsDL.retryCount = 8;
|
plsDL.retryCount = 8;
|
||||||
lastFileIndex = 0;
|
lastFileIndex = 0;
|
||||||
|
@ -55,12 +102,44 @@ namespace Mist{
|
||||||
lastTimestamp = 0;
|
lastTimestamp = 0;
|
||||||
uri = uriSrc;
|
uri = uriSrc;
|
||||||
root = HTTP::URL(uri);
|
root = HTTP::URL(uri);
|
||||||
|
memset(keyAES, 0, 16);
|
||||||
startTime = Util::bootSecs();
|
startTime = Util::bootSecs();
|
||||||
if (uri.size()){reload();}
|
reloadNext = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void parseKey(std::string key, char * newKey, unsigned int len){
|
||||||
|
memset(newKey, 0, len);
|
||||||
|
for (size_t i = 0; i < key.size() && i < (len << 1); ++i){
|
||||||
|
char c = key[i];
|
||||||
|
newKey[i>>1] |= ((c&15) + (((c&64)>>6) | ((c&64)>>3))) << ((~i&1) << 2);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void flipKey(char * d){
|
||||||
|
for(size_t i = 0; i< 8; i++){
|
||||||
|
char tmp = d[i];
|
||||||
|
d[i] = d[15-i];
|
||||||
|
d[15-i]=tmp;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
static std::string printhex(const char * data, size_t len)
|
||||||
|
{
|
||||||
|
static const char* const lut = "0123456789ABCDEF";
|
||||||
|
|
||||||
|
std::string output;
|
||||||
|
output.reserve(2 * len);
|
||||||
|
for (size_t i = 0; i < len; ++i)
|
||||||
|
{
|
||||||
|
const unsigned char c = data[i];
|
||||||
|
output.push_back(lut[c >> 4]);
|
||||||
|
output.push_back(lut[c & 15]);
|
||||||
|
}
|
||||||
|
return output;
|
||||||
|
}
|
||||||
/// Returns true if packetPtr is at the end of the current segment.
|
/// Returns true if packetPtr is at the end of the current segment.
|
||||||
bool Playlist::atEnd() const{
|
bool SegmentDownloader::atEnd() const{
|
||||||
return (packetPtr - segDL.const_data().data() + 188) > segDL.const_data().size();
|
return (packetPtr - segDL.const_data().data() + 188) > segDL.const_data().size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,9 +147,13 @@ namespace Mist{
|
||||||
bool Playlist::isUrl() const{return root.protocol.size();}
|
bool Playlist::isUrl() const{return root.protocol.size();}
|
||||||
|
|
||||||
/// Loads the given segment URL into the segment buffer.
|
/// Loads the given segment URL into the segment buffer.
|
||||||
bool Playlist::loadSegment(const HTTP::URL &uri){
|
bool SegmentDownloader::loadSegment(const playListEntries & entry){
|
||||||
if (!segDL.get(uri)){
|
std::string hexKey = printhex(entry.keyAES,16);
|
||||||
FAIL_MSG("failed download: %s", uri.getUrl().c_str());
|
std::string hexIvec = printhex(entry.ivec, 16);
|
||||||
|
|
||||||
|
MEDIUM_MSG("Loading segment: %s, key: %s, ivec: %s", entry.filename.c_str(), hexKey.c_str(), hexIvec.c_str());
|
||||||
|
if (!segDL.get(entry.filename)){
|
||||||
|
FAIL_MSG("failed download: %s", entry.filename.c_str());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,25 +171,59 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//If we have a non-null key, decrypt
|
||||||
|
if (entry.keyAES[ 0] != 0 || entry.keyAES[ 1] != 0 || entry.keyAES[ 2] != 0 || entry.keyAES[ 3] != 0 || \
|
||||||
|
entry.keyAES[ 4] != 0 || entry.keyAES[ 5] != 0 || entry.keyAES[ 6] != 0 || entry.keyAES[ 7] != 0 || \
|
||||||
|
entry.keyAES[ 8] != 0 || entry.keyAES[ 9] != 0 || entry.keyAES[10] != 0 || entry.keyAES[11] != 0 || \
|
||||||
|
entry.keyAES[12] != 0 || entry.keyAES[13] != 0 || entry.keyAES[14] != 0 || entry.keyAES[15] != 0){
|
||||||
|
//Setup AES context
|
||||||
|
mbedtls_aes_context aes;
|
||||||
|
//Load key for decryption
|
||||||
|
mbedtls_aes_setkey_dec(&aes, (const unsigned char*)entry.keyAES, 128);
|
||||||
|
//Allocate a pointer for writing the decrypted data to
|
||||||
|
static Util::ResizeablePointer outdata;
|
||||||
|
outdata.allocate(segDL.data().size());
|
||||||
|
//Actually decrypt the data
|
||||||
|
unsigned char tmpIvec[16];
|
||||||
|
memcpy(tmpIvec, entry.ivec, 16);
|
||||||
|
|
||||||
|
|
||||||
|
mbedtls_aes_crypt_cbc(&aes, MBEDTLS_AES_DECRYPT, segDL.data().size(), tmpIvec, (const unsigned char*)segDL.data().data(), (unsigned char*)(char*)outdata);
|
||||||
|
//Data is now still padded, the padding consists of X bytes of padding, all containing the raw value X.
|
||||||
|
//Since padding is mandatory, we can simply read the last byte and remove X bytes from the length.
|
||||||
|
if (segDL.data().size() <= outdata[segDL.data().size()-1]){
|
||||||
|
FAIL_MSG("Encryption padding is >= entire segment. Considering download failed.");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
size_t newSize = segDL.data().size() - outdata[segDL.data().size()-1];
|
||||||
|
//Finally, overwrite the original data buffer with the new one
|
||||||
|
segDL.data().assign(outdata, newSize);
|
||||||
|
}
|
||||||
|
|
||||||
// check first byte = 0x47. begin of ts file, then check if it is a multiple of 188bytes
|
// check first byte = 0x47. begin of ts file, then check if it is a multiple of 188bytes
|
||||||
if (segDL.data().data()[0] == 0x47){
|
if (segDL.data().data()[0] == 0x47){
|
||||||
if (segDL.data().size() % 188){
|
if (segDL.data().size() % 188){
|
||||||
FAIL_MSG("Expected a multiple of 188 bytes, received %d bytes. url: %s",
|
FAIL_MSG("Expected a multiple of 188 bytes, received %d bytes. url: %s",
|
||||||
segDL.data().size(), uri.getUrl().c_str());
|
segDL.data().size(), entry.filename.c_str());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}else if (segDL.data().data()[5] == 0x47){
|
}else if (segDL.data().data()[5] == 0x47){
|
||||||
if (segDL.data().size() % 192){
|
if (segDL.data().size() % 192){
|
||||||
FAIL_MSG("Expected a multiple of 192 bytes, received %d bytes. url: %s",
|
FAIL_MSG("Expected a multiple of 192 bytes, received %d bytes. url: %s",
|
||||||
segDL.data().size(), uri.getUrl().c_str());
|
segDL.data().size(), entry.filename.c_str());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
}else{
|
||||||
|
FAIL_MSG("Segment does not appear to contain TS data. Considering download failed.");
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
packetPtr = segDL.data().data();
|
packetPtr = segDL.data().data();
|
||||||
|
HIGH_MSG("Segment download complete and passed sanity checks");
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Handles both initial load and future reloads.
|
/// Handles both initial load and future reloads.
|
||||||
/// Returns how many segments were added to the internal segment list.
|
/// Returns how many segments were added to the internal segment list.
|
||||||
bool Playlist::reload(){
|
bool Playlist::reload(){
|
||||||
|
@ -114,6 +231,11 @@ namespace Mist{
|
||||||
std::string line;
|
std::string line;
|
||||||
std::string key;
|
std::string key;
|
||||||
std::string val;
|
std::string val;
|
||||||
|
|
||||||
|
std::string keyMethod;
|
||||||
|
std::string keyUri;
|
||||||
|
std::string keyIV;
|
||||||
|
|
||||||
int count = 0;
|
int count = 0;
|
||||||
uint64_t totalBytes = 0;
|
uint64_t totalBytes = 0;
|
||||||
|
|
||||||
|
@ -147,6 +269,29 @@ namespace Mist{
|
||||||
key = line.substr(7, pos - 7);
|
key = line.substr(7, pos - 7);
|
||||||
val = line.c_str() + pos + 1;
|
val = line.c_str() + pos + 1;
|
||||||
|
|
||||||
|
if(key == "KEY" ){
|
||||||
|
size_t tmpPos = val.find("METHOD=");
|
||||||
|
size_t tmpPos2 = val.substr(tmpPos).find(",");
|
||||||
|
keyMethod = val.substr(tmpPos +7, tmpPos2-tmpPos-7);
|
||||||
|
|
||||||
|
tmpPos = val.find("URI=\"");
|
||||||
|
tmpPos2 = val.substr(tmpPos+5).find("\"");
|
||||||
|
keyUri = val.substr(tmpPos + 5, tmpPos2);
|
||||||
|
|
||||||
|
tmpPos = val.find("IV=");
|
||||||
|
keyIV = val.substr(tmpPos+5, 32);
|
||||||
|
|
||||||
|
//when key not found, download and store it in the map
|
||||||
|
if (!keys.count(keyUri)) {
|
||||||
|
HTTP::Downloader keyDL;
|
||||||
|
if (!keyDL.get(root.link(keyUri)) || !keyDL.isOk()){
|
||||||
|
FAIL_MSG("Could not retrieve decryption key from '%s'", root.link(keyUri).getUrl().c_str());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
keys.insert(std::pair<std::string, std::string>(keyUri, keyDL.data()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (key == "TARGETDURATION"){
|
if (key == "TARGETDURATION"){
|
||||||
waitTime = atoi(val.c_str()) / 2;
|
waitTime = atoi(val.c_str()) / 2;
|
||||||
if (waitTime < 5){waitTime = 5;}
|
if (waitTime < 5){waitTime = 5;}
|
||||||
|
@ -183,7 +328,10 @@ namespace Mist{
|
||||||
// check for already added segments
|
// check for already added segments
|
||||||
if (fileNo >= lastFileIndex){
|
if (fileNo >= lastFileIndex){
|
||||||
cleanLine(filename);
|
cleanLine(filename);
|
||||||
addEntry(filename, f, totalBytes);
|
filename = root.link(filename).getUrl();
|
||||||
|
char ivec[16];
|
||||||
|
parseKey(keyIV, ivec, 16);
|
||||||
|
addEntry(filename, f, totalBytes,keys[keyUri],std::string(ivec,16));
|
||||||
lastFileIndex = fileNo + 1;
|
lastFileIndex = fileNo + 1;
|
||||||
++count;
|
++count;
|
||||||
}
|
}
|
||||||
|
@ -196,6 +344,10 @@ namespace Mist{
|
||||||
}else{
|
}else{
|
||||||
fileSource.close();
|
fileSource.close();
|
||||||
}
|
}
|
||||||
|
//Set the global live/vod bool to live if this playlist looks like a live playlist
|
||||||
|
if (playlistType == LIVE){streamIsLive = true;}
|
||||||
|
|
||||||
|
if (globalWaitTime < waitTime){globalWaitTime = waitTime;}
|
||||||
|
|
||||||
reloadNext = Util::bootSecs() + waitTime;
|
reloadNext = Util::bootSecs() + waitTime;
|
||||||
return (count > 0);
|
return (count > 0);
|
||||||
|
@ -218,13 +370,12 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Adds playlist segments to be processed
|
/// Adds playlist segments to be processed
|
||||||
void Playlist::addEntry(const std::string &filename, float duration, uint64_t &totalBytes){
|
void Playlist::addEntry(const std::string &filename, float duration, uint64_t &totalBytes, const std::string &key, const std::string &iv){
|
||||||
if (!isSupportedFile(filename)){
|
if (!isSupportedFile(filename)){
|
||||||
WARN_MSG("Ignoring unsupported file: %s", filename.c_str());
|
WARN_MSG("Ignoring unsupported file: %s", filename.c_str());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
MEDIUM_MSG("Adding segment (%d): %s", lastFileIndex, filename.c_str());
|
|
||||||
|
|
||||||
playListEntries entry;
|
playListEntries entry;
|
||||||
entry.filename = filename;
|
entry.filename = filename;
|
||||||
|
@ -232,6 +383,16 @@ namespace Mist{
|
||||||
entry.bytePos = totalBytes;
|
entry.bytePos = totalBytes;
|
||||||
entry.duration = duration;
|
entry.duration = duration;
|
||||||
|
|
||||||
|
if(key.size() && iv.size()){
|
||||||
|
memcpy(entry.ivec, iv.data(), 16);
|
||||||
|
memcpy(entry.keyAES, key.data(), 16);
|
||||||
|
}else{
|
||||||
|
memset(entry.ivec, 0, 16);
|
||||||
|
memset(entry.keyAES, 0, 16);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if (!isUrl()){
|
if (!isUrl()){
|
||||||
std::ifstream fileSource;
|
std::ifstream fileSource;
|
||||||
std::string test = root.link(entry.filename).getFilePath();
|
std::string test = root.link(entry.filename).getFilePath();
|
||||||
|
@ -242,12 +403,22 @@ namespace Mist{
|
||||||
|
|
||||||
entry.timestamp = lastTimestamp + startTime;
|
entry.timestamp = lastTimestamp + startTime;
|
||||||
lastTimestamp += duration;
|
lastTimestamp += duration;
|
||||||
entries.push_back(entry);
|
{
|
||||||
|
tthread::lock_guard<tthread::mutex> guard(entryMutex);
|
||||||
|
//Set a playlist ID if we haven't assigned one yet.
|
||||||
|
//Note: This method requires never removing playlists, only adding.
|
||||||
|
//The mutex assures we have a unique count/number.
|
||||||
|
if (!id){id = listEntries.size()+1;}
|
||||||
|
listEntries[id].push_back(entry);
|
||||||
|
MEDIUM_MSG("Added segment to variant %" PRIu32 " (#%d, now %d queued): %s", id, lastFileIndex, listEntries[id].size(), filename.c_str());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Constructor of HLS Input
|
/// Constructor of HLS Input
|
||||||
inputHLS::inputHLS(Util::Config *cfg) : Input(cfg){
|
inputHLS::inputHLS(Util::Config *cfg) : Input(cfg){
|
||||||
self = this;
|
self = this;
|
||||||
|
streamIsLive = false;
|
||||||
|
globalWaitTime = 0;
|
||||||
currentPlaylist = 0;
|
currentPlaylist = 0;
|
||||||
|
|
||||||
capa["name"] = "HLS";
|
capa["name"] = "HLS";
|
||||||
|
@ -280,7 +451,9 @@ namespace Mist{
|
||||||
bool inputHLS::checkArguments(){
|
bool inputHLS::checkArguments(){
|
||||||
config->is_active = true;
|
config->is_active = true;
|
||||||
if (config->getString("input") == "-"){return false;}
|
if (config->getString("input") == "-"){return false;}
|
||||||
if (!initPlaylist(config->getString("input"))){return false;}
|
HTTP::URL mainPls(config->getString("input"));
|
||||||
|
if (mainPls.getExt().substr(0, 3) != "m3u" && mainPls.protocol.find("hls") == std::string::npos){return false;}
|
||||||
|
if (!initPlaylist(config->getString("input"), false)){return false;}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -299,10 +472,15 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
|
|
||||||
void inputHLS::parseStreamHeader(){
|
void inputHLS::parseStreamHeader(){
|
||||||
|
if (!initPlaylist(config->getString("input"))){
|
||||||
|
FAIL_MSG("Failed to load HLS playlist, aborting");
|
||||||
|
myMeta = DTSC::Meta();
|
||||||
|
return;
|
||||||
|
}
|
||||||
myMeta = DTSC::Meta();
|
myMeta = DTSC::Meta();
|
||||||
myMeta.live = false;
|
myMeta.live = false;
|
||||||
myMeta.vod = true;
|
myMeta.vod = true;
|
||||||
VERYHIGH_MSG("parsestream");
|
INFO_MSG("Parsing live stream to create header...");
|
||||||
TS::Packet packet; // to analyse and extract data
|
TS::Packet packet; // to analyse and extract data
|
||||||
int counter = 1;
|
int counter = 1;
|
||||||
|
|
||||||
|
@ -310,89 +488,69 @@ namespace Mist{
|
||||||
unsigned int dataLen;
|
unsigned int dataLen;
|
||||||
bool keepReading = false;
|
bool keepReading = false;
|
||||||
|
|
||||||
for (std::vector<Playlist>::iterator pListIt = playlists.begin(); pListIt != playlists.end();
|
tthread::lock_guard<tthread::mutex> guard(entryMutex);
|
||||||
|
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin(); pListIt != listEntries.end();
|
||||||
pListIt++){
|
pListIt++){
|
||||||
if (!pListIt->entries.size()){continue;}
|
//Skip empty playlists
|
||||||
|
if (!pListIt->second.size()){continue;}
|
||||||
int preCounter = counter;
|
int preCounter = counter;
|
||||||
tsStream.clear();
|
tsStream.clear();
|
||||||
|
|
||||||
|
|
||||||
std::deque<playListEntries>::iterator entryIt = pListIt->entries.begin();
|
for (std::deque<playListEntries>::iterator entryIt = pListIt->second.begin(); entryIt != pListIt->second.end(); ++entryIt){
|
||||||
while (true){
|
|
||||||
uint64_t lastBpos = entryIt->bytePos;
|
uint64_t lastBpos = entryIt->bytePos;
|
||||||
|
nProxy.userClient.keepAlive();
|
||||||
if (pListIt->isUrl()){
|
if (!segDowner.loadSegment(*entryIt)){
|
||||||
bool ret = false;
|
WARN_MSG("Skipping segment that could not be loaded in an attempt to recover");
|
||||||
nProxy.userClient.keepAlive();
|
tsStream.clear();
|
||||||
ret = pListIt->loadSegment(pListIt->root.link(entryIt->filename));
|
continue;
|
||||||
keepReading = packet.FromPointer(pListIt->packetPtr);
|
|
||||||
pListIt->packetPtr += 188;
|
|
||||||
}else{
|
|
||||||
in.open(pListIt->root.link(entryIt->filename).getUrl().c_str());
|
|
||||||
if (!in.good()){
|
|
||||||
FAIL_MSG("Could not open segment (%s): %s", strerror(errno),
|
|
||||||
pListIt->root.link(entryIt->filename).getFilePath().c_str());
|
|
||||||
continue; // skip to the next one
|
|
||||||
}
|
|
||||||
keepReading = packet.FromStream(in);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
while (keepReading){
|
do{
|
||||||
|
if (!packet.FromPointer(segDowner.packetPtr)){
|
||||||
|
WARN_MSG("Could not load TS packet, aborting segment parse");
|
||||||
|
tsStream.clear();
|
||||||
|
break;//Abort load
|
||||||
|
}
|
||||||
tsStream.parse(packet, lastBpos);
|
tsStream.parse(packet, lastBpos);
|
||||||
if (pListIt->isUrl()){
|
segDowner.packetPtr += 188;
|
||||||
lastBpos = entryIt->bytePos + pListIt->segDL.data().size();
|
|
||||||
}else{
|
|
||||||
lastBpos = entryIt->bytePos + in.tellg();
|
|
||||||
}
|
|
||||||
|
|
||||||
while (tsStream.hasPacketOnEachTrack()){
|
if (tsStream.hasPacketOnEachTrack()){
|
||||||
DTSC::Packet headerPack;
|
while (tsStream.hasPacket()){
|
||||||
tsStream.getEarliestPacket(headerPack);
|
DTSC::Packet headerPack;
|
||||||
int tmpTrackId = headerPack.getTrackId();
|
tsStream.getEarliestPacket(headerPack);
|
||||||
uint64_t packetId = pidMapping[(((uint64_t)pListIt->id) << 32) + tmpTrackId];
|
int tmpTrackId = headerPack.getTrackId();
|
||||||
|
uint64_t packetId = pidMapping[(((uint64_t)pListIt->first) << 32) + tmpTrackId];
|
||||||
|
|
||||||
if (packetId == 0){
|
if (packetId == 0){
|
||||||
pidMapping[(((uint64_t)pListIt->id) << 32) + headerPack.getTrackId()] = counter;
|
pidMapping[(((uint64_t)pListIt->first) << 32) + headerPack.getTrackId()] = counter;
|
||||||
pidMappingR[counter] = (((uint64_t)pListIt->id) << 32) + headerPack.getTrackId();
|
pidMappingR[counter] = (((uint64_t)pListIt->first) << 32) + headerPack.getTrackId();
|
||||||
packetId = counter;
|
packetId = counter;
|
||||||
HIGH_MSG("Added file %s, trackid: %d, mapped to: %d",
|
VERYHIGH_MSG("Added file %s, trackid: %d, mapped to: %d", entryIt->filename.c_str(), headerPack.getTrackId(), counter);
|
||||||
pListIt->root.link(entryIt->filename).getUrl().c_str(),
|
counter++;
|
||||||
headerPack.getTrackId(), counter);
|
}
|
||||||
counter++;
|
|
||||||
}
|
if ((!myMeta.tracks.count(packetId) || !myMeta.tracks[packetId].codec.size())){
|
||||||
|
tsStream.initializeMetadata(myMeta, tmpTrackId, packetId);
|
||||||
if ((!myMeta.tracks.count(packetId) || !myMeta.tracks[packetId].codec.size())){
|
myMeta.tracks[packetId].minKeepAway = globalWaitTime * 2000;
|
||||||
tsStream.initializeMetadata(myMeta, tmpTrackId, packetId);
|
VERYHIGH_MSG("setting minKeepAway = %d for track: %d",
|
||||||
myMeta.tracks[packetId].minKeepAway = pListIt->waitTime * 2000;
|
myMeta.tracks[packetId].minKeepAway, packetId);
|
||||||
VERYHIGH_MSG("setting minKeepAway = %d for track: %d",
|
}
|
||||||
myMeta.tracks[packetId].minKeepAway, packetId);
|
|
||||||
}
|
}
|
||||||
|
break;//we have all tracks discovered, next playlist!
|
||||||
}
|
}
|
||||||
|
}while(!segDowner.atEnd());
|
||||||
if (pListIt->isUrl()){
|
if (preCounter < counter){break;}//We're done reading this playlist!
|
||||||
keepReading = !pListIt->atEnd();
|
|
||||||
if (keepReading){
|
|
||||||
packet.FromPointer(pListIt->packetPtr);
|
|
||||||
pListIt->packetPtr += 188;
|
|
||||||
}
|
|
||||||
}else{
|
|
||||||
keepReading = packet.FromStream(in);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
in.close();
|
|
||||||
|
|
||||||
//Go to next segment, abort if we found at least one track or ran out of segments.
|
|
||||||
entryIt++;
|
|
||||||
if (counter != preCounter || entryIt == pListIt->entries.end()){break;}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tsStream.clear();
|
tsStream.clear();
|
||||||
in.close();
|
currentPlaylist = 0;
|
||||||
|
segDowner.segDL.data().clear();//make sure we have nothing left over
|
||||||
|
INFO_MSG("header complete, beginning live ingest of %d tracks", counter-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool inputHLS::readHeader(){
|
bool inputHLS::readHeader(){
|
||||||
if (playlists.size() && playlists[0].playlistType == LIVE){return true;}
|
if (streamIsLive){return true;}
|
||||||
|
|
||||||
std::istringstream urlSource;
|
std::istringstream urlSource;
|
||||||
std::ifstream fileSource;
|
std::ifstream fileSource;
|
||||||
|
@ -416,54 +574,43 @@ namespace Mist{
|
||||||
char *data;
|
char *data;
|
||||||
size_t dataLen;
|
size_t dataLen;
|
||||||
|
|
||||||
for (std::vector<Playlist>::iterator pListIt = playlists.begin(); pListIt != playlists.end();
|
tthread::lock_guard<tthread::mutex> guard(entryMutex);
|
||||||
|
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin(); pListIt != listEntries.end();
|
||||||
pListIt++){
|
pListIt++){
|
||||||
tsStream.clear();
|
tsStream.clear();
|
||||||
uint32_t entId = 0;
|
uint32_t entId = 0;
|
||||||
|
|
||||||
for (std::deque<playListEntries>::iterator entryIt = pListIt->entries.begin();
|
for (std::deque<playListEntries>::iterator entryIt = pListIt->second.begin();
|
||||||
entryIt != pListIt->entries.end(); entryIt++){
|
entryIt != pListIt->second.end(); entryIt++){
|
||||||
tsStream.partialClear();
|
tsStream.partialClear();
|
||||||
endOfFile = false;
|
endOfFile = false;
|
||||||
|
|
||||||
if (pListIt->isUrl()){
|
segDowner.loadSegment(*entryIt);
|
||||||
pListIt->loadSegment(pListIt->root.link(entryIt->filename));
|
endOfFile = !segDowner.atEnd();
|
||||||
endOfFile = !pListIt->atEnd();
|
if (!endOfFile){packet.FromPointer(segDowner.packetPtr);}
|
||||||
if (!endOfFile){packet.FromPointer(pListIt->packetPtr);}
|
segDowner.packetPtr += 188;
|
||||||
pListIt->packetPtr += 188;
|
|
||||||
}else{
|
|
||||||
in.close();
|
|
||||||
in.open(pListIt->root.link(entryIt->filename).getFilePath().c_str());
|
|
||||||
if (!in.good()){
|
|
||||||
FAIL_MSG("Could not open segment (%s): %s", strerror(errno),
|
|
||||||
pListIt->root.link(entryIt->filename).getFilePath().c_str());
|
|
||||||
continue; // skip to the next one
|
|
||||||
}
|
|
||||||
packet.FromStream(in);
|
|
||||||
endOfFile = in.eof();
|
|
||||||
}
|
|
||||||
|
|
||||||
entId++;
|
entId++;
|
||||||
uint64_t lastBpos = entryIt->bytePos;
|
uint64_t lastBpos = entryIt->bytePos;
|
||||||
while (!endOfFile){
|
while (!endOfFile){
|
||||||
tsStream.parse(packet, lastBpos);
|
tsStream.parse(packet, lastBpos);
|
||||||
|
|
||||||
if (pListIt->isUrl()){
|
//if (pListIt->isUrl()){
|
||||||
lastBpos = entryIt->bytePos + pListIt->segDL.data().size();
|
lastBpos = entryIt->bytePos + segDowner.segDL.data().size();
|
||||||
}else{
|
//}else{
|
||||||
lastBpos = entryIt->bytePos + in.tellg();
|
// lastBpos = entryIt->bytePos + in.tellg();
|
||||||
}
|
//}
|
||||||
|
|
||||||
while (tsStream.hasPacketOnEachTrack()){
|
while (tsStream.hasPacketOnEachTrack()){
|
||||||
DTSC::Packet headerPack;
|
DTSC::Packet headerPack;
|
||||||
tsStream.getEarliestPacket(headerPack);
|
tsStream.getEarliestPacket(headerPack);
|
||||||
|
|
||||||
int tmpTrackId = headerPack.getTrackId();
|
int tmpTrackId = headerPack.getTrackId();
|
||||||
uint64_t packetId = pidMapping[(((uint64_t)pListIt->id) << 32) + tmpTrackId];
|
uint64_t packetId = pidMapping[(((uint64_t)pListIt->first) << 32) + tmpTrackId];
|
||||||
|
|
||||||
if (packetId == 0){
|
if (packetId == 0){
|
||||||
pidMapping[(((uint64_t)pListIt->id) << 32) + headerPack.getTrackId()] = counter;
|
pidMapping[(((uint64_t)pListIt->first) << 32) + headerPack.getTrackId()] = counter;
|
||||||
pidMappingR[counter] = (((uint64_t)pListIt->id) << 32) + headerPack.getTrackId();
|
pidMappingR[counter] = (((uint64_t)pListIt->first) << 32) + headerPack.getTrackId();
|
||||||
packetId = counter;
|
packetId = counter;
|
||||||
counter++;
|
counter++;
|
||||||
}
|
}
|
||||||
|
@ -487,16 +634,16 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pListIt->isUrl()){
|
//if (pListIt->isUrl()){
|
||||||
endOfFile = pListIt->atEnd();
|
endOfFile = segDowner.atEnd();
|
||||||
if (!endOfFile){
|
if (!endOfFile){
|
||||||
packet.FromPointer(pListIt->packetPtr);
|
packet.FromPointer(segDowner.packetPtr);
|
||||||
pListIt->packetPtr += 188;
|
segDowner.packetPtr += 188;
|
||||||
}
|
}
|
||||||
}else{
|
//}else{
|
||||||
packet.FromStream(in);
|
// packet.FromStream(in);
|
||||||
endOfFile = in.eof();
|
// endOfFile = in.eof();
|
||||||
}
|
//}
|
||||||
}
|
}
|
||||||
// get last packets
|
// get last packets
|
||||||
tsStream.finish();
|
tsStream.finish();
|
||||||
|
@ -504,14 +651,14 @@ namespace Mist{
|
||||||
tsStream.getEarliestPacket(headerPack);
|
tsStream.getEarliestPacket(headerPack);
|
||||||
while (headerPack){
|
while (headerPack){
|
||||||
int tmpTrackId = headerPack.getTrackId();
|
int tmpTrackId = headerPack.getTrackId();
|
||||||
uint64_t packetId = pidMapping[(((uint64_t)pListIt->id) << 32) + tmpTrackId];
|
uint64_t packetId = pidMapping[(((uint64_t)pListIt->first) << 32) + tmpTrackId];
|
||||||
|
|
||||||
if (packetId == 0){
|
if (packetId == 0){
|
||||||
pidMapping[(((uint64_t)pListIt->id) << 32) + headerPack.getTrackId()] = counter;
|
pidMapping[(((uint64_t)pListIt->first) << 32) + headerPack.getTrackId()] = counter;
|
||||||
pidMappingR[counter] = (((uint64_t)pListIt->id) << 32) + headerPack.getTrackId();
|
pidMappingR[counter] = (((uint64_t)pListIt->first) << 32) + headerPack.getTrackId();
|
||||||
packetId = counter;
|
packetId = counter;
|
||||||
INFO_MSG("Added file %s, trackid: %d, mapped to: %d",
|
INFO_MSG("Added file %s, trackid: %d, mapped to: %d",
|
||||||
pListIt->root.link(entryIt->filename).getUrl().c_str(),
|
entryIt->filename.c_str(),
|
||||||
headerPack.getTrackId(), counter);
|
headerPack.getTrackId(), counter);
|
||||||
counter++;
|
counter++;
|
||||||
}
|
}
|
||||||
|
@ -535,83 +682,40 @@ namespace Mist{
|
||||||
tsStream.getEarliestPacket(headerPack);
|
tsStream.getEarliestPacket(headerPack);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pListIt->isUrl()){in.close();}
|
//if (!pListIt->isUrl()){in.close();}
|
||||||
|
|
||||||
if (hasHeader){break;}
|
if (hasHeader){break;}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasHeader || (playlists.size() && playlists[0].isUrl())){return true;}
|
if (streamIsLive){return true;}
|
||||||
|
|
||||||
INFO_MSG("write header file...");
|
INFO_MSG("write header file...");
|
||||||
std::ofstream oFile((config->getString("input") + ".dtsh").c_str());
|
std::ofstream oFile((config->getString("input") + ".dtsh").c_str());
|
||||||
|
|
||||||
oFile << myMeta.toJSON().toNetPacked();
|
oFile << myMeta.toJSON().toNetPacked();
|
||||||
oFile.close();
|
oFile.close();
|
||||||
in.close();
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool inputHLS::needsLock(){
|
bool inputHLS::needsLock(){
|
||||||
if (playlists.size() && playlists[0].isUrl()){return false;}
|
return !streamIsLive;
|
||||||
return (playlists.size() <= currentPlaylist) ||
|
|
||||||
!(playlists[currentPlaylist].playlistType == LIVE);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool inputHLS::openStreamSource(){return true;}
|
bool inputHLS::openStreamSource(){return true;}
|
||||||
|
|
||||||
int inputHLS::getFirstPlaylistToReload(){
|
|
||||||
int plsNum = 0;
|
|
||||||
int bestPls = 0;
|
|
||||||
uint64_t earRld = 0;
|
|
||||||
for (std::vector<Playlist>::iterator it = playlists.begin(); it != playlists.end(); ++it){
|
|
||||||
if (!plsNum || it->reloadNext < earRld){
|
|
||||||
bestPls = plsNum;
|
|
||||||
earRld = it->reloadNext;
|
|
||||||
}
|
|
||||||
++plsNum;
|
|
||||||
}
|
|
||||||
return bestPls;
|
|
||||||
}
|
|
||||||
|
|
||||||
void inputHLS::getNext(bool smart){
|
void inputHLS::getNext(bool smart){
|
||||||
currentPlaylist = firstSegment();
|
|
||||||
INSANE_MSG("Getting next");
|
INSANE_MSG("Getting next");
|
||||||
uint32_t tid = 0;
|
uint32_t tid = 0;
|
||||||
bool endOfFile = false;
|
static bool endOfFile = false;
|
||||||
if (selectedTracks.size()){tid = *selectedTracks.begin();}
|
if (selectedTracks.size()){tid = *selectedTracks.begin();}
|
||||||
thisPacket.null();
|
thisPacket.null();
|
||||||
while (config->is_active && (needsLock() || nProxy.userClient.isAlive())){
|
while (config->is_active && (needsLock() || nProxy.userClient.isAlive())){
|
||||||
int oldPlaylist = currentPlaylist;
|
|
||||||
currentPlaylist = firstSegment();
|
|
||||||
|
|
||||||
// If we have a new playlist, print that info.
|
|
||||||
if (currentPlaylist >= 0 && oldPlaylist != currentPlaylist){
|
|
||||||
MEDIUM_MSG("Switched to playlist %d", currentPlaylist);
|
|
||||||
}
|
|
||||||
|
|
||||||
// No segments? Wait until next playlist reloading time.
|
|
||||||
if (currentPlaylist < 0){
|
|
||||||
int a = getFirstPlaylistToReload();
|
|
||||||
MEDIUM_MSG("Waiting for %d seconds until next playlist reload...",
|
|
||||||
playlists[a].reloadNext - Util::bootSecs());
|
|
||||||
while (Util::bootSecs() < playlists[a].reloadNext &&
|
|
||||||
(needsLock() || nProxy.userClient.isAlive())){
|
|
||||||
Util::wait(1000);
|
|
||||||
nProxy.userClient.keepAlive();
|
|
||||||
}
|
|
||||||
MEDIUM_MSG("Reloading playlist %d", a);
|
|
||||||
playlists[a].reload();
|
|
||||||
currentPlaylist = firstSegment();
|
|
||||||
// Continue regular parsing, in case we need another reload.
|
|
||||||
currentPlaylist = oldPlaylist;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if we have a packet
|
// Check if we have a packet
|
||||||
bool hasPacket = false;
|
bool hasPacket = false;
|
||||||
if (playlists[currentPlaylist].playlistType == LIVE){
|
if (streamIsLive){
|
||||||
hasPacket = tsStream.hasPacketOnEachTrack() || (endOfFile && tsStream.hasPacket());
|
hasPacket = tsStream.hasPacketOnEachTrack() || (endOfFile && tsStream.hasPacket());
|
||||||
}else{
|
}else{
|
||||||
hasPacket = tsStream.hasPacket(getMappedTrackId(tid));
|
hasPacket = tsStream.hasPacket(getMappedTrackId(tid));
|
||||||
|
@ -637,51 +741,37 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
|
|
||||||
// No? Let's read some more data and check again.
|
// No? Let's read some more data and check again.
|
||||||
if (playlists[currentPlaylist].isUrl()){
|
if (!segDowner.atEnd()){
|
||||||
if (!playlists[currentPlaylist].atEnd()){
|
tsBuf.FromPointer(segDowner.packetPtr);
|
||||||
tsBuf.FromPointer(playlists[currentPlaylist].packetPtr);
|
segDowner.packetPtr += 188;
|
||||||
playlists[currentPlaylist].packetPtr += 188;
|
tsStream.parse(tsBuf, 0);
|
||||||
tsStream.parse(tsBuf, 0);
|
continue; // check again
|
||||||
continue; // check again
|
|
||||||
}
|
|
||||||
}else{
|
|
||||||
if (in.good()){
|
|
||||||
tsBuf.FromStream(in);
|
|
||||||
tsStream.parse(tsBuf, 0);
|
|
||||||
continue; // check again
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Okay, reading more is not possible. Let's call finish() and check again.
|
// Okay, reading more is not possible. Let's call finish() and check again.
|
||||||
if (!endOfFile){
|
if (!endOfFile){
|
||||||
endOfFile = true; // we reached the end of file
|
endOfFile = true; // we reached the end of file
|
||||||
tsStream.finish();
|
tsStream.finish();
|
||||||
MEDIUM_MSG("Finishing reading TS segment");
|
VERYHIGH_MSG("Finishing reading TS segment");
|
||||||
continue; // Check again!
|
continue; // Check again!
|
||||||
}
|
}
|
||||||
|
|
||||||
// No? Then we try to read the next file.
|
// No? Then we try to read the next file.
|
||||||
|
//
|
||||||
// First we handle live playlist reloads, if needed
|
currentPlaylist = firstSegment();
|
||||||
if (playlists[currentPlaylist].playlistType == LIVE){
|
// No segments? Wait until next playlist reloading time.
|
||||||
// Reload the first playlist that needs it, if the time is right
|
if (currentPlaylist < 0){
|
||||||
int a = getFirstPlaylistToReload();
|
VERYHIGH_MSG("Waiting for segments...");
|
||||||
if (playlists[a].reloadNext <= Util::bootSecs()){
|
if (nProxy.userClient.isAlive()){nProxy.userClient.keepAlive();}
|
||||||
MEDIUM_MSG("Reloading playlist %d", a);
|
Util::wait(500);
|
||||||
playlists[a].reload();
|
continue;
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now that we know our playlist is up-to-date, actually try to read the file.
|
// Now that we know our playlist is up-to-date, actually try to read the file.
|
||||||
MEDIUM_MSG("Moving on to next TS segment");
|
VERYHIGH_MSG("Moving on to next TS segment (variant %u)", currentPlaylist);
|
||||||
if (readNextFile()){
|
if (readNextFile()){
|
||||||
MEDIUM_MSG("Next segment read successfully");
|
MEDIUM_MSG("Next segment read successfully");
|
||||||
endOfFile = false; // no longer at end of file
|
endOfFile = false; // no longer at end of file
|
||||||
// Prevent timeouts, we may have just finished a download after all.
|
|
||||||
if (playlists[currentPlaylist].playlistType == LIVE){
|
|
||||||
nProxy.userClient.keepAlive();
|
|
||||||
}
|
|
||||||
continue; // Success! Continue regular parsing.
|
continue; // Success! Continue regular parsing.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -695,31 +785,18 @@ namespace Mist{
|
||||||
|
|
||||||
void inputHLS::readPMT(){
|
void inputHLS::readPMT(){
|
||||||
HIGH_MSG("readPMT()");
|
HIGH_MSG("readPMT()");
|
||||||
if (playlists[currentPlaylist].isUrl()){
|
size_t bpos;
|
||||||
size_t bpos;
|
TS::Packet tsBuffer;
|
||||||
TS::Packet tsBuffer;
|
const char *tmpPtr = segDowner.segDL.data().data();
|
||||||
const char *tmpPtr = playlists[currentPlaylist].segDL.data().data();
|
|
||||||
|
|
||||||
while (!tsStream.hasPacketOnEachTrack() &&
|
while (!tsStream.hasPacketOnEachTrack() &&
|
||||||
(tmpPtr - playlists[currentPlaylist].segDL.data().data() + 188 <=
|
(tmpPtr - segDowner.segDL.data().data() + 188 <=
|
||||||
playlists[currentPlaylist].segDL.data().size())){
|
segDowner.segDL.data().size())){
|
||||||
tsBuffer.FromPointer(tmpPtr);
|
tsBuffer.FromPointer(tmpPtr);
|
||||||
tsStream.parse(tsBuffer, 0);
|
tsStream.parse(tsBuffer, 0);
|
||||||
tmpPtr += 188;
|
tmpPtr += 188;
|
||||||
}
|
|
||||||
tsStream.partialClear();
|
|
||||||
|
|
||||||
}else{
|
|
||||||
size_t bpos = in.tellg();
|
|
||||||
in.seekg(0, in.beg);
|
|
||||||
TS::Packet tsBuffer;
|
|
||||||
while (!tsStream.hasPacketOnEachTrack() && tsBuffer.FromStream(in)){
|
|
||||||
tsStream.parse(tsBuffer, 0);
|
|
||||||
}
|
|
||||||
tsStream.partialClear();
|
|
||||||
in.clear();
|
|
||||||
in.seekg(bpos, in.beg);
|
|
||||||
}
|
}
|
||||||
|
tsStream.partialClear();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note: bpos is overloaded here for playlist entry!
|
// Note: bpos is overloaded here for playlist entry!
|
||||||
|
@ -752,31 +829,22 @@ namespace Mist{
|
||||||
|
|
||||||
currentPlaylist = getMappedTrackPlaylist(trackId);
|
currentPlaylist = getMappedTrackPlaylist(trackId);
|
||||||
|
|
||||||
Playlist &curPlaylist = playlists[currentPlaylist];
|
{//Lock mutex for listEntries
|
||||||
playListEntries &entry = curPlaylist.entries.at(currentIndex);
|
tthread::lock_guard<tthread::mutex> guard(entryMutex);
|
||||||
if (curPlaylist.isUrl()){
|
std::deque<playListEntries> &curPlaylist = listEntries[currentPlaylist];
|
||||||
curPlaylist.loadSegment(curPlaylist.root.link(entry.filename));
|
playListEntries &entry = curPlaylist.at(currentIndex);
|
||||||
}else{
|
segDowner.loadSegment(entry);
|
||||||
in.close();
|
|
||||||
in.open(curPlaylist.root.link(entry.filename).getFilePath().c_str());
|
|
||||||
MEDIUM_MSG("Opening segment: %s",
|
|
||||||
curPlaylist.root.link(entry.filename).getFilePath().c_str());
|
|
||||||
if (!in.good()){
|
|
||||||
FAIL_MSG("Could not open segment (%s): %s", strerror(errno),
|
|
||||||
curPlaylist.root.link(entry.filename).getUrl().c_str());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
readPMT();
|
readPMT();
|
||||||
}
|
}
|
||||||
|
|
||||||
int inputHLS::getEntryId(int playlistId, uint64_t bytePos){
|
int inputHLS::getEntryId(int playlistId, uint64_t bytePos){
|
||||||
if (bytePos == 0){return 0;}
|
if (bytePos == 0){return 0;}
|
||||||
|
tthread::lock_guard<tthread::mutex> guard(entryMutex);
|
||||||
for (int i = 0; i < playlists[playlistId].entries.size(); i++){
|
for (int i = 0; i < listEntries[playlistId].size(); i++){
|
||||||
if (playlists[playlistId].entries.at(i).bytePos > bytePos){return i - 1;}
|
if (listEntries[playlistId].at(i).bytePos > bytePos){return i - 1;}
|
||||||
}
|
}
|
||||||
|
return listEntries[playlistId].size() - 1;
|
||||||
return playlists[playlistId].entries.size() - 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t inputHLS::getOriginalTrackId(uint32_t playlistId, uint32_t id){
|
uint64_t inputHLS::getOriginalTrackId(uint32_t playlistId, uint32_t id){
|
||||||
|
@ -804,7 +872,13 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parses the main playlist, possibly containing variants.
|
/// Parses the main playlist, possibly containing variants.
|
||||||
bool inputHLS::initPlaylist(const std::string &uri){
|
bool inputHLS::initPlaylist(const std::string &uri, bool fullInit){
|
||||||
|
plsInitCount = 0;
|
||||||
|
plsTotalCount = 0;
|
||||||
|
{
|
||||||
|
tthread::lock_guard<tthread::mutex> guard(entryMutex);
|
||||||
|
listEntries.clear();
|
||||||
|
}
|
||||||
std::string line;
|
std::string line;
|
||||||
bool ret = false;
|
bool ret = false;
|
||||||
startTime = Util::bootSecs();
|
startTime = Util::bootSecs();
|
||||||
|
@ -894,7 +968,7 @@ namespace Mist{
|
||||||
|
|
||||||
if (codecSupported){
|
if (codecSupported){
|
||||||
|
|
||||||
ret = readPlaylist(playlistRootPath.link(line));
|
ret = readPlaylist(playlistRootPath.link(line), fullInit);
|
||||||
}else{
|
}else{
|
||||||
INFO_MSG("skipping variant playlist %s, none of the codecs are supported",
|
INFO_MSG("skipping variant playlist %s, none of the codecs are supported",
|
||||||
playlistRootPath.link(line).getUrl().c_str());
|
playlistRootPath.link(line).getUrl().c_str());
|
||||||
|
@ -910,13 +984,13 @@ namespace Mist{
|
||||||
int pos = line.find("URI");
|
int pos = line.find("URI");
|
||||||
if (pos != std::string::npos){
|
if (pos != std::string::npos){
|
||||||
mediafile = line.substr(pos + 5, line.length() - pos - 6);
|
mediafile = line.substr(pos + 5, line.length() - pos - 6);
|
||||||
ret = readPlaylist(playlistRootPath.link(mediafile));
|
ret = readPlaylist(playlistRootPath.link(mediafile), fullInit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}else if (line.compare(0, 7, "#EXTINF") == 0){
|
}else if (line.compare(0, 7, "#EXTINF") == 0){
|
||||||
// current file is not a variant playlist, but regular playlist.
|
// current file is not a variant playlist, but regular playlist.
|
||||||
ret = readPlaylist(uri);
|
ret = readPlaylist(uri, fullInit);
|
||||||
break;
|
break;
|
||||||
}else{
|
}else{
|
||||||
// ignore wrong lines
|
// ignore wrong lines
|
||||||
|
@ -926,15 +1000,36 @@ namespace Mist{
|
||||||
|
|
||||||
if (!isUrl){fileSource.close();}
|
if (!isUrl){fileSource.close();}
|
||||||
|
|
||||||
|
uint32_t maxWait = 0;
|
||||||
|
unsigned int lastCount = 9999;
|
||||||
|
while (plsTotalCount != plsInitCount && ++maxWait < 50){
|
||||||
|
if (plsInitCount != lastCount){
|
||||||
|
lastCount = plsInitCount;
|
||||||
|
INFO_MSG("Waiting for variant playlists to load... %u/%u", lastCount, plsTotalCount);
|
||||||
|
}
|
||||||
|
Util::sleep(1000);
|
||||||
|
}
|
||||||
|
if (maxWait >= 50){
|
||||||
|
WARN_MSG("Timeout waiting for variant playlists (%u/%u)", plsInitCount, plsTotalCount);
|
||||||
|
}
|
||||||
|
plsInitCount = 0;
|
||||||
|
plsTotalCount = 0;
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Function for reading every playlist.
|
/// Function for reading every playlist.
|
||||||
bool inputHLS::readPlaylist(const HTTP::URL &uri){
|
bool inputHLS::readPlaylist(const HTTP::URL &uri, bool fullInit){
|
||||||
Playlist p(uri.protocol.size() ? uri.getUrl() : uri.getFilePath());
|
std::string urlBuffer = (fullInit?"":";")+uri.getUrl();
|
||||||
p.id = playlists.size();
|
tthread::thread runList(playlistRunner, (void *)urlBuffer.data());
|
||||||
// set size of reloadNext to playlist count with default value 0
|
runList.detach(); //Abandon the thread, it's now running independently
|
||||||
playlists.push_back(p);
|
uint32_t timeout = 0;
|
||||||
|
while (urlBuffer.data()[0] && ++timeout < 100){
|
||||||
|
Util::sleep(100);
|
||||||
|
}
|
||||||
|
if (timeout >= 100){
|
||||||
|
WARN_MSG("Thread start timed out for: %s", urlBuffer.c_str());
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -942,53 +1037,25 @@ namespace Mist{
|
||||||
/// to be processed)
|
/// to be processed)
|
||||||
bool inputHLS::readNextFile(){
|
bool inputHLS::readNextFile(){
|
||||||
tsStream.clear();
|
tsStream.clear();
|
||||||
Playlist &curList = playlists[currentPlaylist];
|
|
||||||
|
|
||||||
if (!curList.entries.size()){
|
playListEntries ntry;
|
||||||
WARN_MSG("no entries found in playlist: %d!", currentPlaylist);
|
//This scope limiter prevents the recursion down below from deadlocking us
|
||||||
return false;
|
{
|
||||||
}
|
tthread::lock_guard<tthread::mutex> guard(entryMutex);
|
||||||
|
std::deque<playListEntries> &curList = listEntries[currentPlaylist];
|
||||||
// URL-based
|
if (!curList.size()){
|
||||||
if (curList.isUrl()){
|
WARN_MSG("no entries found in playlist: %d!", currentPlaylist);
|
||||||
if (!curList.loadSegment(curList.root.link(curList.entries.front().filename))){
|
return false;
|
||||||
ERROR_MSG("Could not download segment: %s",
|
|
||||||
curList.root.link(curList.entries.front().filename).getUrl().c_str());
|
|
||||||
curList.entries.pop_front();
|
|
||||||
return readNextFile(); // Attempt to read another, if possible.
|
|
||||||
}
|
}
|
||||||
curList.entries.pop_front();
|
ntry = curList.front();
|
||||||
return true;
|
curList.pop_front();
|
||||||
}
|
}
|
||||||
|
|
||||||
// file-based, live
|
if (!segDowner.loadSegment(ntry)){
|
||||||
if (curList.playlistType == LIVE){
|
ERROR_MSG("Could not download segment: %s", ntry.filename.c_str());
|
||||||
in.close();
|
|
||||||
std::string filepath =
|
|
||||||
curList.root.link(curList.entries.at(currentIndex).filename).getFilePath();
|
|
||||||
curList.entries.pop_front(); // remove the item from the playlist
|
|
||||||
in.open(filepath.c_str());
|
|
||||||
if (in.good()){return true;}
|
|
||||||
FAIL_MSG("Could not open segment (%s): %s", strerror(errno), filepath.c_str());
|
|
||||||
return readNextFile(); // Attempt to read another, if possible.
|
return readNextFile(); // Attempt to read another, if possible.
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
// file-based, VoD
|
|
||||||
++currentIndex;
|
|
||||||
if (curList.entries.size() <= currentIndex){
|
|
||||||
HIGH_MSG("end of playlist reached (%u of %u)!", currentIndex, curList.entries.size());
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
in.close();
|
|
||||||
std::string filepath =
|
|
||||||
curList.root.link(curList.entries.at(currentIndex).filename).getFilePath();
|
|
||||||
in.open(filepath.c_str());
|
|
||||||
if (in.good()){
|
|
||||||
readPMT();
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
FAIL_MSG("Could not open segment (%s): %s", strerror(errno), filepath.c_str());
|
|
||||||
return readNextFile();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// return the playlist id from which we need to read the first upcoming segment
|
/// return the playlist id from which we need to read the first upcoming segment
|
||||||
|
@ -999,16 +1066,20 @@ namespace Mist{
|
||||||
if (selectedTracks.size() == 1){return getMappedTrackPlaylist(*selectedTracks.begin());}
|
if (selectedTracks.size() == 1){return getMappedTrackPlaylist(*selectedTracks.begin());}
|
||||||
uint64_t firstTimeStamp = 0;
|
uint64_t firstTimeStamp = 0;
|
||||||
int tmpId = -1;
|
int tmpId = -1;
|
||||||
|
int segCount = 0;
|
||||||
|
|
||||||
for (std::vector<Playlist>::iterator pListIt = playlists.begin(); pListIt != playlists.end();
|
tthread::lock_guard<tthread::mutex> guard(entryMutex);
|
||||||
|
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin(); pListIt != listEntries.end();
|
||||||
pListIt++){
|
pListIt++){
|
||||||
if (pListIt->entries.size()){
|
segCount += pListIt->second.size();
|
||||||
if (pListIt->entries.front().timestamp < firstTimeStamp || tmpId < 0){
|
if (pListIt->second.size()){
|
||||||
firstTimeStamp = pListIt->entries.front().timestamp;
|
if (pListIt->second.front().timestamp < firstTimeStamp || tmpId < 0){
|
||||||
tmpId = pListIt->id;
|
firstTimeStamp = pListIt->second.front().timestamp;
|
||||||
|
tmpId = pListIt->first;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
MEDIUM_MSG("Active playlist: %d (%d segments total in queue)", tmpId, segCount);
|
||||||
return tmpId;
|
return tmpId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,10 @@
|
||||||
namespace Mist{
|
namespace Mist{
|
||||||
|
|
||||||
enum PlaylistType{VOD, LIVE, EVENT};
|
enum PlaylistType{VOD, LIVE, EVENT};
|
||||||
|
|
||||||
|
extern bool streamIsLive;
|
||||||
|
extern uint32_t globalWaitTime;//largest waitTime for any playlist we're loading - used to update minKeepAway
|
||||||
|
void parseKey(std::string key, char * newKey, unsigned int len);
|
||||||
|
|
||||||
struct playListEntries{
|
struct playListEntries{
|
||||||
std::string filename;
|
std::string filename;
|
||||||
|
@ -24,25 +28,35 @@ namespace Mist{
|
||||||
float duration;
|
float duration;
|
||||||
unsigned int timestamp;
|
unsigned int timestamp;
|
||||||
unsigned int wait;
|
unsigned int wait;
|
||||||
|
char ivec[16];
|
||||||
|
char keyAES[16];
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Keeps the segment entry list by playlist ID
|
||||||
|
extern std::map<uint32_t, std::deque<playListEntries> > listEntries;
|
||||||
|
|
||||||
|
class SegmentDownloader{
|
||||||
|
public:
|
||||||
|
SegmentDownloader();
|
||||||
|
HTTP::Downloader segDL;
|
||||||
|
const char *packetPtr;
|
||||||
|
bool loadSegment(const playListEntries & entry);
|
||||||
|
bool atEnd() const;
|
||||||
};
|
};
|
||||||
|
|
||||||
class Playlist{
|
class Playlist{
|
||||||
public:
|
public:
|
||||||
Playlist(const std::string &uriSrc = "");
|
Playlist(const std::string &uriSrc = "");
|
||||||
bool atEnd() const;
|
|
||||||
bool isUrl() const;
|
bool isUrl() const;
|
||||||
bool reload();
|
bool reload();
|
||||||
void addEntry(const std::string &filename, float duration, uint64_t &totalBytes);
|
void addEntry(const std::string &filename, float duration, uint64_t &totalBytes, const std::string &key, const std::string &keyIV);
|
||||||
bool loadSegment(const HTTP::URL &uri);
|
|
||||||
bool isSupportedFile(const std::string filename);
|
bool isSupportedFile(const std::string filename);
|
||||||
|
|
||||||
std::string uri; // link to the current playlistfile
|
std::string uri; // link to the current playlistfile
|
||||||
HTTP::URL root;
|
HTTP::URL root;
|
||||||
|
|
||||||
HTTP::Downloader segDL;
|
|
||||||
HTTP::Downloader plsDL;
|
HTTP::Downloader plsDL;
|
||||||
|
|
||||||
const char *packetPtr;
|
|
||||||
uint64_t reloadNext;
|
uint64_t reloadNext;
|
||||||
|
|
||||||
uint32_t id;
|
uint32_t id;
|
||||||
|
@ -52,16 +66,13 @@ namespace Mist{
|
||||||
|
|
||||||
int waitTime;
|
int waitTime;
|
||||||
PlaylistType playlistType;
|
PlaylistType playlistType;
|
||||||
std::deque<playListEntries> entries;
|
|
||||||
unsigned int lastTimestamp;
|
unsigned int lastTimestamp;
|
||||||
unsigned int startTime;
|
unsigned int startTime;
|
||||||
|
char keyAES[16];
|
||||||
|
std::map<std::string, std::string> keys;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct entryBuffer{
|
void playlistRunner(void * ptr);
|
||||||
int timestamp;
|
|
||||||
playListEntries entry;
|
|
||||||
int playlistIndex;
|
|
||||||
};
|
|
||||||
|
|
||||||
class inputHLS : public Input{
|
class inputHLS : public Input{
|
||||||
public:
|
public:
|
||||||
|
@ -70,34 +81,26 @@ namespace Mist{
|
||||||
bool needsLock();
|
bool needsLock();
|
||||||
bool openStreamSource();
|
bool openStreamSource();
|
||||||
bool callback();
|
bool callback();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
// Private Functions
|
|
||||||
|
|
||||||
unsigned int startTime;
|
unsigned int startTime;
|
||||||
PlaylistType playlistType;
|
PlaylistType playlistType;
|
||||||
|
SegmentDownloader segDowner;
|
||||||
int version;
|
int version;
|
||||||
int targetDuration;
|
int targetDuration;
|
||||||
bool endPlaylist;
|
bool endPlaylist;
|
||||||
int currentPlaylist;
|
int currentPlaylist;
|
||||||
|
|
||||||
// std::vector<playListEntries> entries;
|
|
||||||
std::vector<Playlist> playlists;
|
|
||||||
// std::vector<int> pidMapping;
|
|
||||||
std::map<uint64_t, uint64_t> pidMapping;
|
std::map<uint64_t, uint64_t> pidMapping;
|
||||||
std::map<uint64_t, uint64_t> pidMappingR;
|
std::map<uint64_t, uint64_t> pidMappingR;
|
||||||
|
|
||||||
int currentIndex;
|
int currentIndex;
|
||||||
std::string currentFile;
|
std::string currentFile;
|
||||||
std::ifstream in;
|
|
||||||
|
|
||||||
TS::Stream tsStream; ///< Used for parsing the incoming ts stream
|
TS::Stream tsStream; ///< Used for parsing the incoming ts stream
|
||||||
|
|
||||||
Socket::Connection conn;
|
Socket::Connection conn;
|
||||||
TS::Packet tsBuf;
|
TS::Packet tsBuf;
|
||||||
|
|
||||||
int getFirstPlaylistToReload();
|
|
||||||
|
|
||||||
int firstSegment();
|
int firstSegment();
|
||||||
void waitForNextSegment();
|
void waitForNextSegment();
|
||||||
void readPMT();
|
void readPMT();
|
||||||
|
@ -112,8 +115,8 @@ namespace Mist{
|
||||||
FILE *tsFile;
|
FILE *tsFile;
|
||||||
|
|
||||||
bool readIndex();
|
bool readIndex();
|
||||||
bool initPlaylist(const std::string &uri);
|
bool initPlaylist(const std::string &uri, bool fullInit = true);
|
||||||
bool readPlaylist(const HTTP::URL &uri);
|
bool readPlaylist(const HTTP::URL &uri, bool fullInit = true);
|
||||||
bool readNextFile();
|
bool readNextFile();
|
||||||
|
|
||||||
void parseStreamHeader();
|
void parseStreamHeader();
|
||||||
|
|
Loading…
Add table
Reference in a new issue