Support for byte ranges in HLS input

This commit is contained in:
Thulinma 2023-11-28 22:02:57 +01:00
parent 7a3fd0c280
commit 6a3ae16b2d
2 changed files with 122 additions and 56 deletions

View file

@ -112,10 +112,10 @@ namespace Mist{
static std::map<uint64_t, Playlist> playlistMapping;
/// Local RAM buffer for recently accessed segments
std::map<std::string, Util::ResizeablePointer> segBufs;
std::map<playListEntries, Util::ResizeablePointer> segBufs;
/// Order of adding/accessing for local RAM buffer of segments
std::deque<std::string> segBufAccs;
std::deque<playListEntries> segBufAccs;
/// Order of adding/accessing sizes for local RAM buffer of segments
std::deque<size_t> segBufSize;
@ -264,6 +264,7 @@ namespace Mist{
bool SegmentDownloader::atEnd() const{
if (!isOpen || !currBuf){return true;}
if (buffered){return currBuf->size() <= offset + 188;}
if (stopAtByte && (stopAtByte - startAtByte) <= offset + 188){return true;}
return !segDL && currBuf->size() <= offset + 188;
// return (packetPtr - segDL.const_data().data() + 188) > segDL.const_data().size();
}
@ -334,6 +335,7 @@ namespace Mist{
}else{
if (!currBuf){return false;}
size_t retries = 0;
if (stopAtByte && (stopAtByte - startAtByte) <= currBuf->size()){return false;}
while (segDL && currBuf->size() < offset + 188 + 188){
size_t preSize = currBuf->size();
segDL.readSome(offset + 188 + 188 - currBuf->size(), *this);
@ -348,7 +350,7 @@ namespace Mist{
segDL.close();
return false;
}
segDL.seek(currBuf->size());
segDL.seek(startAtByte+currBuf->size());
}
}
if (currBuf->size() <= preSize){
@ -387,7 +389,7 @@ namespace Mist{
segBufTotalSize += segBufSize.front();
}
size_t SegmentDownloader::getDataCallbackPos() const{return currBuf->size();}
size_t SegmentDownloader::getDataCallbackPos() const{return startAtByte+currBuf->size();}
/// Attempts to read a single TS packet from the current segment, setting packetPtr on success
void SegmentDownloader::close(){
@ -401,33 +403,41 @@ namespace Mist{
std::string hexKey = printhex(entry.keyAES, 16);
std::string hexIvec = printhex(entry.ivec, 16);
MEDIUM_MSG("Loading segment: %s, key: %s, ivec: %s", entry.filename.c_str(), hexKey.c_str(),
MEDIUM_MSG("Loading segment: %s, key: %s, ivec: %s", entry.shortName().c_str(), hexKey.c_str(),
hexIvec.c_str());
startAtByte = entry.startAtByte;
stopAtByte = entry.stopAtByte;
offset = 0;
firstPacket = true;
buffered = segBufs.count(entry.filename);
buffered = segBufs.count(entry);
if (!buffered){
HIGH_MSG("Reading non-cache: %s", entry.filename.c_str());
HIGH_MSG("Reading non-cache: %s", entry.shortName().c_str());
if (!segDL.open(entry.filename)){
FAIL_MSG("Could not open %s", entry.filename.c_str());
FAIL_MSG("Could not open %s", entry.shortName().c_str());
return false;
}
if (!segDL){return false;}
//Remove cache entries while above 16MiB in total size, unless we only have 1 entry (we keep two at least at all times)
while (segBufTotalSize > 16 * 1024 * 1024 && segBufs.size() > 1){
HIGH_MSG("Dropping from segment cache: %s", segBufAccs.back().c_str());
HIGH_MSG("Dropping from segment cache: %s", segBufAccs.back().shortName().c_str());
segBufs.erase(segBufAccs.back());
segBufTotalSize -= segBufSize.back();
segBufAccs.pop_back();
segBufSize.pop_back();
}
segBufAccs.push_front(entry.filename);
segBufAccs.push_front(entry);
segBufSize.push_front(0);
currBuf = &(segBufs[entry.filename]);
currBuf = &(segBufs[entry]);
// Non-seekable case is handled further down
if (segDL.isSeekable() && startAtByte){
//Seek to startAtByte position, since it's not the beginning of the file
MEDIUM_MSG("Seeking to %zu", startAtByte);
segDL.seek(startAtByte);
}
}else{
HIGH_MSG("Reading from segment cache: %s", entry.filename.c_str());
currBuf = &(segBufs[entry.filename]);
HIGH_MSG("Reading from segment cache: %s", entry.shortName().c_str());
currBuf = &(segBufs[entry]);
if (currBuf->rsize() != currBuf->size()){
MEDIUM_MSG("Cache was incomplete (%zu/%" PRIu32 "), resuming", currBuf->size(), currBuf->rsize());
buffered = false;
@ -436,23 +446,31 @@ namespace Mist{
HTTP::URL B = HTTP::localURIResolver().link(entry.filename);
if (A != B){
if (!segDL.open(entry.filename)){
FAIL_MSG("Could not open %s", entry.filename.c_str());
FAIL_MSG("Could not open %s", entry.shortName().c_str());
return false;
}
if (!segDL){return false;}
//Seek to current position in segment for resuming
currBuf->truncate(currBuf->size() / 188 * 188);
MEDIUM_MSG("Seeking to %zu", currBuf->size());
segDL.seek(currBuf->size());
MEDIUM_MSG("Seeking to %zu", currBuf->size()+startAtByte);
segDL.seek(currBuf->size()+startAtByte);
}
}
}
if (!buffered){
// Allocate full size if known
if (segDL.getSize() != std::string::npos){currBuf->allocate(segDL.getSize());}
if (stopAtByte || segDL.getSize() != std::string::npos){currBuf->allocate(stopAtByte?(stopAtByte - startAtByte):segDL.getSize());}
// Download full segment if not seekable, pretend it was cached all along
if (!segDL.isSeekable()){
segDL.readAll(*this);
if (startAtByte || stopAtByte){
WARN_MSG("Wasting data: downloaded whole segment due to unavailability of range requests, but caching only part of it");
if (startAtByte){currBuf->shift(startAtByte);}
if (stopAtByte){currBuf->truncate(stopAtByte - startAtByte);}
//Overwrite the current segment size
segBufTotalSize -= segBufSize.front();
segBufSize.front() = currBuf->size();
segBufTotalSize += segBufSize.front();
}
buffered = true;
}
}
@ -487,6 +505,8 @@ namespace Mist{
std::string line;
std::string key;
std::string val;
float segDur = 0.0;
uint64_t startByte = std::string::npos, lenByte = 0;
std::string keyMethod;
std::string keyUri;
@ -527,6 +547,10 @@ namespace Mist{
cleanLine(line);
if (line.empty()){continue;}// skip empty lines
if (line.compare(0, 7, "#EXTINF") == 0){
segDur = atof(line.c_str() + 8);
continue;
}
if (line.compare(0, 7, "#EXT-X-") == 0){
size_t pos = line.find(":");
key = line.substr(7, pos - 7);
@ -560,11 +584,26 @@ namespace Mist{
}
keys.insert(std::pair<std::string, std::string>(keyUri, std::string(keyPtr, keyLen)));
}
continue;
}
if (key == "BYTERANGE"){
size_t atSign = val.find('@');
if (atSign != std::string::npos){
std::string len = val.substr(0, atSign);
std::string pos = val.substr(atSign+1);
lenByte = atoll(len.c_str());
startByte = atoll(pos.c_str());
}else{
lenByte = atoll(val.c_str());
}
continue;
}
if (key == "TARGETDURATION"){
waitTime = atoi(val.c_str()) / 2;
if (waitTime < 2){waitTime = 2;}
continue;
}
// Assuming this always comes before any segment
@ -572,47 +611,45 @@ namespace Mist{
// Reinit the segment counter
firstIndex = atoll(val.c_str());
bposCounter = firstIndex + 1;
continue;
}
if (key == "PROGRAM-DATE-TIME"){nextUTC = ISO8601toUnixmillis(val);}
if (key == "PROGRAM-DATE-TIME"){
nextUTC = ISO8601toUnixmillis(val);
continue;
}
if (key == "PLAYLIST-TYPE"){
if (val == "VOD"){
streamIsVOD = true;
streamIsLive = false;
INFO_MSG("SIL=F");
}else if (val == "LIVE"){
streamIsVOD = false;
streamIsLive = true;
INFO_MSG("SIL=T");
}else if (val == "EVENT"){
streamIsVOD = true;
streamIsLive = true;
INFO_MSG("SIL=T");
}
continue;
}
// Once we see this tag, the entire playlist becomes VOD
if (key == "ENDLIST"){
streamIsLive = false;
INFO_MSG("SIL=F");
streamIsVOD = true;
continue;
}
VERYHIGH_MSG("ignoring line: %s.", line.c_str());
continue;
}
if (line.compare(0, 7, "#EXTINF") != 0){
if (line[0] == '#'){
VERYHIGH_MSG("ignoring line: %s.", line.c_str());
continue;
}
float f = atof(line.c_str() + 8);
std::string filename;
std::getline(input, filename);
// check for already added segments
DONTEVEN_MSG("Current segment #%" PRIu64 ", last segment was #%" PRIu64 "", bposCounter, lastSegment);
if (bposCounter > lastSegment){
cleanLine(filename);
char ivec[16];
if (keyIV.size()){
parseKey(keyIV, ivec, 16);
@ -620,11 +657,14 @@ namespace Mist{
memset(ivec, 0, 16);
Bit::htobll(ivec + 8, bposCounter);
}
addEntry(root.link(filename).getUrl(), filename, f, bposCounter, keys[keyUri], std::string(ivec, 16));
addEntry(root.link(line).getUrl(), line, segDur, bposCounter, keys[keyUri], std::string(ivec, 16), startByte, lenByte);
lastSegment = bposCounter;
++count;
}
nextUTC = 0;
segDur = 0.0;
startByte = std::string::npos;
lenByte = 0;
++bposCounter;
}
@ -639,30 +679,9 @@ namespace Mist{
return (count > 0);
}
bool Playlist::isSupportedFile(const std::string filename){
// only ts files
if (filename.find_last_of(".") != std::string::npos){
std::string ext = filename.substr(filename.find_last_of(".") + 1);
if (ext.compare(0, 2, "ts") == 0){
return true;
}else{
DEBUG_MSG(DLVL_HIGH, "Not supported extension: %s", ext.c_str());
return false;
}
}
// No extension. We assume it's fine.
return true;
}
/// Adds playlist segments to be processed
void Playlist::addEntry(const std::string &absolute_filename, const std::string &filename, float duration, uint64_t &bpos,
const std::string &key, const std::string &iv){
// if (!isSupportedFile(filename)){
// WARN_MSG("Ignoring unsupported file: %s", filename.c_str());
// return;
//}
const std::string &key, const std::string &iv, uint64_t startByte, uint64_t lenByte){
playListEntries entry;
entry.filename = absolute_filename;
entry.relative_filename = filename;
@ -687,8 +706,24 @@ namespace Mist{
if (!nextUTC && prev.mUTC){
nextUTC = prev.mUTC + (uint64_t)(prev.duration * 1000);
}
// If startByte unknown and we have a length, calculate it from previous entry
if (startByte == std::string::npos && lenByte){
if (filename == prev.relative_filename){startByte = prev.stopAtByte;}
}
}else{
// If startByte unknown and we have a length, set to zero
if (startByte == std::string::npos && lenByte){startByte = 0;}
}
}
if ((lenByte && startByte == std::string::npos) || (!lenByte && startByte != std::string::npos)){
WARN_MSG("Invalid byte range entry for segment: %s", filename.c_str());
lenByte = 0;
startByte = std::string::npos;
}
if (lenByte){
entry.startAtByte = startByte;
entry.stopAtByte = startByte + lenByte;
}
entry.mUTC = nextUTC;
if (nextUTC && !oUTC){
@ -716,7 +751,11 @@ namespace Mist{
// Note: This method requires never removing playlists, only adding.
// The mutex assures we have a unique count/number.
if (!id){id = listEntries.size() + 1;}
HIGH_MSG("Adding entry '%s' to ID %u", filename.c_str(), id);
if (entry.startAtByte){
HIGH_MSG("Adding entry '%s' (%" PRIu64 "-%" PRIu64 ") to ID %u", filename.c_str(), entry.startAtByte, entry.stopAtByte, id);
}else{
HIGH_MSG("Adding entry '%s' to ID %u", filename.c_str(), id);
}
playlist_urls[JSON::Value(id).asString()] = relurl;
listEntries[id].push_back(entry);
}
@ -867,6 +906,10 @@ namespace Mist{
memset(newEntry.ivec, 0, 16);
memset(newEntry.keyAES, 0, 16);
}
if (thisEntry.size() >= 11){
newEntry.startAtByte = thisEntry[9u].asInt();
newEntry.stopAtByte = thisEntry[10u].asInt();
}
newList.push_back(newEntry);
}
listEntries[plNum] = newList;
@ -1059,6 +1102,10 @@ namespace Mist{
thisEntries.append(entryIt->wait);
thisEntries.append(entryIt->ivec);
thisEntries.append(entryIt->keyAES);
if (entryIt->startAtByte || entryIt->stopAtByte){
thisEntries.append(entryIt->startAtByte);
thisEntries.append(entryIt->stopAtByte);
}
thisPlaylist.append(thisEntries);
}
allEntries[JSON::Value(pListIt->first).asString()] = thisPlaylist;
@ -1119,7 +1166,7 @@ namespace Mist{
if (!hasOffset && curList.at(segmentIndex).mUTC){
hasOffset = true;
DVRTimeOffsets[currentPlaylist] = (curList.at(segmentIndex).mUTC - zUTC) - packetTime;
INFO_MSG("Setting current live segment time offset to %" PRId64, DVRTimeOffsets[currentPlaylist]);
MEDIUM_MSG("Setting current live segment time offset to %" PRId64, DVRTimeOffsets[currentPlaylist]);
curList.at(segmentIndex).timeOffset = DVRTimeOffsets[currentPlaylist];
}
if (hasOffset || DVRTimeOffsets.count(currentPlaylist)){

View file

@ -21,6 +21,8 @@ namespace Mist{
struct playListEntries{
std::string filename;
std::string relative_filename;
uint64_t startAtByte; ///< Byte position inside filename where to start reading
uint64_t stopAtByte; ///< Byte position inside filename where to stop sending
uint64_t bytePos;
uint64_t mUTC; ///< UTC unix millis timestamp of first packet, if known
float duration; ///< Duration of entry in seconds
@ -36,13 +38,29 @@ namespace Mist{
timestamp = 0;
timeOffset = 0;
wait = 0;
startAtByte = 0;
stopAtByte = 0;
for (size_t i = 0; i < 16; ++i){
ivec[i] = 0;
keyAES[i] = 0;
}
}
std::string shortName() const{
if (!startAtByte && !stopAtByte){return filename;}
std::string ret = filename;
ret += " (";
ret += JSON::Value(startAtByte).asString();
ret += "-";
ret += JSON::Value(stopAtByte).asString();
ret += ")";
return ret;
}
};
inline bool operator< (const playListEntries a, const playListEntries b){
return a.filename < b.filename || (a.filename == b.filename && a.startAtByte < b.startAtByte);
}
/// Keeps the segment entry list by playlist ID
extern std::map<uint32_t, std::deque<playListEntries> > listEntries;
@ -59,6 +77,8 @@ namespace Mist{
bool atEnd() const;
private:
uint64_t startAtByte;
uint64_t stopAtByte;
bool encrypted;
bool buffered;
size_t offset;
@ -79,8 +99,7 @@ namespace Mist{
bool isUrl() const;
bool reload();
void addEntry(const std::string & absolute_filename, const std::string &filename, float duration, uint64_t &bpos,
const std::string &key, const std::string &keyIV);
bool isSupportedFile(const std::string filename);
const std::string &key, const std::string &keyIV, uint64_t startByte, uint64_t lenByte);
std::string uri; // link to the current playlistfile
HTTP::URL root;