diff --git a/lib/downloader.cpp b/lib/downloader.cpp index 8a09a881..6bfa1232 100644 --- a/lib/downloader.cpp +++ b/lib/downloader.cpp @@ -71,6 +71,14 @@ namespace HTTP{ return S; } + void Downloader::clean(){ + H.headerOnly = false; + H.Clean(); + getSocket().close(); + getSocket().Received().clear(); + extraHeaders.clear(); + } + ///Sets an override to use the given socket void Downloader::setSocket(Socket::Connection * socketPtr){ sPtr = socketPtr; @@ -187,10 +195,7 @@ namespace HTTP{ if (progressCallback != 0){ if (!progressCallback()){ WARN_MSG("Download aborted by callback"); - H.headerOnly = false; - H.Clean(); - getSocket().close(); - getSocket().Received().clear(); + clean(); return false; } } @@ -203,10 +208,7 @@ namespace HTTP{ // If the return status code is invalid, close the socket, wipe all buffers, and return false if(!getStatusCode()){ - H.headerOnly = false; - getSocket().close(); - getSocket().Received().clear(); - H.Clean(); + clean(); return false; } @@ -233,10 +235,7 @@ namespace HTTP{ if (progressCallback != 0){ if (!progressCallback()){ WARN_MSG("Download aborted by callback"); - H.headerOnly = false; - H.Clean(); - getSocket().close(); - getSocket().Received().clear(); + clean(); return false; } } @@ -246,11 +245,9 @@ namespace HTTP{ } } } - H.headerOnly = false; if (getSocket()){ FAIL_MSG("Timeout while retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), loop, retryCount); - getSocket().close(); }else{ if (loop > 1){ INFO_MSG("Lost connection while retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), loop, retryCount); @@ -258,8 +255,7 @@ namespace HTTP{ MEDIUM_MSG("Lost connection while retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), loop, retryCount); } } - H.Clean(); - getSocket().Received().clear(); + clean(); Util::sleep(100); // wait a bit before retrying } FAIL_MSG("Could not retrieve %s", link.getUrl().c_str()); @@ -309,6 +305,7 @@ namespace HTTP{ nbMaxRecursiveDepth = maxRecursiveDepth; nbLoop = retryCount + 1; // max 5 attempts isComplete = false; + extraHeaders.erase("Range"); doRequest(nbLink); nbReqTime = Util::bootSecs(); nbLastOff = getSocket().dataDown(); @@ -343,28 +340,17 @@ namespace HTTP{ } if (H.hasHeader("Accept-Ranges") && getHeader("Accept-Ranges").size() > 0){ - getRangeNonBlocking(nbLink, H.currentLength, 0, cb); - return true; + getRangeNonBlocking(nbLink, cb.getDataCallbackPos(), 0, cb); + continue; }else{ doRequest(nbLink); } - if (!getSocket()){ - WARN_MSG("Aborting download: could not open connection"); - return true; - } nbReqTime = Util::bootSecs(); nbLastOff = getSocket().dataDown(); } - if (Util::bootSecs() >= nbReqTime + dataTimeout){ - FAIL_MSG("Timeout while retrieving %s (%zu/%" PRIu32 ")", nbLink.getUrl().c_str(), - retryCount - nbLoop + 1, retryCount); - getSocket().close(); - return false; // because we may have retries left - } - - // No data? Wait for a second or so. + // No data? Return false to indicate retry later. if (!getSocket().spool() && getSocket()){ if (progressCallback != 0){ if (!progressCallback()){ @@ -372,9 +358,30 @@ namespace HTTP{ return true; } } + if (Util::bootSecs() >= nbReqTime + dataTimeout){ + FAIL_MSG("Timeout while retrieving %s (%zu/%" PRIu32 ")", nbLink.getUrl().c_str(), + retryCount - nbLoop + 1, retryCount); + getSocket().close(); + return false; // Retry on next attempt + } return false; } - // Data! Check if we can parse it... + + // Reset the data timeout + if (nbReqTime != Util::bootSecs()){ + if (progressCallback != 0){ + if (!progressCallback()){ + WARN_MSG("Download aborted by callback"); + return true; + } + } + if (getSocket().dataDown() > nbLastOff + 25600){ + nbReqTime = Util::bootSecs(); + nbLastOff = getSocket().dataDown(); + } + } + + //Attempt to parse the data we received if (H.Read(getSocket(), cb)){ if (shouldContinue()){ if (nbMaxRecursiveDepth == 0){ @@ -393,21 +400,7 @@ namespace HTTP{ isComplete = true; // Success return true; } - // reset the data timeout - if (nbReqTime != Util::bootSecs()){ - if (progressCallback != 0){ - if (!progressCallback()){ - WARN_MSG("Download aborted by callback"); - return true; - } - } - if (getSocket().dataDown() > nbLastOff + 25600){ - nbReqTime = Util::bootSecs(); - nbLastOff = getSocket().dataDown(); - } - } } - WARN_MSG("Invalid connection state for HTTP request"); return false; // we should never get here } diff --git a/lib/downloader.h b/lib/downloader.h index 04697f27..92bcca2c 100644 --- a/lib/downloader.h +++ b/lib/downloader.h @@ -45,6 +45,7 @@ namespace HTTP{ Parser &getHTTP(); Socket::Connection &getSocket(); const Socket::Connection &getSocket() const; + void clean(); void setSocket(Socket::Connection * socketPtr); uint32_t retryCount, dataTimeout; bool isProxied() const; diff --git a/lib/http_parser.h b/lib/http_parser.h index 4d695264..35845aac 100644 --- a/lib/http_parser.h +++ b/lib/http_parser.h @@ -17,7 +17,7 @@ namespace HTTP{ void parseVars(const std::string &data, std::map &storage, const std::string & separator = "&"); /// Simple class for reading and writing HTTP 1.0 and 1.1. - class Parser : public Util::DataCallback{ + class Parser{ public: Parser(); bool Read(Socket::Connection &conn, Util::DataCallback &cb = Util::defaultDataCallback); diff --git a/lib/urireader.cpp b/lib/urireader.cpp index bc690980..ffc40c94 100644 --- a/lib/urireader.cpp +++ b/lib/urireader.cpp @@ -65,13 +65,17 @@ namespace HTTP{ return url; } + HTTP::URL localURIResolver(){ + char workDir[512]; + getcwd(workDir, 512); + return HTTP::URL(std::string("file://") + workDir + "/"); + } void URIReader::init(){ handle = -1; mapped = 0; - char workDir[512]; - getcwd(workDir, 512); - myURI = HTTP::URL(std::string("file://") + workDir + "/"); + myURI = localURIResolver(); + originalUrl = myURI; cbProgress = 0; minLen = 1; maxLen = std::string::npos; @@ -97,11 +101,13 @@ namespace HTTP{ open(reluri); } - bool URIReader::open(const std::string &reluri){return open(myURI.link(reluri));} + bool URIReader::open(const std::string &reluri){return open(originalUrl.link(reluri));} /// Internal callback function, used to buffer data. void URIReader::dataCallback(const char *ptr, size_t size){allData.append(ptr, size);} + size_t URIReader::getDataCallbackPos() const{return allData.size();} + bool URIReader::open(const HTTP::URL &uri){ close(); myURI = uri; @@ -247,14 +253,13 @@ namespace HTTP{ //HTTP-based needs to do a range request if (stateType == HTTP::HTTP && supportRangeRequest){ - downer.getSocket().close(); - downer.getSocket().Received().clear(); + downer.clean(); + curPos = pos; injectHeaders(originalUrl, "GET", downer); - if (!downer.getRangeNonBlocking(myURI.getUrl(), pos, 0)){ + if (!downer.getRangeNonBlocking(myURI, pos, 0)){ FAIL_MSG("Error making range request"); return false; } - curPos = pos; return true; } return false; @@ -354,9 +359,7 @@ namespace HTTP{ allData.truncate(0); bufPos = 0; // Close downloader socket if open - downer.getSocket().close(); - downer.getSocket().Received().clear(); - downer.getHTTP().Clean(); + downer.clean(); // Unmap file if mapped if (mapped){ munmap(mapped, totalSize); @@ -408,7 +411,7 @@ namespace HTTP{ uint64_t URIReader::getPos(){return curPos;} - const HTTP::URL &URIReader::getURI() const{return myURI;} + const HTTP::URL &URIReader::getURI() const{return originalUrl;} size_t URIReader::getSize() const{return totalSize;} diff --git a/lib/urireader.h b/lib/urireader.h index 72fd90bb..1d4da390 100644 --- a/lib/urireader.h +++ b/lib/urireader.h @@ -60,7 +60,8 @@ namespace HTTP{ size_t getSize() const; ///< Returns the size of the currently open URI, if known. Returns std::string::npos if unknown size. void (*httpBodyCallback)(const char *ptr, size_t size); - void dataCallback(const char *ptr, size_t size); + virtual void dataCallback(const char *ptr, size_t size); + virtual size_t getDataCallbackPos() const; std::string userAgentOverride; @@ -86,4 +87,6 @@ namespace HTTP{ HTTP::Downloader downer; ///< For HTTP(S)-based URIs, the Downloader instance used for the download. void init(); }; + + HTTP::URL localURIResolver(); }// namespace HTTP diff --git a/lib/url.cpp b/lib/url.cpp index 77fa92c2..0978dc14 100644 --- a/lib/url.cpp +++ b/lib/url.cpp @@ -345,3 +345,13 @@ HTTP::URL HTTP::URL::link(const std::string &l) const{ DONTEVEN_MSG("Relative link: %s+%s", base.c_str(), l.c_str()); return URL(base + l); } + +/// Returns true if the URL matches, ignoring username, password and fragment +bool HTTP::URL::operator==(const URL& rhs) const{ + return (host == rhs.host && getPort() == rhs.getPort() && protocol == rhs.protocol && path == rhs.path && args == rhs.args); +} + +/// Returns false if the URL matches, ignoring username, password and fragment. +/// Simply calls == internally, and negates the result. +bool HTTP::URL::operator!=(const URL& rhs) const{return !(*this == rhs);} + diff --git a/lib/url.h b/lib/url.h index 8e2c9b42..479e4378 100644 --- a/lib/url.h +++ b/lib/url.h @@ -34,6 +34,8 @@ namespace HTTP{ std::string pass; ///< Password, if it was present URL link(const std::string &l) const; bool IPv6Addr; + bool operator==(const URL& rhs) const; + bool operator!=(const URL& rhs) const; }; }// namespace HTTP diff --git a/lib/util.h b/lib/util.h index e9f28ef7..0970807e 100644 --- a/lib/util.h +++ b/lib/util.h @@ -30,6 +30,7 @@ namespace Util{ virtual void dataCallback(const char *ptr, size_t size){ INFO_MSG("default callback, size: %zu", size); } + virtual size_t getDataCallbackPos() const{return 0;} virtual ~DataCallback(){}; }; diff --git a/src/input/input.cpp b/src/input/input.cpp index 9c14fc27..0cca27e1 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -58,7 +58,7 @@ namespace Mist{ if (i == key){ keyLoadPriority[trackKey(track, pageNumber)] += 10000; }else{ - keyLoadPriority[trackKey(track, pageNumber)] += 1000 - (key - i); + keyLoadPriority[trackKey(track, pageNumber)] += 1000 - (i - key); } uint64_t cnt = tPages.getInt("keycount", pageIdx); if (pageNumber + cnt <= i){return;} @@ -73,6 +73,7 @@ namespace Mist{ std::multimap reverse; for (std::map::iterator i = keyLoadPriority.begin(); i != keyLoadPriority.end(); ++i){ reverse.insert(std::pair(i->second, i->first)); + VERYHIGH_MSG("Key priority for %zu:%zu = %" PRIu64, i->first.track, i->first.key, i->second); } uint64_t timer = Util::bootMS(); for (std::multimap::reverse_iterator i = reverse.rbegin(); i != reverse.rend() && Util::bootMS() < timer + 500; ++i){ @@ -1543,12 +1544,21 @@ namespace Mist{ } bufferFinalize(idx, page); bufferTimer = Util::bootMS() - bufferTimer; - INFO_MSG("Track %zu, page %" PRIu32 " (%" PRIu64 " - %" PRIu64 " ms) buffered in %" PRIu64 "ms", - idx, pageNumber, tPages.getInt("firsttime", pageIdx), thisTime, bufferTimer); - INFO_MSG(" (%" PRIu32 "/%" PRIu64 " parts, %" PRIu64 " bytes)", packCounter, - tPages.getInt("parts", pageIdx), byteCounter); - pageCounter[idx][pageNumber] = Util::bootSecs(); - return true; + if (packCounter != tPages.getInt("parts", pageIdx)){ + FAIL_MSG("Track %zu, page %" PRIu32 " (%" PRIu64 " - %" PRIu64 " ms) NOT FULLY buffered in %" PRIu64 "ms", + idx, pageNumber, tPages.getInt("firsttime", pageIdx), thisTime, bufferTimer); + INFO_MSG(" (%" PRIu32 "/%" PRIu64 " parts, %" PRIu64 " bytes)", packCounter, + tPages.getInt("parts", pageIdx), byteCounter); + pageCounter[idx][pageNumber] = Util::bootSecs(); + return false; + }else{ + INFO_MSG("Track %zu, page %" PRIu32 " (%" PRIu64 " - %" PRIu64 " ms) buffered in %" PRIu64 "ms", + idx, pageNumber, tPages.getInt("firsttime", pageIdx), thisTime, bufferTimer); + INFO_MSG(" (%" PRIu32 "/%" PRIu64 " parts, %" PRIu64 " bytes)", packCounter, + tPages.getInt("parts", pageIdx), byteCounter); + pageCounter[idx][pageNumber] = Util::bootSecs(); + return true; + } } bool Input::atKeyFrame(){ diff --git a/src/input/input_hls.cpp b/src/input/input_hls.cpp index da2b3b69..b86339a9 100644 --- a/src/input/input_hls.cpp +++ b/src/input/input_hls.cpp @@ -109,6 +109,11 @@ namespace Mist{ /// Order of adding/accessing for local RAM buffer of segments std::deque segBufAccs; + /// Order of adding/accessing sizes for local RAM buffer of segments + std::deque segBufSize; + + size_t segBufTotalSize = 0; + /// Track which segment numbers have been parsed std::map parsedSegments; @@ -249,7 +254,7 @@ namespace Mist{ bool SegmentDownloader::atEnd() const{ if (!isOpen || !currBuf){return true;} if (buffered){return currBuf->size() <= offset + 188;} - return segDL.isEOF() && currBuf->size() <= offset + 188; + return !segDL && currBuf->size() <= offset + 188; // return (packetPtr - segDL.const_data().data() + 188) > segDL.const_data().size(); } @@ -318,9 +323,25 @@ namespace Mist{ if (atEnd()){return false;} }else{ if (!currBuf){return false;} + size_t retries = 0; while (segDL && currBuf->size() < offset + 188 + 188){ - segDL.readSome(188, *this); - if (currBuf->size() < offset + 188 + 188){Util::sleep(50);} + segDL.readSome(offset + 188 + 188 - currBuf->size(), *this); + if (currBuf->size() < offset + 188 + 188){ + if (!segDL){ + if (!segDL.isSeekable()){return false;} + // Only retry/resume if seekable and allocated size greater than current size + if (currBuf->rsize() > currBuf->size()){ + // Seek to current position to resume + ++retries; + if (retries > 5){ + segDL.close(); + return false; + } + segDL.seek(currBuf->size()); + } + } + Util::sleep(50); + } } if (currBuf->size() < offset + 188 + 188){return false;} } @@ -332,7 +353,13 @@ namespace Mist{ } packetPtr = *currBuf + offset; if (!packetPtr || packetPtr[0] != 0x47){ - FAIL_MSG("Not a valid TS packet: first byte %" PRIu8, packetPtr?(uint8_t)packetPtr[0]:0); + std::stringstream packData; + if (packetPtr){ + for (uint64_t i = 0; i < 188; ++i){ + packData << std::hex << std::setw(2) << std::setfill('0') << (unsigned int)packetPtr[i]; + } + } + FAIL_MSG("Not a valid TS packet: byte %zu is not 0x47: %s", offset, packData.str().c_str()); return false; } return true; @@ -341,8 +368,14 @@ namespace Mist{ void SegmentDownloader::dataCallback(const char *ptr, size_t size){ currBuf->append(ptr, size); + //Overwrite the current segment size + segBufTotalSize -= segBufSize.front(); + segBufSize.front() = currBuf->size(); + segBufTotalSize += segBufSize.front(); } + size_t SegmentDownloader::getDataCallbackPos() const{return currBuf->size();} + /// Attempts to read a single TS packet from the current segment, setting packetPtr on success void SegmentDownloader::close(){ packetPtr = 0; @@ -362,21 +395,54 @@ namespace Mist{ firstPacket = true; buffered = segBufs.count(entry.filename); if (!buffered){ - INFO_MSG("Reading non-cache: %s", entry.filename.c_str()); + HIGH_MSG("Reading non-cache: %s", entry.filename.c_str()); if (!segDL.open(entry.filename)){ FAIL_MSG("Could not open %s", entry.filename.c_str()); return false; } if (!segDL){return false;} - if (segBufs.size() > 30){ + //Remove cache entries while above 16MiB in total size, unless we only have 1 entry (we keep two at least at all times) + while (segBufTotalSize > 16 * 1024 * 1024 && segBufs.size() > 1){ + HIGH_MSG("Dropping from segment cache: %s", segBufAccs.back().c_str()); segBufs.erase(segBufAccs.back()); + segBufTotalSize -= segBufSize.back(); segBufAccs.pop_back(); + segBufSize.pop_back(); } segBufAccs.push_front(entry.filename); + segBufSize.push_front(0); + currBuf = &(segBufs[entry.filename]); }else{ - INFO_MSG("Reading from segment cache: %s", entry.filename.c_str()); + HIGH_MSG("Reading from segment cache: %s", entry.filename.c_str()); + currBuf = &(segBufs[entry.filename]); + if (currBuf->rsize() != currBuf->size()){ + MEDIUM_MSG("Cache was incomplete (%zu/%" PRIu32 "), resuming", currBuf->size(), currBuf->rsize()); + buffered = false; + // We only re-open and seek if the opened URL doesn't match what we want already + HTTP::URL A = segDL.getURI(); + HTTP::URL B = HTTP::localURIResolver().link(entry.filename); + if (A != B){ + if (!segDL.open(entry.filename)){ + FAIL_MSG("Could not open %s", entry.filename.c_str()); + return false; + } + if (!segDL){return false;} + //Seek to current position in segment for resuming + currBuf->truncate(currBuf->size() / 188 * 188); + MEDIUM_MSG("Seeking to %zu", currBuf->size()); + segDL.seek(currBuf->size()); + } + } + } + if (!buffered){ + // Allocate full size if known + if (segDL.getSize() != std::string::npos){currBuf->allocate(segDL.getSize());} + // Download full segment if not seekable, pretend it was cached all along + if (!segDL.isSeekable()){ + segDL.readAll(*this); + buffered = true; + } } - currBuf = &(segBufs[entry.filename]); encrypted = false; outData.truncate(0); @@ -1221,7 +1287,7 @@ namespace Mist{ currentIndex = plistEntry - 1; currentPlaylist = getMappedTrackPlaylist(trackId); - INFO_MSG("Seeking to index %zu on playlist %" PRIu64, currentIndex, currentPlaylist); + VERYHIGH_MSG("Seeking to index %zu on playlist %" PRIu64, currentIndex, currentPlaylist); {// Lock mutex for listEntries tthread::lock_guard guard(entryMutex); diff --git a/src/input/input_hls.h b/src/input/input_hls.h index f160433f..2d799299 100644 --- a/src/input/input_hls.h +++ b/src/input/input_hls.h @@ -58,6 +58,7 @@ namespace Mist{ bool loadSegment(const playListEntries &entry); bool readNext(); virtual void dataCallback(const char *ptr, size_t size); + virtual size_t getDataCallbackPos() const; void close(); bool atEnd() const; diff --git a/src/input/input_mp4.cpp b/src/input/input_mp4.cpp index f2aaac28..b4e02061 100644 --- a/src/input/input_mp4.cpp +++ b/src/input/input_mp4.cpp @@ -156,6 +156,7 @@ namespace Mist{ } void inputMP4::dataCallback(const char *ptr, size_t size){readBuffer.append(ptr, size);} + size_t inputMP4::getDataCallbackPos() const{return readPos + readBuffer.size();} bool inputMP4::needHeader(){ //Attempt to read cache, but force calling of the readHeader function anyway diff --git a/src/input/input_mp4.h b/src/input/input_mp4.h index 3d3fd317..f491a92f 100644 --- a/src/input/input_mp4.h +++ b/src/input/input_mp4.h @@ -73,7 +73,8 @@ namespace Mist{ class inputMP4 : public Input, public Util::DataCallback { public: inputMP4(Util::Config *cfg); - void dataCallback(const char *ptr, size_t size); + virtual void dataCallback(const char *ptr, size_t size); + virtual size_t getDataCallbackPos() const; protected: // Private Functions diff --git a/src/output/output.cpp b/src/output/output.cpp index 74a5eddf..e41c70de 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -861,6 +861,7 @@ namespace Mist{ } uint64_t actualKeyTime = keys.getTime(keyNum); HIGH_MSG("Seeking to track %zu key %" PRIu32 " => time %" PRIu64, tid, keyNum, pos); + emptyCount = 0; if (actualKeyTime > pos){ pos = actualKeyTime; userSelect[tid].setKeyNum(keyNum); @@ -2076,6 +2077,8 @@ namespace Mist{ // in sync mode, after ~25 seconds, give up and drop the track. if (++emptyCount >= dataWaitTimeout){ + //curPage[nxt.tid].mapped + nxt.offset + preLoad.getDataLen() + WARN_MSG("Waiting at %s byte %zu", curPage[nxt.tid].name.c_str(), nxt.offset + preLoad.getDataLen()); dropTrack(nxt.tid, "EOP: data wait timeout"); return false; }