WIP fixes to HLS input

This commit is contained in:
Thulinma 2024-05-01 10:11:20 +02:00
parent 4dfb261283
commit e4a2da45d3
4 changed files with 227 additions and 249 deletions

View file

@ -288,19 +288,23 @@ namespace HTTP{
} }
void URIReader::readAll(size_t (*dataCallback)(const char *data, size_t len)){ void URIReader::readAll(size_t (*dataCallback)(const char *data, size_t len)){
while (!isEOF()){readSome(dataCallback, 419430);} while (!isEOF()){
if (!readSome(dataCallback, 419430)){Util::sleep(50);}
}
} }
/// Read all function, with use of callbacks /// Read all function, with use of callbacks
void URIReader::readAll(Util::DataCallback &cb){ void URIReader::readAll(Util::DataCallback &cb){
while (!isEOF()){readSome(1048576, cb);} while (!isEOF()){
if (!readSome(1048576, cb)){Util::sleep(50);}
}
} }
/// Read all blocking function, which internally uses the Nonblocking function. /// Read all blocking function, which internally uses the Nonblocking function.
void URIReader::readAll(char *&dataPtr, size_t &dataLen){ void URIReader::readAll(char *&dataPtr, size_t &dataLen){
if (getSize() != std::string::npos){allData.allocate(getSize());} if (getSize() != std::string::npos){allData.allocate(getSize());}
while (!isEOF()){ while (!isEOF()){
readSome(10046, *this); if (!readSome(10046, *this)){Util::sleep(50);}
bufPos = allData.size(); bufPos = allData.size();
} }
dataPtr = allData; dataPtr = allData;
@ -309,27 +313,29 @@ namespace HTTP{
void httpBodyCallback(const char *ptr, size_t size){INFO_MSG("callback");} void httpBodyCallback(const char *ptr, size_t size){INFO_MSG("callback");}
void URIReader::readSome(size_t (*dataCallback)(const char *data, size_t len), size_t wantedLen){ size_t URIReader::readSome(size_t (*dataCallback)(const char *data, size_t len), size_t wantedLen){
/// TODO: Implement /// TODO: Implement
return 0;
} }
// readsome with callback // readsome with callback
void URIReader::readSome(size_t wantedLen, Util::DataCallback &cb){ size_t URIReader::readSome(size_t wantedLen, Util::DataCallback &cb){
if (isEOF()){return;} if (isEOF()){return 0;}
// Files read from the memory-mapped file // Files read from the memory-mapped file
if (stateType == HTTP::File){ if (stateType == HTTP::File){
// Simple bounds check, don't read beyond the end of the file // Simple bounds check, don't read beyond the end of the file
uint64_t dataLen = ((wantedLen + curPos) > totalSize) ? totalSize - curPos : wantedLen; uint64_t dataLen = ((wantedLen + curPos) > totalSize) ? totalSize - curPos : wantedLen;
cb.dataCallback(mapped + curPos, dataLen); cb.dataCallback(mapped + curPos, dataLen);
curPos += dataLen; curPos += dataLen;
return; return dataLen;
} }
// HTTP-based read from the Downloader // HTTP-based read from the Downloader
if (stateType == HTTP::HTTP){ if (stateType == HTTP::HTTP){
// Note: this function returns true if the full read was completed only. // Note: this function returns true if the full read was completed only.
// It's the reason this function returns void rather than bool. // It's the reason this function returns void rather than bool.
size_t prev = cb.getDataCallbackPos();
downer.continueNonBlocking(cb); downer.continueNonBlocking(cb);
return; return cb.getDataCallbackPos() - prev;
} }
// Everything else uses the socket directly // Everything else uses the socket directly
int s = downer.getSocket().Received().bytes(wantedLen); int s = downer.getSocket().Received().bytes(wantedLen);
@ -339,7 +345,7 @@ namespace HTTP{
s = downer.getSocket().Received().bytes(wantedLen); s = downer.getSocket().Received().bytes(wantedLen);
}else{ }else{
Util::sleep(50); Util::sleep(50);
return; return s;
} }
} }
// Future optimization: augment the Socket::Buffer to handle a Util::DataCallback as argument. // Future optimization: augment the Socket::Buffer to handle a Util::DataCallback as argument.
@ -347,10 +353,11 @@ namespace HTTP{
Util::ResizeablePointer buf; Util::ResizeablePointer buf;
downer.getSocket().Received().remove(buf, s); downer.getSocket().Received().remove(buf, s);
cb.dataCallback(buf, s); cb.dataCallback(buf, s);
return s;
} }
/// Readsome blocking function. /// Readsome blocking function.
void URIReader::readSome(char *&dataPtr, size_t &dataLen, size_t wantedLen){ size_t URIReader::readSome(char *&dataPtr, size_t &dataLen, size_t wantedLen){
// Clear the buffer if we're finished with it // Clear the buffer if we're finished with it
if (allData.size() && bufPos == allData.size()){ if (allData.size() && bufPos == allData.size()){
allData.truncate(0); allData.truncate(0);
@ -365,12 +372,13 @@ namespace HTTP{
dataPtr = allData + bufPos; dataPtr = allData + bufPos;
dataLen = wantedLen; dataLen = wantedLen;
bufPos += wantedLen; bufPos += wantedLen;
return; return wantedLen;
} }
// Ok, we have a short count. Return the amount we actually got. // Ok, we have a short count. Return the amount we actually got.
dataPtr = allData + bufPos; dataPtr = allData + bufPos;
dataLen = allData.size() - bufPos; dataLen = allData.size() - bufPos;
bufPos = allData.size(); bufPos = allData.size();
return dataLen;
} }
void URIReader::close(){ void URIReader::close(){

View file

@ -1,7 +1,6 @@
#pragma once #pragma once
#include "downloader.h" #include "downloader.h"
#include "util.h" #include "util.h"
#include <fstream>
namespace HTTP{ namespace HTTP{
enum URIType{Closed = 0, File, Stream, HTTP}; enum URIType{Closed = 0, File, Stream, HTTP};
@ -37,11 +36,11 @@ namespace HTTP{
void readAll(Util::DataCallback &cb); void readAll(Util::DataCallback &cb);
/// Reads wantedLen bytes of data from current position, calling the dataCallback whenever minLen/maxLen require it. /// Reads wantedLen bytes of data from current position, calling the dataCallback whenever minLen/maxLen require it.
void readSome(size_t (*dataCallback)(const char *data, size_t len), size_t wantedLen); size_t readSome(size_t (*dataCallback)(const char *data, size_t len), size_t wantedLen);
/// Reads wantedLen bytes of data from current position, returning it in a single buffer. /// Reads wantedLen bytes of data from current position, returning it in a single buffer.
void readSome(char *&dataPtr, size_t &dataLen, size_t wantedLen); size_t readSome(char *&dataPtr, size_t &dataLen, size_t wantedLen);
void readSome(size_t wantedLen, Util::DataCallback &cb); size_t readSome(size_t wantedLen, Util::DataCallback &cb);
/// Closes the currently open URI. Does not change the internal URI value. /// Closes the currently open URI. Does not change the internal URI value.
void close(); void close();

View file

@ -204,19 +204,18 @@ namespace Mist{
playlistMapping[pls.id] = pls; playlistMapping[pls.id] = pls;
plsInitCount++; plsInitCount++;
if (initOnly){ if (initOnly){
INFO_MSG("Thread for %s exiting", pls.uri.c_str());
return; return;
}// Exit because init-only mode }// Exit because init-only mode
while (self->config->is_active){ while (self->config->is_active && streamIsLive){
// If the timer has not expired yet, sleep up to a second. Otherwise, reload.
/// \TODO Sleep longer if that makes sense?
if (pls.reloadNext > Util::bootSecs()){ if (pls.reloadNext > Util::bootSecs()){
Util::sleep(1000); Util::sleep(1000);
}else{ }else{
pls.reload(); pls.reload();
} }
} }
MEDIUM_MSG("Downloader thread for '%s' exiting", pls.uri.c_str()); INFO_MSG("Downloader thread for '%s' exiting", pls.uri.c_str());
} }
Playlist::Playlist(const std::string &uriSource){ Playlist::Playlist(const std::string &uriSource){
@ -280,9 +279,7 @@ namespace Mist{
int count = 0; int count = 0;
std::istringstream urlSource; std::istringstream urlSource;
std::ifstream fileSource;
if (isUrl()){
HTTP::URIReader plsDL; HTTP::URIReader plsDL;
plsDL.open(uri); plsDL.open(uri);
char * dataPtr; char * dataPtr;
@ -294,18 +291,12 @@ namespace Mist{
return false; return false;
} }
urlSource.str(std::string(dataPtr, dataLen)); urlSource.str(std::string(dataPtr, dataLen));
}else{
fileSource.open(uri.c_str());
if (!fileSource.good()){
FAIL_MSG("Could not open playlist (%s): %s", strerror(errno), uri.c_str());
reloadNext = Util::bootSecs() + waitTime;
return false;
}
}
std::istream &input = (isUrl() ? (std::istream &)urlSource : (std::istream &)fileSource); std::istream &input = (std::istream &)urlSource;
std::getline(input, line); std::getline(input, line);
{// Mutex scope
tthread::lock_guard<tthread::mutex> guard(entryMutex);
DONTEVEN_MSG("Reloading playlist '%s'", uri.c_str()); DONTEVEN_MSG("Reloading playlist '%s'", uri.c_str());
while (std::getline(input, line)){ while (std::getline(input, line)){
DONTEVEN_MSG("Parsing line '%s'", line.c_str()); DONTEVEN_MSG("Parsing line '%s'", line.c_str());
@ -467,7 +458,7 @@ namespace Mist{
} }
// check for already added segments // check for already added segments
DONTEVEN_MSG("Current segment #%" PRIu64 ", last segment was #%" PRIu64 "", bposCounter, lastSegment); INFO_MSG("Current segment #%" PRIu64 ", last segment was #%" PRIu64 "", bposCounter, lastSegment);
if (bposCounter > lastSegment){ if (bposCounter > lastSegment){
char ivec[16]; char ivec[16];
if (keyIV.size()){ if (keyIV.size()){
@ -480,6 +471,7 @@ namespace Mist{
lastSegment = bposCounter; lastSegment = bposCounter;
++count; ++count;
} }
}// Mutex scope
nextUTC = 0; nextUTC = 0;
segDur = 0.0; segDur = 0.0;
startByte = std::string::npos; startByte = std::string::npos;
@ -487,11 +479,6 @@ namespace Mist{
++bposCounter; ++bposCounter;
} }
// VOD over HTTP needs to be processed as LIVE.
if (!isUrl()){
fileSource.close();
}
if (globalWaitTime < waitTime){globalWaitTime = waitTime;} if (globalWaitTime < waitTime){globalWaitTime = waitTime;}
reloadNext = Util::bootSecs() + waitTime; reloadNext = Util::bootSecs() + waitTime;
@ -512,8 +499,6 @@ namespace Mist{
DTSC::veryUglyJitterOverride = entry.duration * 1000; DTSC::veryUglyJitterOverride = entry.duration * 1000;
} }
{
tthread::lock_guard<tthread::mutex> guard(entryMutex);
if (id && listEntries[id].size()){ if (id && listEntries[id].size()){
// If the UTC has gone backwards, shift forward. // If the UTC has gone backwards, shift forward.
playListEntries & prev = listEntries[id].back(); playListEntries & prev = listEntries[id].back();
@ -534,7 +519,6 @@ namespace Mist{
// If startByte unknown and we have a length, set to zero // If startByte unknown and we have a length, set to zero
if (startByte == std::string::npos && lenByte){startByte = 0;} if (startByte == std::string::npos && lenByte){startByte = 0;}
} }
}
if ((lenByte && startByte == std::string::npos) || (!lenByte && startByte != std::string::npos)){ if ((lenByte && startByte == std::string::npos) || (!lenByte && startByte != std::string::npos)){
WARN_MSG("Invalid byte range entry for segment: %s", filename.c_str()); WARN_MSG("Invalid byte range entry for segment: %s", filename.c_str());
lenByte = 0; lenByte = 0;
@ -565,8 +549,7 @@ namespace Mist{
entry.timestamp = lastTimestamp + startTime; entry.timestamp = lastTimestamp + startTime;
} }
lastTimestamp = entry.timestamp - startTime + duration; lastTimestamp = entry.timestamp - startTime + duration;
{
tthread::lock_guard<tthread::mutex> guard(entryMutex);
// Set a playlist ID if we haven't assigned one yet. // Set a playlist ID if we haven't assigned one yet.
// Note: This method requires never removing playlists, only adding. // Note: This method requires never removing playlists, only adding.
// The mutex assures we have a unique count/number. // The mutex assures we have a unique count/number.
@ -579,7 +562,6 @@ namespace Mist{
playlist_urls[JSON::Value(id).asString()] = relurl; playlist_urls[JSON::Value(id).asString()] = relurl;
listEntries[id].push_back(entry); listEntries[id].push_back(entry);
} }
}
/// Constructor of HLS Input /// Constructor of HLS Input
InputHLS::InputHLS(Util::Config *cfg) : Input(cfg){ InputHLS::InputHLS(Util::Config *cfg) : Input(cfg){
@ -648,7 +630,7 @@ namespace Mist{
return false; return false;
} }
if (!initPlaylist(config->getString("input"), false)){ if (!initPlaylist(config->getString("input"), true)){
Util::logExitReason(ER_UNKNOWN, "Failed to load HLS playlist, aborting"); Util::logExitReason(ER_UNKNOWN, "Failed to load HLS playlist, aborting");
return false; return false;
} }
@ -979,10 +961,6 @@ namespace Mist{
return true; return true;
} }
void InputHLS::streamMainLoop(){
parseLivePoint();
}
// Removes any metadata which is no longer and the playlist or buffered in memory // Removes any metadata which is no longer and the playlist or buffered in memory
void InputHLS::updateMeta(){ void InputHLS::updateMeta(){
// EVENT and VOD type playlists should never segments disappear from the start // EVENT and VOD type playlists should never segments disappear from the start
@ -1009,6 +987,7 @@ namespace Mist{
if (listEntries[currentPlaylist].back().timestamp - listEntries[currentPlaylist].front().timestamp < bufferTime){ if (listEntries[currentPlaylist].back().timestamp - listEntries[currentPlaylist].front().timestamp < bufferTime){
break; break;
} }
if (keys.getValidCount() <= 3){break;}
// First key could still be in memory, but is no longer seekable: drop it // First key could still be in memory, but is no longer seekable: drop it
HIGH_MSG("Removing key %lu @%lu ms on track %lu from metadata", M.getKeys(trackIdx->first).getFirstValid(), M.getFirstms(trackIdx->first), trackIdx->first); HIGH_MSG("Removing key %lu @%lu ms on track %lu from metadata", M.getKeys(trackIdx->first).getFirstValid(), M.getFirstms(trackIdx->first), trackIdx->first);
meta.removeFirstKey(trackIdx->first); meta.removeFirstKey(trackIdx->first);
@ -1021,11 +1000,8 @@ namespace Mist{
// Update all playlists to make sure listEntries contains all live segments // Update all playlists to make sure listEntries contains all live segments
for (std::map<uint64_t, Playlist>::iterator pListIt = playlistMapping.begin(); for (std::map<uint64_t, Playlist>::iterator pListIt = playlistMapping.begin();
pListIt != playlistMapping.end(); pListIt++){ pListIt != playlistMapping.end(); pListIt++){
if (pListIt->second.reloadNext < Util::bootSecs()){
pListIt->second.reload();
}
currentPlaylist = pListIt->first; currentPlaylist = pListIt->first;
const uint64_t firstIdx = playlistMapping[currentPlaylist].firstIndex; const uint64_t firstIdx = pListIt->second.firstIndex;
// If the segment counter decreases, reset counters and remove old segments from metadata // If the segment counter decreases, reset counters and remove old segments from metadata
if (firstIdx < playlistMapping[currentPlaylist].lastSegment - listEntries[currentPlaylist].size()){ if (firstIdx < playlistMapping[currentPlaylist].lastSegment - listEntries[currentPlaylist].size()){
@ -1036,13 +1012,10 @@ namespace Mist{
} }
// Remove segments from listEntries as soon as it is no longer requestable // Remove segments from listEntries as soon as it is no longer requestable
{
tthread::lock_guard<tthread::mutex> guard(entryMutex);
while (listEntries[currentPlaylist].front().bytePos < firstIdx + 1){ while (listEntries[currentPlaylist].front().bytePos < firstIdx + 1){
MEDIUM_MSG("Segment #%lu no longer in the input playlist", firstIdx + 1); INFO_MSG("Segment #%" PRIu64 " no longer in the input playlist", listEntries[currentPlaylist].front().bytePos);
listEntries[currentPlaylist].pop_front(); listEntries[currentPlaylist].pop_front();
} }
}
// Unload memory pages which are outside of the buffer window and not recently loaded // Unload memory pages which are outside of the buffer window and not recently loaded
removeUnused(); removeUnused();
@ -1052,20 +1025,18 @@ namespace Mist{
// Check for new segments // Check for new segments
if (listEntries[currentPlaylist].size() != parsedSegments[currentPlaylist] - firstIdx){ if (listEntries[currentPlaylist].size() != parsedSegments[currentPlaylist] - firstIdx){
INFO_MSG("Playlist #%lu has parsed %" PRIu64 "/%zu entries. Parsing new segments...", currentPlaylist, parsedSegments[currentPlaylist] - firstIdx, listEntries[currentPlaylist].size()); INFO_MSG("Playlist #%lu has parsed %" PRId64 "/%zu entries. Parsing new segments...", currentPlaylist, (int64_t)(parsedSegments[currentPlaylist] - firstIdx), listEntries[currentPlaylist].size());
}else if (isInitialRun){ }else if (isInitialRun){
isInitialRun = false; isInitialRun = false;
} }
if (parsedSegments[currentPlaylist] < firstIdx){ if (parsedSegments[currentPlaylist] < firstIdx){
WARN_MSG("Skipping from segment #%lu to segment #%lu since we've fallen behind", parsedSegments[currentPlaylist], firstIdx); WARN_MSG("Skipping from segment #%lu to segment #%lu since we've fallen behind", parsedSegments[currentPlaylist], firstIdx + listEntries[currentPlaylist].size() - 1);
parsedSegments[currentPlaylist] = firstIdx; parsedSegments[currentPlaylist] = firstIdx + listEntries[currentPlaylist].size() - 1;
} }
for(uint64_t entryIt = parsedSegments[currentPlaylist] - firstIdx; entryIt < listEntries[currentPlaylist].size(); entryIt++){ for(uint64_t entryIt = parsedSegments[currentPlaylist] - firstIdx; entryIt < listEntries[currentPlaylist].size(); entryIt++){
MEDIUM_MSG("Adding segment #%" PRIu64 " as live data", firstIdx + entryIt + 1); INFO_MSG("Adding segment #%" PRIu64 " as live data", firstIdx + entryIt + 1);
if (parseSegmentAsLive(entryIt)){ if (parseSegmentAsLive(entryIt)){parsedSegments[currentPlaylist] = firstIdx + entryIt + 1;}
parsedSegments[currentPlaylist] = firstIdx + entryIt + 1; if (Util::bootMS() > maxTime){break;}
}
if (Util::bootMS() > maxTime){return;}
} }
} }
} }
@ -1074,6 +1045,7 @@ namespace Mist{
void InputHLS::userLeadOut(){ void InputHLS::userLeadOut(){
Input::userLeadOut(); Input::userLeadOut();
if (streamIsLive){ if (streamIsLive){
tthread::lock_guard<tthread::mutex> guard(entryMutex);
parseLivePoint(); parseLivePoint();
} }
} }
@ -1118,7 +1090,7 @@ namespace Mist{
currentPlaylist = firstSegment(); currentPlaylist = firstSegment();
} }
if (currentPlaylist == 0){ if (currentPlaylist == 0){
VERYHIGH_MSG("Waiting for segments..."); INFO_MSG("Waiting for segments...");
Util::wait(500); Util::wait(500);
continue; continue;
} }

View file

@ -156,7 +156,6 @@ namespace Mist{
void parseStreamHeader(); void parseStreamHeader();
void parseLivePoint(); void parseLivePoint();
void streamMainLoop();
uint32_t getMappedTrackId(uint64_t id); uint32_t getMappedTrackId(uint64_t id);
uint32_t getMappedTrackPlaylist(uint64_t id); uint32_t getMappedTrackPlaylist(uint64_t id);