From f88a8fc51c07eb2255aa6681ab41a8cea7dd0cf7 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sun, 30 Aug 2020 01:20:06 +0200 Subject: [PATCH] Downloader improvements: - Added metrics to Downloader::post (only at HIGH level for successful requests) - Improved downloader timing - Robustness improvements --- lib/downloader.cpp | 127 +++++++++++++++++++++++++++++++------------- lib/downloader.h | 1 + lib/http_parser.cpp | 4 +- lib/socket.h | 5 +- 4 files changed, 96 insertions(+), 41 deletions(-) diff --git a/lib/downloader.cpp b/lib/downloader.cpp index 547e4fc7..d74e3aff 100644 --- a/lib/downloader.cpp +++ b/lib/downloader.cpp @@ -168,17 +168,18 @@ namespace HTTP{ /// Do a HEAD request to download the HTTP headers only, returns true on success bool Downloader::head(const HTTP::URL &link, uint8_t maxRecursiveDepth){ if (!canRequest(link)){return false;} - size_t loop = retryCount + 1; // max 5 attempts - while (--loop){// loop while we are unsuccessful - MEDIUM_MSG("Retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), retryCount - loop + 1, retryCount); + size_t loop = 0; + while (++loop <= retryCount){// loop while we are unsuccessful + MEDIUM_MSG("Retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), loop, retryCount); doRequest(link, "HEAD"); if (!getSocket()){ FAIL_MSG("Could not retrieve %s: %s", link.getUrl().c_str(), getSocket().getError().c_str()); return false; } H.headerOnly = true; - uint64_t reqTime = Util::bootSecs(); - while (getSocket() && Util::bootSecs() < reqTime + dataTimeout){ + uint64_t reqTime = Util::bootMS(); + uint64_t lastOff = getSocket().dataDown(); + while (getSocket() && Util::bootMS() < reqTime + dataTimeout*1000){ // No data? Wait for a second or so. if (!getSocket().spool()){ if (progressCallback != 0){ @@ -188,7 +189,7 @@ namespace HTTP{ return false; } } - Util::sleep(250); + Util::sleep(25); continue; } // Data! Check if we can parse it... @@ -213,35 +214,36 @@ namespace HTTP{ return true; // Success! } // reset the data timeout - if (reqTime != Util::bootSecs()){ + if (reqTime+1000 < Util::bootMS()){ if (progressCallback != 0){ if (!progressCallback()){ WARN_MSG("Download aborted by callback"); H.headerOnly = false; + getSocket().close(); return false; } } - reqTime = Util::bootSecs(); + if (getSocket().dataDown() > lastOff + 25600){ + reqTime = Util::bootMS(); + lastOff = getSocket().dataDown(); + } } } H.headerOnly = false; if (getSocket()){ - FAIL_MSG("Timeout while retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), - retryCount - loop + 1, retryCount); + FAIL_MSG("Timeout while retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), loop, retryCount); H.Clean(); getSocket().close(); }else{ - if (retryCount - loop + 1 > 2){ - INFO_MSG("Lost connection while retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), - retryCount - loop + 1, retryCount); + if (loop > 1){ + INFO_MSG("Lost connection while retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), loop, retryCount); }else{ - MEDIUM_MSG("Lost connection while retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), - retryCount - loop + 1, retryCount); + MEDIUM_MSG("Lost connection while retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), loop, retryCount); } H.Clean(); } - Util::sleep(500); // wait a bit before retrying + Util::sleep(100); // wait a bit before retrying } FAIL_MSG("Could not retrieve %s", link.getUrl().c_str()); return false; @@ -292,6 +294,7 @@ namespace HTTP{ isComplete = false; doRequest(nbLink); nbReqTime = Util::bootSecs(); + nbLastOff = getSocket().dataDown(); return true; } @@ -305,7 +308,7 @@ namespace HTTP{ if (nbLoop < 2){ FAIL_MSG("Exceeded retry limit while retrieving %s (%zu/%" PRIu32 ")", nbLink.getUrl().c_str(), retryCount - nbLoop + 1, retryCount); - Util::sleep(1000); + Util::sleep(100); return true; } nbLoop--; @@ -334,6 +337,7 @@ namespace HTTP{ return true; } nbReqTime = Util::bootSecs(); + nbLastOff = getSocket().dataDown(); } if (Util::bootSecs() >= nbReqTime + dataTimeout){ @@ -380,7 +384,10 @@ namespace HTTP{ return true; } } - nbReqTime = Util::bootSecs(); + if (getSocket().dataDown() > nbLastOff + 25600){ + nbReqTime = Util::bootSecs(); + nbLastOff = getSocket().dataDown(); + } } } WARN_MSG("Invalid connection state for HTTP request"); @@ -394,27 +401,65 @@ namespace HTTP{ bool Downloader::post(const HTTP::URL &link, const void *payload, const size_t payloadLen, bool sync, uint8_t maxRecursiveDepth){ if (!canRequest(link)){return false;} - size_t loop = retryCount; // max 5 attempts - while (--loop){// loop while we are unsuccessful - MEDIUM_MSG("Posting to %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), retryCount - loop + 1, retryCount); - doRequest(link, "POST", payload, payloadLen); + size_t loop = 0; + while (++loop <= retryCount){// loop while we are unsuccessful + MEDIUM_MSG("Posting to %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), loop, retryCount); + uint64_t prerequest = Util::getMicros(); + doRequest(link, "POST", 0, payloadLen); + Socket::Connection & s = getSocket(); + if (payloadLen && payload){ + unsigned int payOff = 0; + uint64_t bodyActive = Util::bootMS(); + unsigned int lastOff = 0; + bool wasBlocking = s.isBlocking(); + s.setBlocking(false); + while (s && payOff < payloadLen){ + unsigned int bytes = s.iwrite((char*)payload+payOff, payloadLen-payOff); + payOff += bytes; + if (!bytes){ + Util::sleep(100); + }else{ + if (payOff > lastOff + 25600){ + bodyActive = Util::bootMS(); + lastOff = payOff; + } + } + if (Util::bootMS() >= bodyActive + dataTimeout*1000){ + uint64_t postrequest = Util::getMicros(); + FAIL_MSG("Timeout during sending of POST body after %u bytes and %.2fms!", payOff, (postrequest-prerequest)/1000.0); + s.close(); + } + } + s.setBlocking(wasBlocking); + } + if (!s){continue;} + uint64_t postrequest = Util::getMicros(); + uint64_t preresponse = 0; // Not synced? Ignore the response and immediately return true. - if (!sync){return true;} - uint64_t reqTime = Util::bootSecs(); - while (getSocket() && Util::bootSecs() < reqTime + dataTimeout){ + if (!sync){ + VERYHIGH_MSG("Post to %s completed in %.2f ms", link.getUrl().c_str(), (postrequest-prerequest)/1000.0); + return true; + } + uint64_t reqTime = Util::bootMS(); + uint64_t lastOff = getSocket().dataDown(); + while (s && Util::bootMS() < reqTime + dataTimeout*1000){ // No data? Wait for a second or so. - if (!getSocket().spool()){ + if (!s.spool()){ if (progressCallback != 0){ if (!progressCallback()){ WARN_MSG("Download aborted by callback"); + s.close(); return false; } } - Util::sleep(250); + Util::sleep(25); continue; } + if (!preresponse){preresponse = Util::getMicros();} // Data! Check if we can parse it... - if (H.Read(getSocket())){ + if (H.Read(s)){ + uint64_t postresponse = Util::getMicros(); + HIGH_MSG("Post to %s completed in %.2f ms (%.2f ms upload, %.2f ms wait, %.2f ms download)", link.getUrl().c_str(), (postresponse-prerequest)/1000.0, (postrequest-prerequest)/1000.0, (preresponse-postrequest)/1000.0, (postresponse-preresponse)/1000.0); if (shouldContinue()){ if (maxRecursiveDepth == 0){ FAIL_MSG("Maximum recursion depth reached"); @@ -430,24 +475,32 @@ namespace HTTP{ return true; // Success! } // reset the data timeout - if (reqTime != Util::bootSecs()){ + if (reqTime+1000 < Util::bootMS()){ if (progressCallback != 0){ if (!progressCallback()){ - WARN_MSG("Download aborted by callback"); + uint64_t postresponse = Util::getMicros(); + WARN_MSG("Post to %s aborted by callback after %.2f ms (%.2f ms upload, %.2f ms wait, %.2f ms download)", link.getUrl().c_str(), (postresponse-prerequest)/1000.0, (postrequest-prerequest)/1000.0, (preresponse-postrequest)/1000.0, (postresponse-preresponse)/1000.0); + s.close(); return false; } } - reqTime = Util::bootSecs(); + if (s.dataDown() > lastOff + 25600){ + reqTime = Util::bootMS(); + lastOff = s.dataDown(); + } } } - if (getSocket()){ - FAIL_MSG("Timeout while retrieving %s", link.getUrl().c_str()); - getSocket().close(); - return false; + if (!preresponse){preresponse = Util::getMicros();} + if (s){ + uint64_t postresponse = Util::getMicros(); + FAIL_MSG("Post to %s timed out after %.2f ms (%.2f ms upload, %.2f ms wait, %.2f ms download)", link.getUrl().c_str(), (postresponse-prerequest)/1000.0, (postrequest-prerequest)/1000.0, (preresponse-postrequest)/1000.0, (postresponse-preresponse)/1000.0); + s.close(); + continue; } - Util::sleep(500); // wait a bit before retrying + uint64_t postresponse = Util::getMicros(); + WARN_MSG("Post to %s failed after %.2f ms (%.2f ms upload, %.2f ms wait, %.2f ms download)", link.getUrl().c_str(), (postresponse-prerequest)/1000.0, (postrequest-prerequest)/1000.0, (preresponse-postrequest)/1000.0, (postresponse-preresponse)/1000.0); + Util::sleep(100); // wait a bit before retrying } - FAIL_MSG("Could not retrieve %s", link.getUrl().c_str()); return false; } diff --git a/lib/downloader.h b/lib/downloader.h index 494a1b7f..04697f27 100644 --- a/lib/downloader.h +++ b/lib/downloader.h @@ -67,6 +67,7 @@ namespace HTTP{ HTTP::URL nbLink; uint8_t nbMaxRecursiveDepth; uint64_t nbReqTime; + uint64_t nbLastOff; }; }// namespace HTTP diff --git a/lib/http_parser.cpp b/lib/http_parser.cpp index f2aed4b0..373e0f77 100644 --- a/lib/http_parser.cpp +++ b/lib/http_parser.cpp @@ -186,7 +186,7 @@ void HTTP::Parser::sendRequest(Socket::Connection &conn, const void *reqbody, } builder += "\r\n"; if (reqbodyLen){ - builder += std::string((char *)reqbody, reqbodyLen); + if (reqbody){builder += std::string((char *)reqbody, reqbodyLen);} }else{ builder += body; } @@ -206,7 +206,7 @@ void HTTP::Parser::sendRequest(Socket::Connection &conn, const void *reqbody, } conn.SendNow("\r\n", 2); if (reqbodyLen){ - conn.SendNow((char *)reqbody, reqbodyLen); + if (reqbody){conn.SendNow((char *)reqbody, reqbodyLen);} }else{ conn.SendNow(body); } diff --git a/lib/socket.h b/lib/socket.h index d5935f77..9f06ca10 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -88,9 +88,7 @@ namespace Socket{ long long int conntime; Buffer downbuffer; ///< Stores temporary data coming in. int iread(void *buffer, int len, int flags = 0); ///< Incremental read call. - unsigned int iwrite(const void *buffer, int len); ///< Incremental write call. bool iread(Buffer &buffer, int flags = 0); ///< Incremental write call that is compatible with Socket::Buffer. - bool iwrite(std::string &buffer); ///< Write call that is compatible with std::string. void setBoundAddr(); protected: @@ -151,6 +149,9 @@ namespace Socket{ size_t len); ///< Will not buffer anything but always send right away. Blocks. void skipBytes(uint32_t byteCount); uint32_t skipCount; + // unbuffered i/o methods + unsigned int iwrite(const void *buffer, int len); ///< Incremental write call. + bool iwrite(std::string &buffer); ///< Write call that is compatible with std::string. // stats related methods unsigned int connTime(); ///< Returns the time this socket has been connected. uint64_t dataUp(); ///< Returns total amount of bytes sent.