HLS: support for handling and syncing on ISO8601 timestamps in input

This commit is contained in:
Thulinma 2019-11-16 19:07:16 +01:00
parent 384afb6508
commit d58e860a2c
2 changed files with 260 additions and 145 deletions

View file

@ -1,4 +1,5 @@
#include "input_hls.h"
#include "mbedtls/aes.h"
#include <algorithm>
#include <cerrno>
#include <cstdio>
@ -19,10 +20,73 @@
#include <string>
#include <sys/stat.h>
#include <vector>
#include "mbedtls/aes.h"
#define SEM_TS_CLAIM "/MstTSIN%s"
static uint64_t ISO8601toUnixmillis(const std::string &ts){
// Format examples:
// 2019-12-05T09:41:16.765000+00:00
// 2019-12-05T09:41:16.765Z
uint64_t unxTime = 0;
const size_t T = ts.find('T');
if (T == std::string::npos){
ERROR_MSG("Timestamp is date-only (no time marker): %s", ts.c_str());
return 0;
}
const size_t Z = ts.find_first_of("Z+-", T);
const std::string date = ts.substr(0, T);
const std::string time = ts.substr(T + 1, Z - T - 1);
const std::string zone = ts.substr(Z);
unsigned long year, month, day;
if (sscanf(date.c_str(), "%lu-%lu-%lu", &year, &month, &day) != 3){
ERROR_MSG("Could not parse date: %s", date.c_str());
return 0;
}
unsigned int hour, minute;
double seconds;
if (sscanf(time.c_str(), "%u:%d:%lf", &hour, &minute, &seconds) != 3){
ERROR_MSG("Could not parse time: %s", time.c_str());
return 0;
}
// Fill the tm struct with the values we just read.
// We're ignoring time zone for now, and forcing seconds to zero since we add them in later with more precision
struct tm tParts;
tParts.tm_sec = 0;
tParts.tm_min = minute;
tParts.tm_hour = hour;
tParts.tm_mon = month - 1;
tParts.tm_year = year - 1900;
tParts.tm_mday = day;
tParts.tm_isdst = 0;
// convert to unix time, in seconds
unxTime = timegm(&tParts);
// convert to milliseconds
unxTime *= 1000;
// finally add the seconds (and milliseconds)
unxTime += (seconds * 1000);
// Now, adjust for time zone if needed
if (zone.size() && zone[0] != 'Z'){
bool sign = (zone[0] == '+');
{
unsigned long hrs, mins;
if (sscanf(zone.c_str() + 1, "%lu:%lu", &hrs, &mins) == 2){
unxTime += mins * 60000 + hrs * 3600000;
}else if (sscanf(zone.c_str() + 1, "%lu", &hrs) == 1){
if (hrs > 100){
unxTime += (hrs % 100) * 60000 + ((uint64_t)(hrs / 100)) * 3600000;
}else{
unxTime += hrs * 3600000;
}
}else{
WARN_MSG("Could not parse time zone '%s'; assuming UTC", zone.c_str());
}
}
}
DONTEVEN_MSG("Time '%s' = %" PRIu64, ts.c_str(), unxTime);
return unxTime;
}
namespace Mist{
/// Mutex for accesses to listEntries
@ -81,7 +145,7 @@ namespace Mist{
pls.reload();
}
}
INFO_MSG("Downloader thread for '%s' exiting", pls.uri.c_str());
MEDIUM_MSG("Downloader thread for '%s' exiting", pls.uri.c_str());
}
SegmentDownloader::SegmentDownloader(){
@ -91,6 +155,7 @@ namespace Mist{
}
Playlist::Playlist(const std::string &uriSrc){
nextUTC = 0;
id = 0; // to be set later
INFO_MSG("Adding variant playlist: %s", uriSrc.c_str());
plsDL.dataTimeout = 15;
@ -100,8 +165,8 @@ namespace Mist{
playlistEnd = false;
noChangeCount = 0;
lastTimestamp = 0;
uri = uriSrc;
root = HTTP::URL(uri);
root = HTTP::URL(uriSrc);
uri = root.getUrl();
memset(keyAES, 0, 16);
startTime = Util::bootSecs();
reloadNext = 0;
@ -121,23 +186,21 @@ namespace Mist{
d[i] = d[15 - i];
d[15 - i] = tmp;
}
}
static std::string printhex(const char * data, size_t len)
{
static std::string printhex(const char *data, size_t len){
static const char *const lut = "0123456789ABCDEF";
std::string output;
output.reserve(2 * len);
for (size_t i = 0; i < len; ++i)
{
for (size_t i = 0; i < len; ++i){
const unsigned char c = data[i];
output.push_back(lut[c >> 4]);
output.push_back(lut[c & 15]);
}
return output;
}
/// Returns true if packetPtr is at the end of the current segment.
bool SegmentDownloader::atEnd() const{
return (packetPtr - segDL.const_data().data() + 188) > segDL.const_data().size();
@ -151,7 +214,8 @@ static std::string printhex(const char * data, size_t len)
std::string hexKey = printhex(entry.keyAES, 16);
std::string hexIvec = printhex(entry.ivec, 16);
MEDIUM_MSG("Loading segment: %s, key: %s, ivec: %s", entry.filename.c_str(), hexKey.c_str(), hexIvec.c_str());
MEDIUM_MSG("Loading segment: %s, key: %s, ivec: %s", entry.filename.c_str(), hexKey.c_str(),
hexIvec.c_str());
if (!segDL.get(entry.filename)){
FAIL_MSG("failed download: %s", entry.filename.c_str());
return false;
@ -172,9 +236,9 @@ static std::string printhex(const char * data, size_t len)
}
// If we have a non-null key, decrypt
if (entry.keyAES[ 0] != 0 || entry.keyAES[ 1] != 0 || entry.keyAES[ 2] != 0 || entry.keyAES[ 3] != 0 || \
entry.keyAES[ 4] != 0 || entry.keyAES[ 5] != 0 || entry.keyAES[ 6] != 0 || entry.keyAES[ 7] != 0 || \
entry.keyAES[ 8] != 0 || entry.keyAES[ 9] != 0 || entry.keyAES[10] != 0 || entry.keyAES[11] != 0 || \
if (entry.keyAES[0] != 0 || entry.keyAES[1] != 0 || entry.keyAES[2] != 0 || entry.keyAES[3] != 0 ||
entry.keyAES[4] != 0 || entry.keyAES[5] != 0 || entry.keyAES[6] != 0 || entry.keyAES[7] != 0 ||
entry.keyAES[8] != 0 || entry.keyAES[9] != 0 || entry.keyAES[10] != 0 || entry.keyAES[11] != 0 ||
entry.keyAES[12] != 0 || entry.keyAES[13] != 0 || entry.keyAES[14] != 0 || entry.keyAES[15] != 0){
// Setup AES context
mbedtls_aes_context aes;
@ -187,8 +251,8 @@ static std::string printhex(const char * data, size_t len)
unsigned char tmpIvec[16];
memcpy(tmpIvec, entry.ivec, 16);
mbedtls_aes_crypt_cbc(&aes, MBEDTLS_AES_DECRYPT, segDL.data().size(), tmpIvec, (const unsigned char*)segDL.data().data(), (unsigned char*)(char*)outdata);
mbedtls_aes_crypt_cbc(&aes, MBEDTLS_AES_DECRYPT, segDL.data().size(), tmpIvec,
(const unsigned char *)segDL.data().data(), (unsigned char *)(char *)outdata);
// Data is now still padded, the padding consists of X bytes of padding, all containing the raw value X.
// Since padding is mandatory, we can simply read the last byte and remove X bytes from the length.
if (segDL.data().size() <= outdata[segDL.data().size() - 1]){
@ -223,11 +287,11 @@ static std::string printhex(const char * data, size_t len)
return true;
}
/// Handles both initial load and future reloads.
/// Returns how many segments were added to the internal segment list.
bool Playlist::reload(){
uint64_t fileNo = 0;
nextUTC = 0; // Make sure we don't use old timestamps
std::string line;
std::string key;
std::string val;
@ -298,6 +362,7 @@ static std::string printhex(const char * data, size_t len)
}
if (key == "MEDIA-SEQUENCE"){fileNo = atoll(val.c_str());}
if (key == "PROGRAM-DATE-TIME"){nextUTC = ISO8601toUnixmillis(val);}
if (key == "PLAYLIST-TYPE"){
if (val == "VOD"){
@ -335,6 +400,7 @@ static std::string printhex(const char * data, size_t len)
lastFileIndex = fileNo + 1;
++count;
}
nextUTC = 0;
++fileNo;
}
@ -370,18 +436,19 @@ static std::string printhex(const char * data, size_t len)
}
/// Adds playlist segments to be processed
void Playlist::addEntry(const std::string &filename, float duration, uint64_t &totalBytes, const std::string &key, const std::string &iv){
if (!isSupportedFile(filename)){
WARN_MSG("Ignoring unsupported file: %s", filename.c_str());
return;
}
void Playlist::addEntry(const std::string &filename, float duration, uint64_t &totalBytes,
const std::string &key, const std::string &iv){
// if (!isSupportedFile(filename)){
// WARN_MSG("Ignoring unsupported file: %s", filename.c_str());
// return;
//}
playListEntries entry;
entry.filename = filename;
cleanLine(entry.filename);
entry.bytePos = totalBytes;
entry.duration = duration;
entry.mUTC = nextUTC;
if (key.size() && iv.size()){
memcpy(entry.ivec, iv.data(), 16);
@ -391,8 +458,6 @@ static std::string printhex(const char * data, size_t len)
memset(entry.keyAES, 0, 16);
}
if (!isUrl()){
std::ifstream fileSource;
std::string test = root.link(entry.filename).getFilePath();
@ -410,12 +475,14 @@ static std::string printhex(const char * data, size_t len)
// The mutex assures we have a unique count/number.
if (!id){id = listEntries.size() + 1;}
listEntries[id].push_back(entry);
MEDIUM_MSG("Added segment to variant %" PRIu32 " (#%d, now %d queued): %s", id, lastFileIndex, listEntries[id].size(), filename.c_str());
MEDIUM_MSG("Added segment to variant %" PRIu32 " (#%d, now %d queued): %s", id, lastFileIndex,
listEntries[id].size(), filename.c_str());
}
}
/// Constructor of HLS Input
inputHLS::inputHLS(Util::Config *cfg) : Input(cfg){
zUTC = nUTC = 0;
self = this;
streamIsLive = false;
globalWaitTime = 0;
@ -452,7 +519,9 @@ static std::string printhex(const char * data, size_t len)
config->is_active = true;
if (config->getString("input") == "-"){return false;}
HTTP::URL mainPls(config->getString("input"));
if (mainPls.getExt().substr(0, 3) != "m3u" && mainPls.protocol.find("hls") == std::string::npos){return false;}
if (mainPls.getExt().substr(0, 3) != "m3u" && mainPls.protocol.find("hls") == std::string::npos){
return false;
}
if (!initPlaylist(config->getString("input"), false)){return false;}
return true;
}
@ -489,15 +558,15 @@ static std::string printhex(const char * data, size_t len)
bool keepReading = false;
tthread::lock_guard<tthread::mutex> guard(entryMutex);
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin(); pListIt != listEntries.end();
pListIt++){
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin();
pListIt != listEntries.end(); pListIt++){
// Skip empty playlists
if (!pListIt->second.size()){continue;}
int preCounter = counter;
tsStream.clear();
for (std::deque<playListEntries>::iterator entryIt = pListIt->second.begin(); entryIt != pListIt->second.end(); ++entryIt){
for (std::deque<playListEntries>::iterator entryIt = pListIt->second.begin();
entryIt != pListIt->second.end(); ++entryIt){
uint64_t lastBpos = entryIt->bytePos;
nProxy.userClient.keepAlive();
if (!segDowner.loadSegment(*entryIt)){
@ -526,7 +595,8 @@ static std::string printhex(const char * data, size_t len)
pidMapping[(((uint64_t)pListIt->first) << 32) + headerPack.getTrackId()] = counter;
pidMappingR[counter] = (((uint64_t)pListIt->first) << 32) + headerPack.getTrackId();
packetId = counter;
VERYHIGH_MSG("Added file %s, trackid: %d, mapped to: %d", entryIt->filename.c_str(), headerPack.getTrackId(), counter);
VERYHIGH_MSG("Added file %s, trackid: %d, mapped to: %d", entryIt->filename.c_str(),
headerPack.getTrackId(), counter);
counter++;
}
@ -575,8 +645,8 @@ static std::string printhex(const char * data, size_t len)
size_t dataLen;
tthread::lock_guard<tthread::mutex> guard(entryMutex);
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin(); pListIt != listEntries.end();
pListIt++){
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin();
pListIt != listEntries.end(); pListIt++){
tsStream.clear();
uint32_t entId = 0;
@ -615,8 +685,7 @@ static std::string printhex(const char * data, size_t len)
counter++;
}
if (!hasHeader &&
(!myMeta.tracks.count(packetId) || !myMeta.tracks[packetId].codec.size())){
if (!hasHeader && (!myMeta.tracks.count(packetId) || !myMeta.tracks[packetId].codec.size())){
tsStream.initializeMetadata(myMeta, tmpTrackId, packetId);
}
@ -625,10 +694,8 @@ static std::string printhex(const char * data, size_t len)
uint64_t pBPos = headerPack.getInt("bpos");
// keyframe data exists, so always add 19 bytes keyframedata.
long long packOffset =
headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
long long packSendSize =
24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11;
long long packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
long long packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11;
myMeta.update(headerPack.getTime(), packOffset, packetId, dataLen, entId,
headerPack.hasMember("keyframe"), packSendSize);
}
@ -657,14 +724,12 @@ static std::string printhex(const char * data, size_t len)
pidMapping[(((uint64_t)pListIt->first) << 32) + headerPack.getTrackId()] = counter;
pidMappingR[counter] = (((uint64_t)pListIt->first) << 32) + headerPack.getTrackId();
packetId = counter;
INFO_MSG("Added file %s, trackid: %d, mapped to: %d",
entryIt->filename.c_str(),
INFO_MSG("Added file %s, trackid: %d, mapped to: %d", entryIt->filename.c_str(),
headerPack.getTrackId(), counter);
counter++;
}
if (!hasHeader &&
(!myMeta.tracks.count(packetId) || !myMeta.tracks[packetId].codec.size())){
if (!hasHeader && (!myMeta.tracks.count(packetId) || !myMeta.tracks[packetId].codec.size())){
tsStream.initializeMetadata(myMeta, tmpTrackId, packetId);
}
@ -674,8 +739,7 @@ static std::string printhex(const char * data, size_t len)
// keyframe data exists, so always add 19 bytes keyframedata.
long long packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
long long packSendSize =
24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11;
long long packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11;
myMeta.update(headerPack.getTime(), packOffset, packetId, dataLen, entId,
headerPack.hasMember("keyframe"), packSendSize);
}
@ -699,9 +763,7 @@ static std::string printhex(const char * data, size_t len)
return true;
}
bool inputHLS::needsLock(){
return !streamIsLive;
}
bool inputHLS::needsLock(){return !streamIsLive;}
bool inputHLS::openStreamSource(){return true;}
@ -727,16 +789,63 @@ static std::string printhex(const char * data, size_t len)
if (myMeta.live){
tsStream.getEarliestPacket(thisPacket);
tid = getOriginalTrackId(currentPlaylist, thisPacket.getTrackId());
if (!tid){
INFO_MSG("Track %" PRIu64 " on PLS %u -> %" PRIu32, thisPacket.getTrackId(), currentPlaylist, tid);
continue;
}
}else{
tsStream.getPacket(getMappedTrackId(tid), thisPacket);
}
if (!thisPacket){
FAIL_MSG("Could not getNext TS packet!");
return;
}
uint64_t newTime = thisPacket.getTime();
// Apply offset if any was set
if (plsTimeOffset.count(currentPlaylist)){newTime += plsTimeOffset[currentPlaylist];}
if (zUTC){
//UTC based timestamp offsets
if (allowRemap && nUTC){
allowRemap = false;
int64_t prevOffset = plsTimeOffset[currentPlaylist];
plsTimeOffset[currentPlaylist] = (nUTC - zUTC) - thisPacket.getTime();
newTime = thisPacket.getTime() + plsTimeOffset[currentPlaylist];
INFO_MSG("[UTC; New offset: %" PRId64 " -> %" PRId64 "] Packet %lu@%" PRIu64
"ms -> %" PRIu64 "ms",
prevOffset, plsTimeOffset[currentPlaylist], tid, thisPacket.getTime(), newTime);
}
}else{
DONTEVEN_MSG("Packet track %lu @ time %" PRIu64 " ms", tid, thisPacket.getTime());
//Non-UTC based
if (plsLastTime.count(currentPlaylist)){
if (plsInterval.count(currentPlaylist)){
if (allowRemap && (newTime < plsLastTime[currentPlaylist] || newTime > plsLastTime[currentPlaylist] + plsInterval[currentPlaylist] * 60)){
allowRemap = false;
// time difference too great, change offset to correct for it
int64_t prevOffset = plsTimeOffset[currentPlaylist];
plsTimeOffset[currentPlaylist] += (int64_t)(plsLastTime[currentPlaylist] + plsInterval[currentPlaylist]) - (int64_t)newTime;
newTime = thisPacket.getTime() + plsTimeOffset[currentPlaylist];
INFO_MSG("[Guess; New offset: %" PRId64 " -> %" PRId64 "] Packet %lu@%" PRIu64
"ms -> %" PRIu64 "ms",
prevOffset, plsTimeOffset[currentPlaylist], tid, thisPacket.getTime(), newTime);
}
}
// check if time increased, and no increase yet or is less than current, set new interval
if (newTime > plsLastTime[currentPlaylist] &&
(!plsInterval.count(currentPlaylist) || newTime - plsLastTime[currentPlaylist] < plsInterval[currentPlaylist])){
plsInterval[currentPlaylist] = newTime - plsLastTime[currentPlaylist];
}
}
// store last time for interval/offset calculations
plsLastTime[tid] = newTime;
}
DONTEVEN_MSG("Packet %lu@%" PRIu64 "ms -> %ms" PRIu64, tid, thisPacket.getTime(), newTime);
// overwrite trackId on success
Bit::htobl(thisPacket.getData() + 8, tid);
}
Bit::htobll(thisPacket.getData() + 12, newTime);
return; // Success!
}
@ -790,8 +899,7 @@ static std::string printhex(const char * data, size_t len)
const char *tmpPtr = segDowner.segDL.data().data();
while (!tsStream.hasPacketOnEachTrack() &&
(tmpPtr - segDowner.segDL.data().data() + 188 <=
segDowner.segDL.data().size())){
(tmpPtr - segDowner.segDL.data().data() + 188 <= segDowner.segDL.data().size())){
tsBuffer.FromPointer(tmpPtr);
tsStream.parse(tsBuffer, 0);
tmpPtr += 188;
@ -805,8 +913,7 @@ static std::string printhex(const char * data, size_t len)
int trackId = 0;
unsigned long plistEntry = 0xFFFFFFFFull;
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end();
it++){
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
unsigned long thisBPos = 0;
for (std::deque<DTSC::Key>::iterator keyIt = myMeta.tracks[*it].keys.begin();
keyIt != myMeta.tracks[*it].keys.end(); keyIt++){
@ -990,7 +1097,7 @@ static std::string printhex(const char * data, size_t len)
}else if (line.compare(0, 7, "#EXTINF") == 0){
// current file is not a variant playlist, but regular playlist.
ret = readPlaylist(uri, fullInit);
ret = readPlaylist(playlistRootPath.getUrl(), fullInit);
break;
}else{
// ignore wrong lines
@ -1024,12 +1131,8 @@ static std::string printhex(const char * data, size_t len)
tthread::thread runList(playlistRunner, (void *)urlBuffer.data());
runList.detach(); // Abandon the thread, it's now running independently
uint32_t timeout = 0;
while (urlBuffer.data()[0] && ++timeout < 100){
Util::sleep(100);
}
if (timeout >= 100){
WARN_MSG("Thread start timed out for: %s", urlBuffer.c_str());
}
while (urlBuffer.data()[0] && ++timeout < 100){Util::sleep(100);}
if (timeout >= 100){WARN_MSG("Thread start timed out for: %s", urlBuffer.c_str());}
return true;
}
@ -1055,6 +1158,10 @@ static std::string printhex(const char * data, size_t len)
ERROR_MSG("Could not download segment: %s", ntry.filename.c_str());
return readNextFile(); // Attempt to read another, if possible.
}
nUTC = ntry.mUTC;
//If we don't have a zero-time yet, guess an hour before this UTC time is probably fine
if (nUTC && !zUTC){zUTC = nUTC - 3600000;}
allowRemap = true;
return true;
}
@ -1069,8 +1176,8 @@ static std::string printhex(const char * data, size_t len)
int segCount = 0;
tthread::lock_guard<tthread::mutex> guard(entryMutex);
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin(); pListIt != listEntries.end();
pListIt++){
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin();
pListIt != listEntries.end(); pListIt++){
segCount += pListIt->second.size();
if (pListIt->second.size()){
if (pListIt->second.front().timestamp < firstTimeStamp || tmpId < 0){
@ -1084,4 +1191,3 @@ static std::string printhex(const char * data, size_t len)
}
}// namespace Mist

View file

@ -25,6 +25,7 @@ namespace Mist{
struct playListEntries{
std::string filename;
uint64_t bytePos;
uint64_t mUTC; ///< UTC unix millis timestamp of first packet, if known
float duration;
unsigned int timestamp;
unsigned int wait;
@ -49,7 +50,8 @@ namespace Mist{
Playlist(const std::string &uriSrc = "");
bool isUrl() const;
bool reload();
void addEntry(const std::string &filename, float duration, uint64_t &totalBytes, const std::string &key, const std::string &keyIV);
void addEntry(const std::string &filename, float duration, uint64_t &totalBytes,
const std::string &key, const std::string &keyIV);
bool isSupportedFile(const std::string filename);
std::string uri; // link to the current playlistfile
@ -68,6 +70,7 @@ namespace Mist{
PlaylistType playlistType;
unsigned int lastTimestamp;
unsigned int startTime;
uint64_t nextUTC; ///< If non-zero, the UTC timestamp of the next segment on this playlist
char keyAES[16];
std::map<std::string, std::string> keys;
};
@ -81,7 +84,10 @@ namespace Mist{
bool needsLock();
bool openStreamSource();
bool callback();
protected:
uint64_t zUTC; ///< Zero point in local millis, as UTC unix time millis
uint64_t nUTC; ///< Next packet timestamp in UTC unix time millis
unsigned int startTime;
PlaylistType playlistType;
SegmentDownloader segDowner;
@ -90,8 +96,12 @@ namespace Mist{
bool endPlaylist;
int currentPlaylist;
bool allowRemap; ///< True if the next packet may remap the timestamps
std::map<uint64_t, uint64_t> pidMapping;
std::map<uint64_t, uint64_t> pidMappingR;
std::map<int, int64_t> plsTimeOffset;
std::map<int, uint64_t> plsLastTime;
std::map<int, uint64_t> plsInterval;
int currentIndex;
std::string currentFile;
@ -129,4 +139,3 @@ namespace Mist{
}// namespace Mist
typedef Mist::inputHLS mistIn;