Fixing all of the things. 😩

This commit is contained in:
Thulinma 2023-03-15 03:43:19 +01:00
parent 53f941449f
commit c979acff52
14 changed files with 171 additions and 76 deletions

View file

@ -58,7 +58,7 @@ namespace Mist{
if (i == key){
keyLoadPriority[trackKey(track, pageNumber)] += 10000;
}else{
keyLoadPriority[trackKey(track, pageNumber)] += 1000 - (key - i);
keyLoadPriority[trackKey(track, pageNumber)] += 1000 - (i - key);
}
uint64_t cnt = tPages.getInt("keycount", pageIdx);
if (pageNumber + cnt <= i){return;}
@ -73,6 +73,7 @@ namespace Mist{
std::multimap<uint64_t, trackKey> reverse;
for (std::map<trackKey, uint64_t>::iterator i = keyLoadPriority.begin(); i != keyLoadPriority.end(); ++i){
reverse.insert(std::pair<uint64_t, trackKey>(i->second, i->first));
VERYHIGH_MSG("Key priority for %zu:%zu = %" PRIu64, i->first.track, i->first.key, i->second);
}
uint64_t timer = Util::bootMS();
for (std::multimap<uint64_t, trackKey>::reverse_iterator i = reverse.rbegin(); i != reverse.rend() && Util::bootMS() < timer + 500; ++i){
@ -1543,12 +1544,21 @@ namespace Mist{
}
bufferFinalize(idx, page);
bufferTimer = Util::bootMS() - bufferTimer;
INFO_MSG("Track %zu, page %" PRIu32 " (%" PRIu64 " - %" PRIu64 " ms) buffered in %" PRIu64 "ms",
idx, pageNumber, tPages.getInt("firsttime", pageIdx), thisTime, bufferTimer);
INFO_MSG(" (%" PRIu32 "/%" PRIu64 " parts, %" PRIu64 " bytes)", packCounter,
tPages.getInt("parts", pageIdx), byteCounter);
pageCounter[idx][pageNumber] = Util::bootSecs();
return true;
if (packCounter != tPages.getInt("parts", pageIdx)){
FAIL_MSG("Track %zu, page %" PRIu32 " (%" PRIu64 " - %" PRIu64 " ms) NOT FULLY buffered in %" PRIu64 "ms",
idx, pageNumber, tPages.getInt("firsttime", pageIdx), thisTime, bufferTimer);
INFO_MSG(" (%" PRIu32 "/%" PRIu64 " parts, %" PRIu64 " bytes)", packCounter,
tPages.getInt("parts", pageIdx), byteCounter);
pageCounter[idx][pageNumber] = Util::bootSecs();
return false;
}else{
INFO_MSG("Track %zu, page %" PRIu32 " (%" PRIu64 " - %" PRIu64 " ms) buffered in %" PRIu64 "ms",
idx, pageNumber, tPages.getInt("firsttime", pageIdx), thisTime, bufferTimer);
INFO_MSG(" (%" PRIu32 "/%" PRIu64 " parts, %" PRIu64 " bytes)", packCounter,
tPages.getInt("parts", pageIdx), byteCounter);
pageCounter[idx][pageNumber] = Util::bootSecs();
return true;
}
}
bool Input::atKeyFrame(){

View file

@ -109,6 +109,11 @@ namespace Mist{
/// Order of adding/accessing for local RAM buffer of segments
std::deque<std::string> segBufAccs;
/// Order of adding/accessing sizes for local RAM buffer of segments
std::deque<size_t> segBufSize;
size_t segBufTotalSize = 0;
/// Track which segment numbers have been parsed
std::map<uint64_t, uint64_t> parsedSegments;
@ -249,7 +254,7 @@ namespace Mist{
bool SegmentDownloader::atEnd() const{
if (!isOpen || !currBuf){return true;}
if (buffered){return currBuf->size() <= offset + 188;}
return segDL.isEOF() && currBuf->size() <= offset + 188;
return !segDL && currBuf->size() <= offset + 188;
// return (packetPtr - segDL.const_data().data() + 188) > segDL.const_data().size();
}
@ -318,9 +323,25 @@ namespace Mist{
if (atEnd()){return false;}
}else{
if (!currBuf){return false;}
size_t retries = 0;
while (segDL && currBuf->size() < offset + 188 + 188){
segDL.readSome(188, *this);
if (currBuf->size() < offset + 188 + 188){Util::sleep(50);}
segDL.readSome(offset + 188 + 188 - currBuf->size(), *this);
if (currBuf->size() < offset + 188 + 188){
if (!segDL){
if (!segDL.isSeekable()){return false;}
// Only retry/resume if seekable and allocated size greater than current size
if (currBuf->rsize() > currBuf->size()){
// Seek to current position to resume
++retries;
if (retries > 5){
segDL.close();
return false;
}
segDL.seek(currBuf->size());
}
}
Util::sleep(50);
}
}
if (currBuf->size() < offset + 188 + 188){return false;}
}
@ -332,7 +353,13 @@ namespace Mist{
}
packetPtr = *currBuf + offset;
if (!packetPtr || packetPtr[0] != 0x47){
FAIL_MSG("Not a valid TS packet: first byte %" PRIu8, packetPtr?(uint8_t)packetPtr[0]:0);
std::stringstream packData;
if (packetPtr){
for (uint64_t i = 0; i < 188; ++i){
packData << std::hex << std::setw(2) << std::setfill('0') << (unsigned int)packetPtr[i];
}
}
FAIL_MSG("Not a valid TS packet: byte %zu is not 0x47: %s", offset, packData.str().c_str());
return false;
}
return true;
@ -341,8 +368,14 @@ namespace Mist{
void SegmentDownloader::dataCallback(const char *ptr, size_t size){
currBuf->append(ptr, size);
//Overwrite the current segment size
segBufTotalSize -= segBufSize.front();
segBufSize.front() = currBuf->size();
segBufTotalSize += segBufSize.front();
}
size_t SegmentDownloader::getDataCallbackPos() const{return currBuf->size();}
/// Attempts to read a single TS packet from the current segment, setting packetPtr on success
void SegmentDownloader::close(){
packetPtr = 0;
@ -362,21 +395,54 @@ namespace Mist{
firstPacket = true;
buffered = segBufs.count(entry.filename);
if (!buffered){
INFO_MSG("Reading non-cache: %s", entry.filename.c_str());
HIGH_MSG("Reading non-cache: %s", entry.filename.c_str());
if (!segDL.open(entry.filename)){
FAIL_MSG("Could not open %s", entry.filename.c_str());
return false;
}
if (!segDL){return false;}
if (segBufs.size() > 30){
//Remove cache entries while above 16MiB in total size, unless we only have 1 entry (we keep two at least at all times)
while (segBufTotalSize > 16 * 1024 * 1024 && segBufs.size() > 1){
HIGH_MSG("Dropping from segment cache: %s", segBufAccs.back().c_str());
segBufs.erase(segBufAccs.back());
segBufTotalSize -= segBufSize.back();
segBufAccs.pop_back();
segBufSize.pop_back();
}
segBufAccs.push_front(entry.filename);
segBufSize.push_front(0);
currBuf = &(segBufs[entry.filename]);
}else{
INFO_MSG("Reading from segment cache: %s", entry.filename.c_str());
HIGH_MSG("Reading from segment cache: %s", entry.filename.c_str());
currBuf = &(segBufs[entry.filename]);
if (currBuf->rsize() != currBuf->size()){
MEDIUM_MSG("Cache was incomplete (%zu/%" PRIu32 "), resuming", currBuf->size(), currBuf->rsize());
buffered = false;
// We only re-open and seek if the opened URL doesn't match what we want already
HTTP::URL A = segDL.getURI();
HTTP::URL B = HTTP::localURIResolver().link(entry.filename);
if (A != B){
if (!segDL.open(entry.filename)){
FAIL_MSG("Could not open %s", entry.filename.c_str());
return false;
}
if (!segDL){return false;}
//Seek to current position in segment for resuming
currBuf->truncate(currBuf->size() / 188 * 188);
MEDIUM_MSG("Seeking to %zu", currBuf->size());
segDL.seek(currBuf->size());
}
}
}
if (!buffered){
// Allocate full size if known
if (segDL.getSize() != std::string::npos){currBuf->allocate(segDL.getSize());}
// Download full segment if not seekable, pretend it was cached all along
if (!segDL.isSeekable()){
segDL.readAll(*this);
buffered = true;
}
}
currBuf = &(segBufs[entry.filename]);
encrypted = false;
outData.truncate(0);
@ -1221,7 +1287,7 @@ namespace Mist{
currentIndex = plistEntry - 1;
currentPlaylist = getMappedTrackPlaylist(trackId);
INFO_MSG("Seeking to index %zu on playlist %" PRIu64, currentIndex, currentPlaylist);
VERYHIGH_MSG("Seeking to index %zu on playlist %" PRIu64, currentIndex, currentPlaylist);
{// Lock mutex for listEntries
tthread::lock_guard<tthread::mutex> guard(entryMutex);

View file

@ -58,6 +58,7 @@ namespace Mist{
bool loadSegment(const playListEntries &entry);
bool readNext();
virtual void dataCallback(const char *ptr, size_t size);
virtual size_t getDataCallbackPos() const;
void close();
bool atEnd() const;

View file

@ -156,6 +156,7 @@ namespace Mist{
}
void inputMP4::dataCallback(const char *ptr, size_t size){readBuffer.append(ptr, size);}
size_t inputMP4::getDataCallbackPos() const{return readPos + readBuffer.size();}
bool inputMP4::needHeader(){
//Attempt to read cache, but force calling of the readHeader function anyway

View file

@ -73,7 +73,8 @@ namespace Mist{
class inputMP4 : public Input, public Util::DataCallback {
public:
inputMP4(Util::Config *cfg);
void dataCallback(const char *ptr, size_t size);
virtual void dataCallback(const char *ptr, size_t size);
virtual size_t getDataCallbackPos() const;
protected:
// Private Functions