Downloader improvements:

- Added metrics to Downloader::post (only at HIGH level for successful requests)
- Improved downloader timing
- Robustness improvements
This commit is contained in:
Thulinma 2020-08-30 01:20:06 +02:00
parent 36501a618e
commit f88a8fc51c
4 changed files with 96 additions and 41 deletions

View file

@ -168,17 +168,18 @@ namespace HTTP{
/// Do a HEAD request to download the HTTP headers only, returns true on success
bool Downloader::head(const HTTP::URL &link, uint8_t maxRecursiveDepth){
if (!canRequest(link)){return false;}
size_t loop = retryCount + 1; // max 5 attempts
while (--loop){// loop while we are unsuccessful
MEDIUM_MSG("Retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), retryCount - loop + 1, retryCount);
size_t loop = 0;
while (++loop <= retryCount){// loop while we are unsuccessful
MEDIUM_MSG("Retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), loop, retryCount);
doRequest(link, "HEAD");
if (!getSocket()){
FAIL_MSG("Could not retrieve %s: %s", link.getUrl().c_str(), getSocket().getError().c_str());
return false;
}
H.headerOnly = true;
uint64_t reqTime = Util::bootSecs();
while (getSocket() && Util::bootSecs() < reqTime + dataTimeout){
uint64_t reqTime = Util::bootMS();
uint64_t lastOff = getSocket().dataDown();
while (getSocket() && Util::bootMS() < reqTime + dataTimeout*1000){
// No data? Wait for a second or so.
if (!getSocket().spool()){
if (progressCallback != 0){
@ -188,7 +189,7 @@ namespace HTTP{
return false;
}
}
Util::sleep(250);
Util::sleep(25);
continue;
}
// Data! Check if we can parse it...
@ -213,35 +214,36 @@ namespace HTTP{
return true; // Success!
}
// reset the data timeout
if (reqTime != Util::bootSecs()){
if (reqTime+1000 < Util::bootMS()){
if (progressCallback != 0){
if (!progressCallback()){
WARN_MSG("Download aborted by callback");
H.headerOnly = false;
getSocket().close();
return false;
}
}
reqTime = Util::bootSecs();
if (getSocket().dataDown() > lastOff + 25600){
reqTime = Util::bootMS();
lastOff = getSocket().dataDown();
}
}
}
H.headerOnly = false;
if (getSocket()){
FAIL_MSG("Timeout while retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(),
retryCount - loop + 1, retryCount);
FAIL_MSG("Timeout while retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), loop, retryCount);
H.Clean();
getSocket().close();
}else{
if (retryCount - loop + 1 > 2){
INFO_MSG("Lost connection while retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(),
retryCount - loop + 1, retryCount);
if (loop > 1){
INFO_MSG("Lost connection while retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), loop, retryCount);
}else{
MEDIUM_MSG("Lost connection while retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(),
retryCount - loop + 1, retryCount);
MEDIUM_MSG("Lost connection while retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), loop, retryCount);
}
H.Clean();
}
Util::sleep(500); // wait a bit before retrying
Util::sleep(100); // wait a bit before retrying
}
FAIL_MSG("Could not retrieve %s", link.getUrl().c_str());
return false;
@ -292,6 +294,7 @@ namespace HTTP{
isComplete = false;
doRequest(nbLink);
nbReqTime = Util::bootSecs();
nbLastOff = getSocket().dataDown();
return true;
}
@ -305,7 +308,7 @@ namespace HTTP{
if (nbLoop < 2){
FAIL_MSG("Exceeded retry limit while retrieving %s (%zu/%" PRIu32 ")",
nbLink.getUrl().c_str(), retryCount - nbLoop + 1, retryCount);
Util::sleep(1000);
Util::sleep(100);
return true;
}
nbLoop--;
@ -334,6 +337,7 @@ namespace HTTP{
return true;
}
nbReqTime = Util::bootSecs();
nbLastOff = getSocket().dataDown();
}
if (Util::bootSecs() >= nbReqTime + dataTimeout){
@ -380,7 +384,10 @@ namespace HTTP{
return true;
}
}
if (getSocket().dataDown() > nbLastOff + 25600){
nbReqTime = Util::bootSecs();
nbLastOff = getSocket().dataDown();
}
}
}
WARN_MSG("Invalid connection state for HTTP request");
@ -394,27 +401,65 @@ namespace HTTP{
bool Downloader::post(const HTTP::URL &link, const void *payload, const size_t payloadLen,
bool sync, uint8_t maxRecursiveDepth){
if (!canRequest(link)){return false;}
size_t loop = retryCount; // max 5 attempts
while (--loop){// loop while we are unsuccessful
MEDIUM_MSG("Posting to %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), retryCount - loop + 1, retryCount);
doRequest(link, "POST", payload, payloadLen);
size_t loop = 0;
while (++loop <= retryCount){// loop while we are unsuccessful
MEDIUM_MSG("Posting to %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), loop, retryCount);
uint64_t prerequest = Util::getMicros();
doRequest(link, "POST", 0, payloadLen);
Socket::Connection & s = getSocket();
if (payloadLen && payload){
unsigned int payOff = 0;
uint64_t bodyActive = Util::bootMS();
unsigned int lastOff = 0;
bool wasBlocking = s.isBlocking();
s.setBlocking(false);
while (s && payOff < payloadLen){
unsigned int bytes = s.iwrite((char*)payload+payOff, payloadLen-payOff);
payOff += bytes;
if (!bytes){
Util::sleep(100);
}else{
if (payOff > lastOff + 25600){
bodyActive = Util::bootMS();
lastOff = payOff;
}
}
if (Util::bootMS() >= bodyActive + dataTimeout*1000){
uint64_t postrequest = Util::getMicros();
FAIL_MSG("Timeout during sending of POST body after %u bytes and %.2fms!", payOff, (postrequest-prerequest)/1000.0);
s.close();
}
}
s.setBlocking(wasBlocking);
}
if (!s){continue;}
uint64_t postrequest = Util::getMicros();
uint64_t preresponse = 0;
// Not synced? Ignore the response and immediately return true.
if (!sync){return true;}
uint64_t reqTime = Util::bootSecs();
while (getSocket() && Util::bootSecs() < reqTime + dataTimeout){
if (!sync){
VERYHIGH_MSG("Post to %s completed in %.2f ms", link.getUrl().c_str(), (postrequest-prerequest)/1000.0);
return true;
}
uint64_t reqTime = Util::bootMS();
uint64_t lastOff = getSocket().dataDown();
while (s && Util::bootMS() < reqTime + dataTimeout*1000){
// No data? Wait for a second or so.
if (!getSocket().spool()){
if (!s.spool()){
if (progressCallback != 0){
if (!progressCallback()){
WARN_MSG("Download aborted by callback");
s.close();
return false;
}
}
Util::sleep(250);
Util::sleep(25);
continue;
}
if (!preresponse){preresponse = Util::getMicros();}
// Data! Check if we can parse it...
if (H.Read(getSocket())){
if (H.Read(s)){
uint64_t postresponse = Util::getMicros();
HIGH_MSG("Post to %s completed in %.2f ms (%.2f ms upload, %.2f ms wait, %.2f ms download)", link.getUrl().c_str(), (postresponse-prerequest)/1000.0, (postrequest-prerequest)/1000.0, (preresponse-postrequest)/1000.0, (postresponse-preresponse)/1000.0);
if (shouldContinue()){
if (maxRecursiveDepth == 0){
FAIL_MSG("Maximum recursion depth reached");
@ -430,24 +475,32 @@ namespace HTTP{
return true; // Success!
}
// reset the data timeout
if (reqTime != Util::bootSecs()){
if (reqTime+1000 < Util::bootMS()){
if (progressCallback != 0){
if (!progressCallback()){
WARN_MSG("Download aborted by callback");
uint64_t postresponse = Util::getMicros();
WARN_MSG("Post to %s aborted by callback after %.2f ms (%.2f ms upload, %.2f ms wait, %.2f ms download)", link.getUrl().c_str(), (postresponse-prerequest)/1000.0, (postrequest-prerequest)/1000.0, (preresponse-postrequest)/1000.0, (postresponse-preresponse)/1000.0);
s.close();
return false;
}
}
reqTime = Util::bootSecs();
if (s.dataDown() > lastOff + 25600){
reqTime = Util::bootMS();
lastOff = s.dataDown();
}
}
if (getSocket()){
FAIL_MSG("Timeout while retrieving %s", link.getUrl().c_str());
getSocket().close();
return false;
}
Util::sleep(500); // wait a bit before retrying
if (!preresponse){preresponse = Util::getMicros();}
if (s){
uint64_t postresponse = Util::getMicros();
FAIL_MSG("Post to %s timed out after %.2f ms (%.2f ms upload, %.2f ms wait, %.2f ms download)", link.getUrl().c_str(), (postresponse-prerequest)/1000.0, (postrequest-prerequest)/1000.0, (preresponse-postrequest)/1000.0, (postresponse-preresponse)/1000.0);
s.close();
continue;
}
uint64_t postresponse = Util::getMicros();
WARN_MSG("Post to %s failed after %.2f ms (%.2f ms upload, %.2f ms wait, %.2f ms download)", link.getUrl().c_str(), (postresponse-prerequest)/1000.0, (postrequest-prerequest)/1000.0, (preresponse-postrequest)/1000.0, (postresponse-preresponse)/1000.0);
Util::sleep(100); // wait a bit before retrying
}
FAIL_MSG("Could not retrieve %s", link.getUrl().c_str());
return false;
}

View file

@ -67,6 +67,7 @@ namespace HTTP{
HTTP::URL nbLink;
uint8_t nbMaxRecursiveDepth;
uint64_t nbReqTime;
uint64_t nbLastOff;
};
}// namespace HTTP

View file

@ -186,7 +186,7 @@ void HTTP::Parser::sendRequest(Socket::Connection &conn, const void *reqbody,
}
builder += "\r\n";
if (reqbodyLen){
builder += std::string((char *)reqbody, reqbodyLen);
if (reqbody){builder += std::string((char *)reqbody, reqbodyLen);}
}else{
builder += body;
}
@ -206,7 +206,7 @@ void HTTP::Parser::sendRequest(Socket::Connection &conn, const void *reqbody,
}
conn.SendNow("\r\n", 2);
if (reqbodyLen){
conn.SendNow((char *)reqbody, reqbodyLen);
if (reqbody){conn.SendNow((char *)reqbody, reqbodyLen);}
}else{
conn.SendNow(body);
}

View file

@ -88,9 +88,7 @@ namespace Socket{
long long int conntime;
Buffer downbuffer; ///< Stores temporary data coming in.
int iread(void *buffer, int len, int flags = 0); ///< Incremental read call.
unsigned int iwrite(const void *buffer, int len); ///< Incremental write call.
bool iread(Buffer &buffer, int flags = 0); ///< Incremental write call that is compatible with Socket::Buffer.
bool iwrite(std::string &buffer); ///< Write call that is compatible with std::string.
void setBoundAddr();
protected:
@ -151,6 +149,9 @@ namespace Socket{
size_t len); ///< Will not buffer anything but always send right away. Blocks.
void skipBytes(uint32_t byteCount);
uint32_t skipCount;
// unbuffered i/o methods
unsigned int iwrite(const void *buffer, int len); ///< Incremental write call.
bool iwrite(std::string &buffer); ///< Write call that is compatible with std::string.
// stats related methods
unsigned int connTime(); ///< Returns the time this socket has been connected.
uint64_t dataUp(); ///< Returns total amount of bytes sent.