From b4fb0ff4a2fd15850606095725249541182c853a Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 9 Mar 2020 19:26:21 +0100 Subject: [PATCH 1/8] Socket library error handling --- lib/socket.cpp | 64 +++++++++++++++++++++++++++++++++----------------- lib/socket.h | 1 + 2 files changed, 43 insertions(+), 22 deletions(-) diff --git a/lib/socket.cpp b/lib/socket.cpp index 55bb596a..bd8a5275 100644 --- a/lib/socket.cpp +++ b/lib/socket.cpp @@ -79,14 +79,23 @@ bool Socket::isLocal(const std::string &remotehost){ tmpAddrPtr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr; inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN); INSANE_MSG("Comparing '%s' to '%s'", remotehost.c_str(), addressBuffer); - if (remotehost == addressBuffer){ret = true; break;} + if (remotehost == addressBuffer){ + ret = true; + break; + } INSANE_MSG("Comparing '%s' to '::ffff:%s'", remotehost.c_str(), addressBuffer); - if (remotehost == std::string("::ffff:") + addressBuffer){ret = true; break;} + if (remotehost == std::string("::ffff:") + addressBuffer){ + ret = true; + break; + } }else if (ifa->ifa_addr->sa_family == AF_INET6){// check it is IP6 tmpAddrPtr = &((struct sockaddr_in6 *)ifa->ifa_addr)->sin6_addr; inet_ntop(AF_INET6, tmpAddrPtr, addressBuffer, INET6_ADDRSTRLEN); INSANE_MSG("Comparing '%s' to '%s'", remotehost.c_str(), addressBuffer); - if (remotehost == addressBuffer){ret = true; break;} + if (remotehost == addressBuffer){ + ret = true; + break; + } } } if (ifAddrStruct != NULL) freeifaddrs(ifAddrStruct); @@ -654,7 +663,7 @@ int Socket::Connection::getPureSocket(){ /// Only reports errors if an error actually occured - returns the host address or empty string /// otherwise. std::string Socket::Connection::getError(){ - return remotehost; + return lastErr; } /// Create a new Unix Socket. This socket will (try to) connect to the given address right away. @@ -673,8 +682,8 @@ void Socket::Connection::open(std::string address, bool nonblock){ isTrueSocket = true; sSend = socket(PF_UNIX, SOCK_STREAM, 0); if (sSend < 0){ - remotehost = strerror(errno); - FAIL_MSG("Could not create socket! Error: %s", remotehost.c_str()); + lastErr = strerror(errno); + FAIL_MSG("Could not create socket! Error: %s", lastErr.c_str()); return; } sockaddr_un addr; @@ -688,8 +697,8 @@ void Socket::Connection::open(std::string address, bool nonblock){ fcntl(sSend, F_SETFL, flags); } }else{ - remotehost = strerror(errno); - FAIL_MSG("Could not connect to %s! Error: %s", address.c_str(), remotehost.c_str()); + lastErr = strerror(errno); + FAIL_MSG("Could not connect to %s! Error: %s", address.c_str(), lastErr.c_str()); close(); } } @@ -732,6 +741,7 @@ void Socket::Connection::open(std::string host, int port, bool nonblock, bool wi mbedtls_entropy_init(entropy); DONTEVEN_MSG("SSL init"); if (mbedtls_ctr_drbg_seed(ctr_drbg, mbedtls_entropy_func, entropy, (const unsigned char *)"meow", 4) != 0){ + lastErr = "SSL socket init failed"; FAIL_MSG("SSL socket init failed"); close(); return; @@ -740,12 +750,14 @@ void Socket::Connection::open(std::string host, int port, bool nonblock, bool wi int ret = 0; if ((ret = mbedtls_net_connect(server_fd, host.c_str(), JSON::Value(port).asString().c_str(), MBEDTLS_NET_PROTO_TCP)) != 0){ + lastErr = "mbedtls_net_connect failed"; FAIL_MSG(" failed\n ! mbedtls_net_connect returned %d\n\n", ret); close(); return; } if ((ret = mbedtls_ssl_config_defaults(conf, MBEDTLS_SSL_IS_CLIENT, MBEDTLS_SSL_TRANSPORT_STREAM, MBEDTLS_SSL_PRESET_DEFAULT)) != 0){ + lastErr = "mbedtls_ssl_config_defaults failed"; FAIL_MSG(" failed\n ! mbedtls_ssl_config_defaults returned %d\n\n", ret); close(); return; @@ -756,7 +768,8 @@ void Socket::Connection::open(std::string host, int port, bool nonblock, bool wi if ((ret = mbedtls_ssl_setup(ssl, conf)) != 0){ char estr[200]; mbedtls_strerror(ret, estr, 200); - FAIL_MSG("SSL setup error %d: %s", ret, estr); + lastErr = estr; + FAIL_MSG("SSL setup error %d: %s", ret, lastErr.c_str()); close(); return; } @@ -770,7 +783,8 @@ void Socket::Connection::open(std::string host, int port, bool nonblock, bool wi if (ret != MBEDTLS_ERR_SSL_WANT_READ && ret != MBEDTLS_ERR_SSL_WANT_WRITE){ char estr[200]; mbedtls_strerror(ret, estr, 200); - FAIL_MSG("SSL handshake error %d: %s", ret, estr); + lastErr = estr; + FAIL_MSG("SSL handshake error %d: %s", ret, lastErr.c_str()); close(); return; } @@ -795,12 +809,13 @@ void Socket::Connection::open(std::string host, int port, bool nonblock, bool wi hints.ai_flags = AI_ADDRCONFIG; int s = getaddrinfo(host.c_str(), ss.str().c_str(), &hints, &result); if (s != 0){ - FAIL_MSG("Could not connect to %s:%i! Error: %s", host.c_str(), port, gai_strerror(s)); + lastErr = gai_strerror(s); + FAIL_MSG("Could not connect to %s:%i! Error: %s", host.c_str(), port, lastErr.c_str()); close(); return; } - remotehost = ""; + lastErr = ""; for (rp = result; rp != NULL; rp = rp->ai_next){ sSend = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); if (sSend < 0){continue;} @@ -808,13 +823,13 @@ void Socket::Connection::open(std::string host, int port, bool nonblock, bool wi remoteaddr = *((sockaddr_in6 *)rp->ai_addr); break; } - remotehost += strerror(errno); + lastErr += strerror(errno); ::close(sSend); } freeaddrinfo(result); if (rp == 0){ - FAIL_MSG("Could not connect to %s! Error: %s", host.c_str(), remotehost.c_str()); + FAIL_MSG("Could not connect to %s! Error: %s", host.c_str(), lastErr.c_str()); close(); }else{ if (nonblock){ @@ -923,7 +938,8 @@ unsigned int Socket::Connection::iwrite(const void *buffer, int len){ if (r < 0){ char estr[200]; mbedtls_strerror(r, estr, 200); - INFO_MSG("Write returns %d: %s", r, estr); + lastErr = estr; + INFO_MSG("Write returns %d: %s", r, lastErr.c_str()); } if (r < 0){ switch (errno){ @@ -932,7 +948,8 @@ unsigned int Socket::Connection::iwrite(const void *buffer, int len){ case EWOULDBLOCK: return 0; break; default: Error = true; - INSANE_MSG("Could not iwrite data! Error: %s", strerror(errno)); + lastErr = strerror(errno); + INSANE_MSG("Could not iwrite data! Error: %s", lastErr.c_str()); close(); return 0; break; @@ -970,7 +987,8 @@ unsigned int Socket::Connection::iwrite(const void *buffer, int len){ case EWOULDBLOCK: return 0; break; default: Error = true; - INSANE_MSG("Could not iwrite data! Error: %s", strerror(errno)); + lastErr = strerror(errno); + INSANE_MSG("Could not iwrite data! Error: %s", lastErr.c_str()); close(); return 0; break; @@ -1012,7 +1030,8 @@ int Socket::Connection::iread(void *buffer, int len, int flags){ Error = true; char estr[200]; mbedtls_strerror(r, estr, 200); - INFO_MSG("Read returns %d: %s (%s)", r, estr, strerror(errno)); + lastErr = estr; + INFO_MSG("Read returns %d: %s (%s)", r, estr, lastErr.c_str()); close(); return 0; break; @@ -1039,7 +1058,8 @@ int Socket::Connection::iread(void *buffer, int len, int flags){ case EINTR: return 0; break; default: Error = true; - INSANE_MSG("Could not iread data! Error: %s", strerror(errno)); + lastErr = strerror(errno); + INSANE_MSG("Could not iread data! Error: %s", lastErr.c_str()); close(); return 0; break; @@ -1149,6 +1169,7 @@ Socket::Connection::Connection(const Connection &rhs){ remotehost = rhs.remotehost; boundaddr = rhs.boundaddr; remoteaddr = rhs.remoteaddr; + lastErr = rhs.lastErr; up = rhs.up; down = rhs.down; downbuffer = rhs.downbuffer; @@ -1182,6 +1203,7 @@ Socket::Connection &Socket::Connection::operator=(const Socket::Connection &rhs) remotehost = rhs.remotehost; boundaddr = rhs.boundaddr; remoteaddr = rhs.remoteaddr; + lastErr = rhs.lastErr; up = rhs.up; down = rhs.down; downbuffer = rhs.downbuffer; @@ -1776,9 +1798,7 @@ uint16_t Socket::UDPConnection::bind(int port, std::string iface, const std::str // multicast has a "1110" bit prefix multicast = (((char *)&(addr4->sin_addr))[0] & 0xF0) == 0xE0; #ifdef __CYGWIN__ - if (multicast){ - ((sockaddr_in*)rp->ai_addr)->sin_addr.s_addr = htonl(INADDR_ANY); - } + if (multicast){((sockaddr_in *)rp->ai_addr)->sin_addr.s_addr = htonl(INADDR_ANY);} #endif } if (multicast){ diff --git a/lib/socket.h b/lib/socket.h index 34f4bc4d..786f5455 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -92,6 +92,7 @@ namespace Socket{ #ifdef SSL /// optional extension that uses mbedtls for SSL protected: + std::string lastErr; ///< Stores last error, if any. bool sslConnected; int ssl_iread(void *buffer, int len, int flags = 0); ///< Incremental read call. unsigned int ssl_iwrite(const void *buffer, int len); ///< Incremental write call. From 1d0e68c5a4787f7aff63baa2fe6e7577c5068789 Mon Sep 17 00:00:00 2001 From: Ramoe Date: Thu, 20 Feb 2020 10:34:46 +0100 Subject: [PATCH 2/8] Fixed MP4 output not setting Content-Length for HEAD/OPTIONS requests --- src/output/output_progressive_mp4.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/output/output_progressive_mp4.cpp b/src/output/output_progressive_mp4.cpp index 96ce9ba0..98617509 100644 --- a/src/output/output_progressive_mp4.cpp +++ b/src/output/output_progressive_mp4.cpp @@ -478,6 +478,13 @@ namespace Mist { } H.SetHeader("Content-Type", "video/MP4"); H.SetHeader("Accept-Ranges", "bytes, parsec"); + + if(!myMeta.live){ + fileSize = 0; + uint64_t headerSize = mp4HeaderSize(fileSize, myMeta.live); + H.SetHeader("Content-Length", fileSize); + } + H.SendResponse("200", "OK", myConn); return; } From 4ed5dd21e38bbea68e5dd344334f8d1bf4e49ee6 Mon Sep 17 00:00:00 2001 From: Ramoe Date: Thu, 20 Feb 2020 10:46:42 +0100 Subject: [PATCH 3/8] Added callback support to HTTP parser --- lib/http_parser.cpp | 128 ++++++++++++++++++++++++++++++++------------ lib/http_parser.h | 10 ++-- lib/util.cpp | 2 + lib/util.h | 10 ++++ 4 files changed, 111 insertions(+), 39 deletions(-) diff --git a/lib/http_parser.cpp b/lib/http_parser.cpp index 283b6c67..ab459b23 100644 --- a/lib/http_parser.cpp +++ b/lib/http_parser.cpp @@ -1,19 +1,20 @@ /// \file http_parser.cpp /// Holds all code for the HTTP namespace. -#include "http_parser.h" -#include "util.h" #include "auth.h" #include "defines.h" #include "encode.h" +#include "http_parser.h" #include "timing.h" #include "url.h" +#include "util.h" #include /// This constructor creates an empty HTTP::Parser, ready for use for either reading or writing. /// All this constructor does is call HTTP::Parser::Clean(). HTTP::Parser::Parser(){ headerOnly = false; + bodyCallback = 0; Clean(); std::stringstream nStr; nStr << std::hex << std::setw(16) << std::setfill('0') << (uint64_t)(Util::bootMS()); @@ -267,8 +268,7 @@ void HTTP::Parser::SendResponse(std::string code, std::string message, Socket::C void HTTP::Parser::StartResponse(std::string code, std::string message, HTTP::Parser &request, Socket::Connection &conn, bool bufferAllChunks){ std::string prot = request.protocol; - sendingChunks = - (!bufferAllChunks && protocol == "HTTP/1.1" && request.GetHeader("Connection") != "close"); + sendingChunks = (!bufferAllChunks && protocol == "HTTP/1.1" && request.GetHeader("Connection") != "close"); CleanPreserveHeaders(); protocol = prot; if (sendingChunks){ @@ -286,8 +286,7 @@ void HTTP::Parser::StartResponse(std::string code, std::string message, HTTP::Pa /// a zero-content-length HTTP/1.0 response. This call simply calls StartResponse("200", "OK", /// request, conn) \param request The HTTP request to respond to. \param conn The connection to send /// over. -void HTTP::Parser::StartResponse(HTTP::Parser &request, Socket::Connection &conn, - bool bufferAllChunks){ +void HTTP::Parser::StartResponse(HTTP::Parser &request, Socket::Connection &conn, bool bufferAllChunks){ StartResponse("200", "OK", request, conn, bufferAllChunks); } @@ -300,8 +299,7 @@ void HTTP::Parser::Proxy(Socket::Connection &from, Socket::Connection &to){ if (getChunks){ unsigned int proxyingChunk = 0; while (to.connected() && from.connected()){ - if ((from.Received().size() && - (from.Received().size() > 1 || *(from.Received().get().rbegin()) == '\n')) || + if ((from.Received().size() && (from.Received().size() > 1 || *(from.Received().get().rbegin()) == '\n')) || from.spool()){ if (proxyingChunk){ while (proxyingChunk && from.Received().size()){ @@ -483,26 +481,30 @@ void HTTP::Parser::SetVar(std::string i, std::string v){ /// If a whole request could be read, it is removed from the front of the socket buffer and true /// returned. If not, as much as can be interpreted is removed and false returned. \param conn The /// socket to read from. \return True if a whole request or response was read, false otherwise. -bool HTTP::Parser::Read(Socket::Connection &conn){ - // Make sure the received data ends in a newline (\n). - while ((!seenHeaders || (getChunks && !doingChunk)) && conn.Received().get().size() && - *(conn.Received().get().rbegin()) != '\n'){ - if (conn.Received().size() > 1){ - // make a copy of the first part - std::string tmp = conn.Received().get(); - // clear the first part, wiping it from the partlist - conn.Received().get().clear(); - conn.Received().size(); - // take the now first (was second) part, insert the stored part in front of it - conn.Received().get().insert(0, tmp); - }else{ - return false; +bool HTTP::Parser::Read(Socket::Connection &conn, Util::DataCallback &cb){ + while (conn.Received().size()){ + // Make sure the received data ends in a newline (\n). + while ((!seenHeaders || (getChunks && !doingChunk)) && conn.Received().get().size() && + *(conn.Received().get().rbegin()) != '\n'){ + if (conn.Received().size() > 1){ + // make a copy of the first part + std::string tmp = conn.Received().get(); + // clear the first part, wiping it from the partlist + conn.Received().get().clear(); + conn.Received().size(); + // take the now first (was second) part, insert the stored part in front of it + conn.Received().get().insert(0, tmp); + }else{ + return false; + } + } + + // return true if a parse succeeds, and is not a request + if (parse(conn.Received().get(), cb) && (!JSON::Value(url).isInt() || headerOnly || length || + getChunks || (!conn && !conn.Received().size()))){ + return true; } } - // if a parse succeeds, simply return true - if (parse(conn.Received().get())){return true;} - // otherwise, if we have parts left, call ourselves recursively - if (conn.Received().size()){return Read(conn);} return false; }// HTTPReader::Read @@ -526,7 +528,7 @@ uint8_t HTTP::Parser::getPercentage() const{ /// from the data buffer. /// \param HTTPbuffer The data buffer to read from. /// \return True on success, false otherwise. -bool HTTP::Parser::parse(std::string &HTTPbuffer){ +bool HTTP::Parser::parse(std::string &HTTPbuffer, Util::DataCallback &cb){ size_t f; std::string tmpA, tmpB, tmpC; /// \todo Make this not resize HTTPbuffer in parts, but read all at once and then remove the @@ -588,11 +590,15 @@ bool HTTP::Parser::parse(std::string &HTTPbuffer){ body.clear(); if (GetHeader("Content-Length") != ""){ length = atoi(GetHeader("Content-Length").c_str()); - if (body.capacity() < length){body.reserve(length);} + if (!bodyCallback && (&cb == &Util::defaultDataCallback) && body.capacity() < length){ + body.reserve(length); + } } if (GetHeader("Content-length") != ""){ length = atoi(GetHeader("Content-length").c_str()); - if (body.capacity() < length){body.reserve(length);} + if (!bodyCallback && (&cb == &Util::defaultDataCallback) && body.capacity() < length){ + body.reserve(length); + } } if (GetHeader("Transfer-Encoding") == "chunked"){ getChunks = true; @@ -611,12 +617,33 @@ bool HTTP::Parser::parse(std::string &HTTPbuffer){ if (length > 0){ if (headerOnly){return true;} unsigned int toappend = length - body.length(); + + // limit the amount of bytes that will be appended to the amount there + // is available + if (toappend > HTTPbuffer.size()){toappend = HTTPbuffer.size();} + if (toappend > 0){ - body.append(HTTPbuffer, 0, toappend); + bool shouldAppend = true; + // check if pointer callback function is set and run callback. remove partial data from buffer + if (bodyCallback){ + bodyCallback(HTTPbuffer.data(), toappend); + length -= toappend; + shouldAppend = false; + } + + // check if reference callback function is set and run callback. remove partial data from buffer + if (&cb != &Util::defaultDataCallback){ + cb.dataCallback(HTTPbuffer.data(), toappend); + length -= toappend; + shouldAppend = false; + } + + if (shouldAppend){body.append(HTTPbuffer, 0, toappend);} HTTPbuffer.erase(0, toappend); + currentLength += toappend; } if (length == body.length()){ - //parse POST variables + // parse POST variables if (method == "POST"){parseVars(body, vars);} return true; }else{ @@ -624,16 +651,32 @@ bool HTTP::Parser::parse(std::string &HTTPbuffer){ } }else{ if (getChunks){ + + // toappend + currentLength += HTTPbuffer.size(); + if (headerOnly){return true;} if (doingChunk){ unsigned int toappend = HTTPbuffer.size(); if (toappend > doingChunk){toappend = doingChunk;} - body.append(HTTPbuffer, 0, toappend); + + bool shouldAppend = true; + if (bodyCallback){ + bodyCallback(HTTPbuffer.data(), toappend); + shouldAppend = false; + } + + if (&cb != &Util::defaultDataCallback){ + cb.dataCallback(HTTPbuffer.data(), toappend); + shouldAppend = false; + } + + if (shouldAppend){body.append(HTTPbuffer, 0, toappend);} HTTPbuffer.erase(0, toappend); doingChunk -= toappend; }else{ f = HTTPbuffer.find('\n'); - if (f == std::string::npos) return false; + if (f == std::string::npos){return false;} tmpA = HTTPbuffer.substr(0, f); while (tmpA.find('\r') != std::string::npos){tmpA.erase(tmpA.find('\r'));} unsigned int chunkLen = 0; @@ -655,7 +698,23 @@ bool HTTP::Parser::parse(std::string &HTTPbuffer){ } return false; }else{ - return true; + unsigned int toappend = HTTPbuffer.size(); + bool shouldAppend = true; + if (bodyCallback){ + bodyCallback(HTTPbuffer.data(), toappend); + shouldAppend = false; + } + + if (&cb != &Util::defaultDataCallback){ + cb.dataCallback(HTTPbuffer.data(), toappend); + shouldAppend = false; + } + + if (shouldAppend){body.append(HTTPbuffer, 0, toappend);} + HTTPbuffer.erase(0, toappend); + + // return false when callbacks are used. + return shouldAppend; } } } @@ -743,4 +802,3 @@ void HTTP::Parser::Chunkify(const char *data, unsigned int size, Socket::Connect } } } - diff --git a/lib/http_parser.h b/lib/http_parser.h index 302437cb..b35ee08d 100644 --- a/lib/http_parser.h +++ b/lib/http_parser.h @@ -3,6 +3,7 @@ #pragma once #include "socket.h" +#include "util.h" #include #include #include @@ -16,10 +17,10 @@ namespace HTTP{ void parseVars(const std::string &data, std::map &storage); /// Simple class for reading and writing HTTP 1.0 and 1.1. - class Parser{ + class Parser : public Util::DataCallback{ public: Parser(); - bool Read(Socket::Connection &conn); + bool Read(Socket::Connection &conn, Util::DataCallback &cb = Util::defaultDataCallback); bool Read(std::string &strbuf); const std::string &GetHeader(const std::string &i) const; bool hasHeader(const std::string &i) const; @@ -54,10 +55,12 @@ namespace HTTP{ std::string url; std::string protocol; unsigned int length; + unsigned int currentLength; bool headerOnly; ///< If true, do not parse body if the length is a known size. bool bufferChunks; // this bool was private bool sendingChunks; + void (*bodyCallback)(const char *, size_t); private: std::string cnonce; @@ -65,7 +68,7 @@ namespace HTTP{ bool seenReq; bool getChunks; unsigned int doingChunk; - bool parse(std::string &HTTPbuffer); + bool parse(std::string &HTTPbuffer, Util::DataCallback &cb = Util::defaultDataCallback); std::string builder; std::string read_buffer; std::map headers; @@ -74,4 +77,3 @@ namespace HTTP{ }; }// namespace HTTP - diff --git a/lib/util.cpp b/lib/util.cpp index 7c0b5490..aec2668a 100644 --- a/lib/util.cpp +++ b/lib/util.cpp @@ -29,6 +29,8 @@ #define RAX_REQDFIELDS_LEN 36 namespace Util{ + Util::DataCallback defaultDataCallback; + /// Helper function that cross-platform checks if a given directory exists. bool isDirectory(const std::string &path){ #if defined(_WIN32) diff --git a/lib/util.h b/lib/util.h index 4d49d7a2..08d81aaf 100644 --- a/lib/util.h +++ b/lib/util.h @@ -6,6 +6,7 @@ #include #include #include +#include "defines.h" namespace Util{ bool isDirectory(const std::string &path); @@ -18,6 +19,15 @@ namespace Util{ uint64_t ftell(FILE *stream); uint64_t fseek(FILE *stream, uint64_t offset, int whence); + class DataCallback{ + public: + virtual void dataCallback(const char * ptr, size_t size){ + INFO_MSG("default callback, size: %llu", size); + } + }; + + extern Util::DataCallback defaultDataCallback; + //Forward declaration class FieldAccX; From c1c35c75f9a4865e3f266c0e2f7b2c305dcc15ea Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 9 Mar 2020 19:26:03 +0100 Subject: [PATCH 4/8] HTTP::Parser: Content length fix, made ::GetHeader and ::hasHeader case-insensitive, preferring identical case if it is present. --- lib/http_parser.cpp | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/lib/http_parser.cpp b/lib/http_parser.cpp index ab459b23..8c6a79c4 100644 --- a/lib/http_parser.cpp +++ b/lib/http_parser.cpp @@ -8,6 +8,7 @@ #include "timing.h" #include "url.h" #include "util.h" +#include #include /// This constructor creates an empty HTTP::Parser, ready for use for either reading or writing. @@ -412,17 +413,28 @@ std::string HTTP::Parser::getUrl(){ /// Returns header i, if set. const std::string &HTTP::Parser::GetHeader(const std::string &i) const{ - if (headers.count(i)){ - return headers.at(i); - }else{ - static const std::string empty; - return empty; + if (headers.count(i)){return headers.at(i);} + for (std::map::const_iterator it = headers.begin(); it != headers.end(); ++it){ + if (it->first.length() != i.length()){continue;} + if (strncasecmp(it->first.c_str(), i.c_str(), i.length()) == 0){ + return it->second; + } } + //Return empty string if not found + static const std::string empty; + return empty; } /// Returns header i, if set. bool HTTP::Parser::hasHeader(const std::string &i) const{ - return headers.count(i); + if (headers.count(i)){return true;} + for (std::map::const_iterator it = headers.begin(); it != headers.end(); ++it){ + if (it->first.length() != i.length()){continue;} + if (strncasecmp(it->first.c_str(), i.c_str(), i.length()) == 0){ + return true; + } + } + return false; } /// Returns POST variable i, if set. @@ -594,12 +606,6 @@ bool HTTP::Parser::parse(std::string &HTTPbuffer, Util::DataCallback &cb){ body.reserve(length); } } - if (GetHeader("Content-length") != ""){ - length = atoi(GetHeader("Content-length").c_str()); - if (!bodyCallback && (&cb == &Util::defaultDataCallback) && body.capacity() < length){ - body.reserve(length); - } - } if (GetHeader("Transfer-Encoding") == "chunked"){ getChunks = true; doingChunk = 0; @@ -713,8 +719,8 @@ bool HTTP::Parser::parse(std::string &HTTPbuffer, Util::DataCallback &cb){ if (shouldAppend){body.append(HTTPbuffer, 0, toappend);} HTTPbuffer.erase(0, toappend); - // return false when callbacks are used. - return shouldAppend; + // return true if there is no body, otherwise we only stop when the connection is dropped + return true; } } } From 3fd55b16476132cd18974594e7f8b3aec2d4ef89 Mon Sep 17 00:00:00 2001 From: Ramoe Date: Thu, 20 Feb 2020 10:47:17 +0100 Subject: [PATCH 5/8] Added callbacks, range requests and head requests to Downloader --- lib/downloader.cpp | 193 +++++++++++++++++++++++++++++++++++++++------ lib/downloader.h | 22 +++++- 2 files changed, 187 insertions(+), 28 deletions(-) diff --git a/lib/downloader.cpp b/lib/downloader.cpp index 32b2a4b3..8955c385 100644 --- a/lib/downloader.cpp +++ b/lib/downloader.cpp @@ -43,9 +43,9 @@ namespace HTTP{ } /// Simply turns link into a HTTP::URL and calls get(const HTTP::URL&) - bool Downloader::get(const std::string &link){ + bool Downloader::get(const std::string &link, Util::DataCallback &cb){ HTTP::URL uri(link); - return get(uri); + return get(uri, 6, cb); } /// Sets an extra (or overridden) header to be sent with outgoing requests. @@ -60,24 +60,18 @@ namespace HTTP{ Parser &Downloader::getHTTP(){return H;} /// Returns a reference to the internal Socket::Connection class instance. - Socket::Connection &Downloader::getSocket(){ - return S; - } + Socket::Connection &Downloader::getSocket(){return S;} - Downloader::~Downloader(){ - S.close(); - } + Downloader::~Downloader(){S.close();} /// Sends a request for the given URL, does no waiting. - void Downloader::doRequest(const HTTP::URL &link, const std::string &method, - const std::string &body){ + void Downloader::doRequest(const HTTP::URL &link, const std::string &method, const std::string &body){ if (!canRequest(link)){return;} bool needSSL = (link.protocol == "https"); H.Clean(); // Reconnect if needed if (!proxied || needSSL){ - if (!getSocket() || link.host != connectedHost || link.getPort() != connectedPort || - needSSL != ssl){ + if (!getSocket() || link.host != connectedHost || link.getPort() != connectedPort || needSSL != ssl){ getSocket().close(); connectedHost = link.host; connectedPort = link.getPort(); @@ -92,8 +86,7 @@ namespace HTTP{ #endif } }else{ - if (!getSocket() || proxyUrl.host != connectedHost || proxyUrl.getPort() != connectedPort || - needSSL != ssl){ + if (!getSocket() || proxyUrl.host != connectedHost || proxyUrl.getPort() != connectedPort || needSSL != ssl){ getSocket().close(); connectedHost = proxyUrl.host; connectedPort = proxyUrl.getPort(); @@ -120,6 +113,7 @@ namespace HTTP{ H.SetHeader("Host", link.host); } } + if (method.size()){H.method = method;} H.SetHeader("User-Agent", "MistServer " PACKAGE_VERSION); H.SetHeader("X-Version", PACKAGE_VERSION); @@ -136,19 +130,19 @@ namespace HTTP{ H.SetHeader(it->first, it->second); } } + H.SendRequest(getSocket(), body); H.Clean(); } - /// Downloads the given URL into 'H', returns true on success. - /// Makes at most 5 attempts, and will wait no longer than 5 seconds without receiving data. - bool Downloader::get(const HTTP::URL &link, uint8_t maxRecursiveDepth){ + /// Do a HEAD request to download the HTTP headers only, returns true on success + bool Downloader::head(const HTTP::URL &link, uint8_t maxRecursiveDepth){ if (!canRequest(link)){return false;} size_t loop = retryCount + 1; // max 5 attempts while (--loop){// loop while we are unsuccessful - MEDIUM_MSG("Retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), retryCount - loop + 1, - retryCount); - doRequest(link); + MEDIUM_MSG("Retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), retryCount - loop + 1, retryCount); + doRequest(link, "HEAD"); + H.headerOnly = true; uint64_t reqTime = Util::bootSecs(); while (getSocket() && Util::bootSecs() < reqTime + dataTimeout){ // No data? Wait for a second or so. @@ -156,6 +150,7 @@ namespace HTTP{ if (progressCallback != 0){ if (!progressCallback()){ WARN_MSG("Download aborted by callback"); + H.headerOnly = false; return false; } } @@ -164,6 +159,7 @@ namespace HTTP{ } // Data! Check if we can parse it... if (H.Read(getSocket())){ + H.headerOnly = false; if (shouldContinue()){ if (maxRecursiveDepth == 0){ FAIL_MSG("Maximum recursion depth reached"); @@ -171,11 +167,16 @@ namespace HTTP{ } if (!canContinue(link)){return false;} if (getStatusCode() >= 300 && getStatusCode() < 400){ - return get(link.link(getHeader("Location")), --maxRecursiveDepth); + return head(link.link(getHeader("Location")), --maxRecursiveDepth); }else{ - return get(link, --maxRecursiveDepth); + return head(link, --maxRecursiveDepth); } } + + if(H.protocol == "HTTP/1.0"){ + getSocket().close(); + } + return true; // Success! } // reset the data timeout @@ -183,12 +184,15 @@ namespace HTTP{ if (progressCallback != 0){ if (!progressCallback()){ WARN_MSG("Download aborted by callback"); + H.headerOnly = false; return false; } } reqTime = Util::bootSecs(); } } + H.headerOnly = false; + if (getSocket()){ FAIL_MSG("Timeout while retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), retryCount - loop + 1, retryCount); @@ -206,13 +210,150 @@ namespace HTTP{ return false; } - bool Downloader::post(const HTTP::URL &link, const std::string &payload, bool sync, - uint8_t maxRecursiveDepth){ + bool Downloader::getRangeNonBlocking(const HTTP::URL &link, size_t byteStart, size_t byteEnd, Util::DataCallback &cb){ + char tmp[32]; + if (byteEnd <= 0){// get range from byteStart til eof + sprintf(tmp, "bytes=%llu-", byteStart); + }else{ + sprintf(tmp, "bytes=%llu-%llu", byteStart, byteEnd - 1); + } + setHeader("Range", tmp); + return getNonBlocking(link, 6); + } + + bool Downloader::getRange(const HTTP::URL &link, size_t byteStart, size_t byteEnd, Util::DataCallback &cb){ + char tmp[32]; + if (byteEnd <= 0){// get range from byteStart til eof + sprintf(tmp, "bytes=%llu-", byteStart); + }else{ + sprintf(tmp, "bytes=%llu-%llu", byteStart, byteEnd - 1); + } + setHeader("Range", tmp); + return get(link, 6, cb); + } + + /// Downloads the given URL into 'H', returns true on success. + /// Makes at most 5 attempts, and will wait no longer than 5 seconds without receiving data. + bool Downloader::get(const HTTP::URL &link, uint8_t maxRecursiveDepth, Util::DataCallback &cb){ + if (!getNonBlocking(link, maxRecursiveDepth)){return false;} + + while (!continueNonBlocking(cb)){Util::sleep(100);} + + if (isComplete){return true;} + + FAIL_MSG("Could not retrieve %s", link.getUrl().c_str()); + return false; + } + + // prepare a request to be handled in a nonblocking fashion by the continueNonbBocking() + bool Downloader::getNonBlocking(const HTTP::URL &link, uint8_t maxRecursiveDepth){ + if (!canRequest(link)){return false;} + nbLink = link; + nbMaxRecursiveDepth = maxRecursiveDepth; + nbLoop = retryCount + 1; // max 5 attempts + isComplete = false; + doRequest(nbLink); + nbReqTime = Util::bootSecs(); + return true; + } + + // continue handling a request, origininally set up by the getNonBlocking() function + // returns true if the request is complete + bool Downloader::continueNonBlocking(Util::DataCallback &cb){ + while (true){ + if (!getSocket() && !isComplete){ + if (nbLoop < 2){ + FAIL_MSG("Exceeded retry limit while retrieving %s (%zu/%" PRIu32 ")", + nbLink.getUrl().c_str(), retryCount - nbLoop + 1, retryCount); + Util::sleep(1000); + return true; + } + nbLoop--; + if (nbLoop == retryCount){ + MEDIUM_MSG("Retrieving %s (%zu/%" PRIu32 ")", nbLink.getUrl().c_str(), + retryCount - nbLoop + 1, retryCount); + }else{ + if (retryCount - nbLoop + 1 > 2){ + INFO_MSG("Lost connection while retrieving %s (%zu/%" PRIu32 ")", + nbLink.getUrl().c_str(), retryCount - nbLoop + 1, retryCount); + }else{ + MEDIUM_MSG("Lost connection while retrieving %s (%zu/%" PRIu32 ")", + nbLink.getUrl().c_str(), retryCount - nbLoop + 1, retryCount); + } + } + + if (H.hasHeader("Accept-Ranges") && getHeader("Accept-Ranges").size() > 0){ + INFO_MSG("new request? range! len: %llu, currlength: %llu", H.length, H.currentLength); + getRangeNonBlocking(nbLink, H.currentLength, 0, cb); + return true; + }else{ + doRequest(nbLink); + } + + if (!getSocket()){ + WARN_MSG("Aborting download: could not open connection"); + return true; + } + nbReqTime = Util::bootSecs(); + } + + if (Util::bootSecs() >= nbReqTime + dataTimeout){ + FAIL_MSG("Timeout while retrieving %s (%zu/%" PRIu32 ")", nbLink.getUrl().c_str(), + retryCount - nbLoop + 1, retryCount); + getSocket().close(); + return false; // because we may have retries left + } + + // No data? Wait for a second or so. + if (!getSocket().spool()){ + if (progressCallback != 0){ + if (!progressCallback()){ + WARN_MSG("Download aborted by callback"); + return true; + } + } + return false; + } + // Data! Check if we can parse it... + if (H.Read(getSocket(), cb)){ + if (shouldContinue()){ + if (nbMaxRecursiveDepth == 0){ + FAIL_MSG("Maximum recursion depth reached"); + return true; + } + if (!canContinue(nbLink)){return false;} + --nbMaxRecursiveDepth; + if (getStatusCode() >= 300 && getStatusCode() < 400){ + doRequest(nbLink.link(getHeader("Location"))); + }else{ + doRequest(nbLink); + } + return false; + } + + isComplete = true; // Success + return true; + } + // reset the data timeout + if (nbReqTime != Util::bootSecs()){ + if (progressCallback != 0){ + if (!progressCallback()){ + WARN_MSG("Download aborted by callback"); + return true; + } + } + nbReqTime = Util::bootSecs(); + } + } + + return false; //we should never get here + } + + bool Downloader::post(const HTTP::URL &link, const std::string &payload, bool sync, uint8_t maxRecursiveDepth){ if (!canRequest(link)){return false;} size_t loop = retryCount; // max 5 attempts while (--loop){// loop while we are unsuccessful - MEDIUM_MSG("Posting to %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), retryCount - loop + 1, - retryCount); + MEDIUM_MSG("Posting to %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), retryCount - loop + 1, retryCount); doRequest(link, "POST", payload); // Not synced? Ignore the response and immediately return true. if (!sync){return true;} diff --git a/lib/downloader.h b/lib/downloader.h index 35584b42..215c9208 100644 --- a/lib/downloader.h +++ b/lib/downloader.h @@ -1,6 +1,7 @@ #include "http_parser.h" #include "url.h" #include "socket.h" +#include "util.h" namespace HTTP{ class Downloader{ @@ -11,10 +12,17 @@ namespace HTTP{ const std::string &const_data() const; void doRequest(const HTTP::URL &link, const std::string &method = "", const std::string &body = ""); - bool get(const std::string &link); - bool get(const HTTP::URL &link, uint8_t maxRecursiveDepth = 6); + bool get(const std::string &link, Util::DataCallback &cb = Util::defaultDataCallback); + bool get(const HTTP::URL &link, uint8_t maxRecursiveDepth = 6, Util::DataCallback &cb = Util::defaultDataCallback); + bool head(const HTTP::URL &link,uint8_t maxRecursiveDepth = 6); + bool getRange(const HTTP::URL &link, size_t byteStart, size_t byteEnd, Util::DataCallback &cb = Util::defaultDataCallback); + bool getRangeNonBlocking(const HTTP::URL &link, size_t byteStart, size_t byteEnd, Util::DataCallback &cb = Util::defaultDataCallback); bool post(const HTTP::URL &link, const std::string &payload, bool sync = true, uint8_t maxRecursiveDepth = 6); + + bool getNonBlocking(const HTTP::URL &link, uint8_t maxRecursiveDepth = 6); + bool continueNonBlocking(Util::DataCallback &cb); + std::string getHeader(const std::string &headerName); std::string &getStatusText(); uint32_t getStatusCode(); @@ -27,12 +35,14 @@ namespace HTTP{ void setHeader(const std::string &name, const std::string &val); void clearHeaders(); bool canRequest(const HTTP::URL &link); + bool completed(){return isComplete;} Parser &getHTTP(); Socket::Connection &getSocket(); uint32_t retryCount, dataTimeout; bool isProxied() const; private: + bool isComplete; std::map extraHeaders; ///< Holds extra headers to sent with request std::string connectedHost; ///< Currently connected host name uint32_t connectedPort; ///< Currently connected port number @@ -43,6 +53,14 @@ namespace HTTP{ std::string proxyAuthStr; ///< Most recently seen Proxy-Authenticate request bool proxied; ///< True if proxy server is configured. HTTP::URL proxyUrl; ///< Set to the URL of the configured proxy. + size_t nbLoop; + HTTP::URL nbLink; + uint8_t nbMaxRecursiveDepth; + uint64_t nbReqTime; }; + + + + }// namespace HTTP From 7f8013fef28940ab980e61fe26c81daf72c31d86 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 9 Mar 2020 19:26:38 +0100 Subject: [PATCH 6/8] Downloader improvements, fixes --- lib/downloader.cpp | 33 ++++++++++++++++++++++----------- lib/downloader.h | 1 + 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/lib/downloader.cpp b/lib/downloader.cpp index 8955c385..592b60ef 100644 --- a/lib/downloader.cpp +++ b/lib/downloader.cpp @@ -95,6 +95,7 @@ namespace HTTP{ } ssl = needSSL; if (!getSocket()){ + H.method = S.getError(); return; // socket is closed } if (proxied && !ssl){ @@ -131,6 +132,7 @@ namespace HTTP{ } } + nbLink = link; H.SendRequest(getSocket(), body); H.Clean(); } @@ -142,6 +144,10 @@ namespace HTTP{ while (--loop){// loop while we are unsuccessful MEDIUM_MSG("Retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), retryCount - loop + 1, retryCount); doRequest(link, "HEAD"); + if (!getSocket()){ + FAIL_MSG("Could not retrieve %s: %s", link.getUrl().c_str(), getSocket().getError().c_str()); + return false; + } H.headerOnly = true; uint64_t reqTime = Util::bootSecs(); while (getSocket() && Util::bootSecs() < reqTime + dataTimeout){ @@ -177,6 +183,7 @@ namespace HTTP{ getSocket().close(); } + H.headerOnly = false; return true; // Success! } // reset the data timeout @@ -196,6 +203,7 @@ namespace HTTP{ if (getSocket()){ FAIL_MSG("Timeout while retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), retryCount - loop + 1, retryCount); + H.Clean(); getSocket().close(); }else{ if (retryCount - loop + 1 > 2){ @@ -203,6 +211,7 @@ namespace HTTP{ }else{ MEDIUM_MSG("Lost connection while retrieving %s (%zu/%" PRIu32 ")", link.getUrl().c_str(), retryCount - loop + 1, retryCount); } + H.Clean(); } Util::sleep(500); // wait a bit before retrying } @@ -213,9 +222,9 @@ namespace HTTP{ bool Downloader::getRangeNonBlocking(const HTTP::URL &link, size_t byteStart, size_t byteEnd, Util::DataCallback &cb){ char tmp[32]; if (byteEnd <= 0){// get range from byteStart til eof - sprintf(tmp, "bytes=%llu-", byteStart); + sprintf(tmp, "bytes=%zu-", byteStart); }else{ - sprintf(tmp, "bytes=%llu-%llu", byteStart, byteEnd - 1); + sprintf(tmp, "bytes=%zu-%zu", byteStart, byteEnd - 1); } setHeader("Range", tmp); return getNonBlocking(link, 6); @@ -224,9 +233,9 @@ namespace HTTP{ bool Downloader::getRange(const HTTP::URL &link, size_t byteStart, size_t byteEnd, Util::DataCallback &cb){ char tmp[32]; if (byteEnd <= 0){// get range from byteStart til eof - sprintf(tmp, "bytes=%llu-", byteStart); + sprintf(tmp, "bytes=%zu-", byteStart); }else{ - sprintf(tmp, "bytes=%llu-%llu", byteStart, byteEnd - 1); + sprintf(tmp, "bytes=%zu-%zu", byteStart, byteEnd - 1); } setHeader("Range", tmp); return get(link, 6, cb); @@ -257,6 +266,10 @@ namespace HTTP{ return true; } + const HTTP::URL & Downloader::lastURL(){ + return nbLink; + } + // continue handling a request, origininally set up by the getNonBlocking() function // returns true if the request is complete bool Downloader::continueNonBlocking(Util::DataCallback &cb){ @@ -283,7 +296,6 @@ namespace HTTP{ } if (H.hasHeader("Accept-Ranges") && getHeader("Accept-Ranges").size() > 0){ - INFO_MSG("new request? range! len: %llu, currlength: %llu", H.length, H.currentLength); getRangeNonBlocking(nbLink, H.currentLength, 0, cb); return true; }else{ @@ -324,10 +336,9 @@ namespace HTTP{ if (!canContinue(nbLink)){return false;} --nbMaxRecursiveDepth; if (getStatusCode() >= 300 && getStatusCode() < 400){ - doRequest(nbLink.link(getHeader("Location"))); - }else{ - doRequest(nbLink); + nbLink = nbLink.link(getHeader("Location")); } + doRequest(nbLink); return false; } @@ -345,7 +356,7 @@ namespace HTTP{ nbReqTime = Util::bootSecs(); } } - + WARN_MSG("Invalid connection state for HTTP request"); return false; //we should never get here } @@ -445,7 +456,7 @@ namespace HTTP{ FAIL_MSG("Authentication required but not included in URL"); return false; } - FAIL_MSG("Authenticating..."); + INFO_MSG("Authenticating..."); return true; } if (getStatusCode() == 407){ @@ -459,7 +470,7 @@ namespace HTTP{ FAIL_MSG("Proxy authentication required but not included in URL"); return false; } - FAIL_MSG("Authenticating proxy..."); + INFO_MSG("Authenticating proxy..."); return true; } if (getStatusCode() >= 300 && getStatusCode() < 400){ diff --git a/lib/downloader.h b/lib/downloader.h index 215c9208..55c1c3d6 100644 --- a/lib/downloader.h +++ b/lib/downloader.h @@ -40,6 +40,7 @@ namespace HTTP{ Socket::Connection &getSocket(); uint32_t retryCount, dataTimeout; bool isProxied() const; + const HTTP::URL & lastURL(); private: bool isComplete; From d8fb22a71fda2d2d351633a37e8179ef7bb8678f Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 9 Mar 2020 20:01:00 +0100 Subject: [PATCH 7/8] URIReader --- CMakeLists.txt | 2 + lib/urireader.cpp | 375 ++++++++++++++++++++++++++++++++++++++++++++++ lib/urireader.h | 83 ++++++++++ 3 files changed, 460 insertions(+) create mode 100644 lib/urireader.cpp create mode 100644 lib/urireader.h diff --git a/CMakeLists.txt b/CMakeLists.txt index c4bc71da..9dc001cc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -139,6 +139,7 @@ set(libHeaders lib/ebml_socketglue.h lib/websocket.h lib/url.h + lib/urireader.h ) ######################################## @@ -183,6 +184,7 @@ add_library (mist lib/ebml_socketglue.cpp lib/websocket.cpp lib/url.cpp + lib/urireader.cpp ) if (NOT APPLE) set (LIBRT -lrt) diff --git a/lib/urireader.cpp b/lib/urireader.cpp new file mode 100644 index 00000000..3d97e49e --- /dev/null +++ b/lib/urireader.cpp @@ -0,0 +1,375 @@ +#include "defines.h" +#include "shared_memory.h" +#include "timing.h" +#include "urireader.h" +#include "util.h" +#include +#include + +namespace HTTP{ + + URIReader::URIReader(){ + char workDir[512]; + getcwd(workDir, 512); + myURI = HTTP::URL(std::string("file://") + workDir); + cbProgress = 0; + minLen = 1; + maxLen = std::string::npos; + startPos = 0; + supportRangeRequest = false; + endPos = std::string::npos; + totalSize = std::string::npos; + stateType = URIType::Closed; + clearPointer = true; + } + + URIReader::URIReader(const HTTP::URL &uri){ + URIReader(); + open(uri); + } + + URIReader::URIReader(const std::string &reluri){ + URIReader(); + open(reluri); + } + + bool URIReader::open(const std::string &reluri){return open(myURI.link(reluri));} + + /// Internal callback function, used to buffer data. + void URIReader::dataCallback(const char *ptr, size_t size){ + std::string t = std::string(ptr, size); + allData.append(t.c_str(), size); + } + + bool URIReader::open(const HTTP::URL &uri){ + myURI = uri; + curPos = 0; + + if (!myURI.protocol.size() || myURI.protocol == "file"){ + if (!myURI.path.size() || myURI.path == "-"){ + downer.getSocket().open(-1, fileno(stdin)); + stateType = URIType::Stream; + startPos = 0; + + endPos = std::string::npos; + totalSize = std::string::npos; + if (!downer.getSocket()){ + FAIL_MSG("Could not open '%s': %s", myURI.getUrl().c_str(), strerror(errno)); + stateType = URIType::Closed; + return false; + } + return true; + }else{ + // + // + /// \todo Use ACCESSPERMS instead of 0600? + int handle = ::open(myURI.getFilePath().c_str(), O_RDWR, (mode_t)0600); + if (handle == -1){ + FAIL_MSG("opening file: %s failed: %s", myURI.getFilePath().c_str(), strerror(errno)); + stateType = URIType::Closed; + return false; + } + + struct stat buffStats; + int xRes = fstat(handle, &buffStats); + if (xRes < 0){ + FAIL_MSG("Cheking size of %s failed: %s", myURI.getFilePath().c_str(), strerror(errno)); + stateType = URIType::Closed; + return false; + } + totalSize = buffStats.st_size; + // INFO_MSG("size: %llu", totalSize); + + mapped = (char *)mmap(0, totalSize, PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0); + if (mapped == MAP_FAILED){ + mapped = 0; + stateType = URIType::Closed; + return false; + } + startPos = 0; + + stateType = URIType::File; + return true; + } + } + + // HTTP, stream or regular download? + if (myURI.protocol == "http" || myURI.protocol == "https"){ + stateType = URIType::HTTP; + + // Send HEAD request to determine range request is supported, and get total length + if (!downer.head(myURI)){FAIL_MSG("Error sending HEAD request");} + + std::string header1 = downer.getHeader("Accept-Ranges"); + supportRangeRequest = (header1.size() > 0); + + header1 = downer.getHeader("Content-Length"); + totalSize = atoi(header1.c_str()); + + // streaming mode when size is unknown + if (!supportRangeRequest){ + downer.getNonBlocking(uri); + }else{ + MEDIUM_MSG("download file with range request: %s, totalsize: %llu", myURI.getUrl().c_str(), totalSize); + if (!downer.getRangeNonBlocking(myURI.getUrl(), curPos, 0)){ + FAIL_MSG("error loading url: %s", myURI.getUrl().c_str()); + } + } + + if (!downer.getSocket()){ + FAIL_MSG("Could not open '%s': %s", myURI.getUrl().c_str(), strerror(errno)); + stateType = URIType::Closed; + return false; + } + return true; + } + + FAIL_MSG("URI type not implemented: %s", myURI.getUrl().c_str()); + return false; + } + + // seek to pos, return true if succeeded. + bool URIReader::seek(const uint64_t pos){ + if (isSeekable()){ + if (stateType == URIType::File){ + curPos = pos; + return true; + }else if (stateType == URIType::HTTP && supportRangeRequest){ + INFO_MSG("SEEK: RangeRequest to %llu", pos); + if (!downer.getRangeNonBlocking(myURI.getUrl(), pos, 0)){ + FAIL_MSG("error loading request"); + } + } + } + + return false; + } + + void URIReader::readAll(size_t (*dataCallback)(const char *data, size_t len)){ + while (!isEOF()){readSome(dataCallback, 419430);} + } + + /// Read all function, with use of callbacks + void URIReader::readAll(Util::DataCallback &cb){ + while (!isEOF()){readSome(1048576, cb);} + } + + /// Read all blocking function, which internally uses the Nonblocking function. + void URIReader::readAll(char *&dataPtr, size_t &dataLen){ + size_t s = 0; + char *tmp = 0; + std::string t; + + allData.allocate(68401307); + + while (!isEOF()){ + readSome(10046, *this); + // readSome(1048576, *this); + } + dataPtr = allData; + dataLen = allData.size(); + } + + void httpBodyCallback(const char *ptr, size_t size){INFO_MSG("callback");} + + void URIReader::readSome(size_t (*dataCallback)(const char *data, size_t len), size_t wantedLen){ + /// TODO: Implement + } + + // readsome with callback + void URIReader::readSome(size_t wantedLen, Util::DataCallback &cb){ + if (isEOF()){return;} + if (stateType == URIType::File){ + // dataPtr = mapped + curPos; + uint64_t dataLen = 0; + + if (wantedLen < totalSize){ + if ((wantedLen + curPos) > totalSize){ + dataLen = totalSize - curPos; // restant + // INFO_MSG("file curpos: %llu, dataLen: %llu, totalSize: %llu ", curPos, dataLen, totalSize); + }else{ + dataLen = wantedLen; + } + }else{ + dataLen = totalSize; + } + + std::string t = std::string(mapped + curPos, dataLen); + cb.dataCallback(t.c_str(), dataLen); + + curPos += dataLen; + + }else if (stateType == URIType::HTTP){ + + bool res = downer.continueNonBlocking(cb); + + if (res){ + if (downer.completed()){ + MEDIUM_MSG("completed"); + }else{ + if (supportRangeRequest){ + MEDIUM_MSG("do new range request, previous request not completed yet!, curpos: %llu, " + "length: %llu", + curPos, getSize()); + } + } + }else{ + Util::sleep(10); + } + }else{// streaming mode + int s; + static int totaal = 0; + if ((downer.getSocket() && downer.getSocket().spool())){// || downer.getSocket().Received().size() > 0){ + s = downer.getSocket().Received().bytes(wantedLen); + std::string buf = downer.getSocket().Received().remove(s); + + cb.dataCallback(buf.data(), s); + totaal += s; + } + } + } + + /// Readsome blocking function. + void URIReader::readSome(char *&dataPtr, size_t &dataLen, size_t wantedLen){ + if (stateType == URIType::File){ + + dataPtr = mapped + curPos; + + if (wantedLen < totalSize){ + if ((wantedLen + curPos) > totalSize){ + dataLen = totalSize - curPos; // restant + }else{ + dataLen = wantedLen; + } + }else{ + dataLen = totalSize; + } + + curPos += dataLen; + }else if (stateType == URIType::HTTP){ + + dataLen = downer.data().size(); + curPos += dataLen; + dataPtr = (char *)downer.data().data(); + }else{ + if (clearPointer){ + rPtr.assign(0, 0); + clearPointer = false; + dataLen = 0; + rPtr.allocate(wantedLen); + } + + int s; + bool run = true; + while (downer.getSocket() && run){ + if (downer.getSocket().spool()){ + + if (wantedLen < 8000){ + s = downer.getSocket().Received().bytes(wantedLen); + }else{ + s = downer.getSocket().Received().bytes(8000); + } + + std::string buf = downer.getSocket().Received().remove(s); + rPtr.append(buf.c_str(), s); + + dataLen += s; + curPos += s; + + if (rPtr.size() >= wantedLen){ + dataLen = rPtr.size(); + dataPtr = rPtr; + // INFO_MSG("laatste stukje, datalen: %llu, wanted: %llu", dataLen, + // wantedLen); dataCallback(ptr, len); + clearPointer = true; + run = false; + } + //} + }else{ + // INFO_MSG("data not yet available!"); + return; + } + } + + // if (!downer.getSocket()){ + totalSize = curPos; + dataLen = rPtr.size(); + //} + // INFO_MSG("size: %llu, datalen: %llu", totalSize, rPtr.size()); + dataPtr = rPtr; + } + } + + void URIReader::close(){ + if (stateType == URIType::File){ + if (mapped){ + munmap(mapped, totalSize); + mapped = 0; + totalSize = 0; + } + }else if (stateType == URIType::Stream){ + downer.getSocket().close(); + }else if (stateType == URIType::HTTP){ + downer.getSocket().close(); + }else{ + // INFO_MSG("already closed"); + } + } + + void URIReader::onProgress(bool (*progressCallback)(uint8_t)){ + /// TODO: Implement + } + + void URIReader::setBounds(size_t newMinLen, size_t newMaxLen){ + minLen = newMinLen; + maxLen = newMaxLen; + } + + bool URIReader::isSeekable(){ + if (stateType == URIType::HTTP){ + + if (supportRangeRequest && totalSize > 0){return true;} + } + + return (stateType == URIType::File); + } + + bool URIReader::isEOF(){ + if (stateType == URIType::File){ + return (curPos >= totalSize); + }else if (stateType == URIType::Stream){ + if (!downer.getSocket()){return true;} + + // if ((totalSize > 0) && (curPos >= totalSize)){return true;} + }else if (stateType == URIType::HTTP){ + // INFO_MSG("iseof, C: %s, seekable: %s", C?"connected":"disconnected", isSeekable()?"yes":"no"); + if (!downer.getSocket() && !downer.getSocket().Received().available(1) && !isSeekable()){ + return true; + } + if ((totalSize > 0) && (curPos >= totalSize)){return true;} + + // mark as complete if downer reports download is completed, or when socket connection is closed when totalsize is not known. + if (downer.completed() || (!totalSize && !downer.getSocket())){ + // INFO_MSG("complete totalsize: %llu, %s", totalSize, downer.getSocket() ? "Connected" : "disconnected"); + return true; + } + + }else{ + return true; + } + + return false; + } + + bool URIReader::isGood() const{ + return true; + /// TODO: Implement + } + + uint64_t URIReader::getPos(){return curPos;} + + const HTTP::URL &URIReader::getURI() const{return myURI;} + + size_t URIReader::getSize() const{return totalSize;} + +}// namespace HTTP diff --git a/lib/urireader.h b/lib/urireader.h new file mode 100644 index 00000000..e643e046 --- /dev/null +++ b/lib/urireader.h @@ -0,0 +1,83 @@ +#pragma once +#include "downloader.h" +#include "util.h" +#include +namespace HTTP{ + + enum URIType{Closed = 0, File, Stream, HTTP}; + + /// Opens a generic URI for reading. Supports streams/pipes, HTTP(S) and file access. + /// Supports seeking, partial and full reads; emulating behaviour where necessary. + /// Calls progress callback for long-duration operations, if set. + class URIReader : public Util::DataCallback{ + public: + // Setters/initers + + /// Sets the internal URI to the current working directory, but does not call open(). + URIReader(); + /// Calls open on the given uri during construction + URIReader(const HTTP::URL &uri); + /// Calls open on the given relative uri during construction + /// URI is resolved relative to the current working directory + URIReader(const std::string &reluri); + /// Sets the internal URI to the given URI and opens it, whatever that may mean for the given URI type. + bool open(const HTTP::URL &uri); + /// Links the internal URI to the given relative URI and opens it, whatever that may mean for the current URI type. + bool open(const std::string &reluri); + /// Seeks to the given position, relative to fragment's #start=X value or 0 if not set. + bool seek(const uint64_t pos); + /// Reads all data from start to end, calling the dataCallback whenever minLen/maxLen require it. + void readAll(size_t (*dataCallback)(const char *data, size_t len)); + + /// Reads all data from start to end, returning it in a single buffer with all data. + void readAll(char *&dataPtr, size_t &dataLen); + /// Reads all data from start to end, using callbacks + void readAll(Util::DataCallback &cb); + + /// Reads wantedLen bytes of data from current position, calling the dataCallback whenever minLen/maxLen require it. + void readSome(size_t (*dataCallback)(const char *data, size_t len), size_t wantedLen); + /// Reads wantedLen bytes of data from current position, returning it in a single buffer. + void readSome(char *&dataPtr, size_t &dataLen, size_t wantedLen); + + void readSome(size_t wantedLen, Util::DataCallback &cb); + + /// Closes the currently open URI. Does not change the internal URI value. + void close(); + + // Configuration setters + /// Progress callback, called whenever transfer stalls. Not called if unset. + void onProgress(bool (*progressCallback)(uint8_t)); + /// Sets minimum and maximum buffer size for read calls that use callbacks + void setBounds(size_t minLen = 0, size_t maxLen = 0); + + // Static getters + bool isSeekable(); /// Returns true if seeking is possible in this URI. + bool isEOF(); /// Returns true if the end of the URI has been reached. + bool isGood() const; /// Returns true if more data can still be read. + uint64_t getPos(); /// Returns the current byte position in the URI. + const HTTP::URL &getURI() const; /// Returns the most recently open URI, or the current working directory if not set. + size_t getSize() const; /// Returns the size of the currently open URI, if known. Returns std::string::npos if unknown size. + + void (*httpBodyCallback)(const char *ptr, size_t size); + void dataCallback(const char *ptr, size_t size); + + private: + // Internal state variables + bool (*cbProgress)(uint8_t); /// The progress callback, if any. Not called if set to a null pointer. + HTTP::URL myURI; /// The most recently open URI, or the current working directory if nothing has been opened yet. + size_t minLen; /// Minimum buffer size for dataCallback. + size_t maxLen; /// Maximum buffer size for dataCallback. + size_t startPos; /// Start position for byte offsets. + size_t endPos; /// End position for byte offsets. + size_t totalSize; /// Total size in bytes of the current URI. May be incomplete before read finished. + size_t curPos; + char *mapped; + bool supportRangeRequest; + Util::ResizeablePointer rPtr; + Util::ResizeablePointer allData; + bool clearPointer; + URIType stateType; /// Holds the type of URI this is, for internal processing purposes. + std::ifstream fReader; /// For file-based URIs, the ifstream used for the file. + HTTP::Downloader downer; /// For HTTP(S)-based URIs, the Downloader instance used for the download. + }; +}// namespace HTTP From ae6ccb36158952c76b611592e520c7d421c8416b Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 9 Mar 2020 20:01:32 +0100 Subject: [PATCH 8/8] URIReader fixes / improvements --- lib/urireader.cpp | 110 +++++++++++++++++++++++++--------------------- lib/urireader.h | 1 + 2 files changed, 61 insertions(+), 50 deletions(-) diff --git a/lib/urireader.cpp b/lib/urireader.cpp index 3d97e49e..9d8751ea 100644 --- a/lib/urireader.cpp +++ b/lib/urireader.cpp @@ -8,10 +8,11 @@ namespace HTTP{ - URIReader::URIReader(){ + + void URIReader::init(){ char workDir[512]; getcwd(workDir, 512); - myURI = HTTP::URL(std::string("file://") + workDir); + myURI = HTTP::URL(std::string("file://") + workDir + "/"); cbProgress = 0; minLen = 1; maxLen = std::string::npos; @@ -19,17 +20,21 @@ namespace HTTP{ supportRangeRequest = false; endPos = std::string::npos; totalSize = std::string::npos; - stateType = URIType::Closed; + stateType = HTTP::Closed; clearPointer = true; } + URIReader::URIReader(){ + init(); + } + URIReader::URIReader(const HTTP::URL &uri){ - URIReader(); + init(); open(uri); } URIReader::URIReader(const std::string &reluri){ - URIReader(); + init(); open(reluri); } @@ -48,77 +53,82 @@ namespace HTTP{ if (!myURI.protocol.size() || myURI.protocol == "file"){ if (!myURI.path.size() || myURI.path == "-"){ downer.getSocket().open(-1, fileno(stdin)); - stateType = URIType::Stream; + stateType = HTTP::Stream; startPos = 0; endPos = std::string::npos; totalSize = std::string::npos; if (!downer.getSocket()){ - FAIL_MSG("Could not open '%s': %s", myURI.getUrl().c_str(), strerror(errno)); - stateType = URIType::Closed; + FAIL_MSG("Could not open '%s': %s", myURI.getUrl().c_str(), downer.getSocket().getError().c_str()); + stateType = HTTP::Closed; return false; } return true; }else{ - // - // - /// \todo Use ACCESSPERMS instead of 0600? - int handle = ::open(myURI.getFilePath().c_str(), O_RDWR, (mode_t)0600); + int handle = ::open(myURI.getFilePath().c_str(), O_RDONLY); if (handle == -1){ - FAIL_MSG("opening file: %s failed: %s", myURI.getFilePath().c_str(), strerror(errno)); - stateType = URIType::Closed; + FAIL_MSG("Opening file '%s' failed: %s", myURI.getFilePath().c_str(), strerror(errno)); + stateType = HTTP::Closed; return false; } struct stat buffStats; int xRes = fstat(handle, &buffStats); if (xRes < 0){ - FAIL_MSG("Cheking size of %s failed: %s", myURI.getFilePath().c_str(), strerror(errno)); - stateType = URIType::Closed; + FAIL_MSG("Checking size of '%s' failed: %s", myURI.getFilePath().c_str(), strerror(errno)); + stateType = HTTP::Closed; return false; } totalSize = buffStats.st_size; // INFO_MSG("size: %llu", totalSize); - mapped = (char *)mmap(0, totalSize, PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0); + mapped = (char *)mmap(0, totalSize, PROT_READ, MAP_SHARED, handle, 0); if (mapped == MAP_FAILED){ + FAIL_MSG("Memory-mapping file '%s' failed: %s", myURI.getFilePath().c_str(), strerror(errno)); mapped = 0; - stateType = URIType::Closed; + stateType = HTTP::Closed; return false; } startPos = 0; - stateType = URIType::File; + stateType = HTTP::File; return true; } } // HTTP, stream or regular download? if (myURI.protocol == "http" || myURI.protocol == "https"){ - stateType = URIType::HTTP; + stateType = HTTP::HTTP; // Send HEAD request to determine range request is supported, and get total length - if (!downer.head(myURI)){FAIL_MSG("Error sending HEAD request");} - - std::string header1 = downer.getHeader("Accept-Ranges"); - supportRangeRequest = (header1.size() > 0); - - header1 = downer.getHeader("Content-Length"); - totalSize = atoi(header1.c_str()); + if (!downer.head(myURI) || !downer.isOk()){ + FAIL_MSG("Error getting URI info for '%s': %" PRIu32 " %s", myURI.getUrl().c_str(), downer.getStatusCode(), downer.getStatusText().c_str()); + if (!downer.isOk()){ + return false; + } + supportRangeRequest = false; + totalSize = std::string::npos; + }else{ + supportRangeRequest = (downer.getHeader("Accept-Ranges").size() > 0); + std::string header1 = downer.getHeader("Content-Length"); + if (header1.size()){totalSize = atoi(header1.c_str());} + myURI = downer.lastURL(); + } // streaming mode when size is unknown if (!supportRangeRequest){ - downer.getNonBlocking(uri); + MEDIUM_MSG("URI get without range request: %s, totalsize: %zu", myURI.getUrl().c_str(), totalSize); + downer.getNonBlocking(myURI); }else{ - MEDIUM_MSG("download file with range request: %s, totalsize: %llu", myURI.getUrl().c_str(), totalSize); - if (!downer.getRangeNonBlocking(myURI.getUrl(), curPos, 0)){ + MEDIUM_MSG("URI get with range request: %s, totalsize: %zu", myURI.getUrl().c_str(), totalSize); + if (!downer.getRangeNonBlocking(myURI, curPos, 0)){ FAIL_MSG("error loading url: %s", myURI.getUrl().c_str()); } } if (!downer.getSocket()){ - FAIL_MSG("Could not open '%s': %s", myURI.getUrl().c_str(), strerror(errno)); - stateType = URIType::Closed; + FAIL_MSG("Could not open '%s': %s", myURI.getUrl().c_str(), downer.getStatusText().c_str()); + stateType = HTTP::Closed; return false; } return true; @@ -131,11 +141,11 @@ namespace HTTP{ // seek to pos, return true if succeeded. bool URIReader::seek(const uint64_t pos){ if (isSeekable()){ - if (stateType == URIType::File){ + if (stateType == HTTP::File){ curPos = pos; return true; - }else if (stateType == URIType::HTTP && supportRangeRequest){ - INFO_MSG("SEEK: RangeRequest to %llu", pos); + }else if (stateType == HTTP::HTTP && supportRangeRequest){ + INFO_MSG("SEEK: RangeRequest to %" PRIu64, pos); if (!downer.getRangeNonBlocking(myURI.getUrl(), pos, 0)){ FAIL_MSG("error loading request"); } @@ -179,7 +189,7 @@ namespace HTTP{ // readsome with callback void URIReader::readSome(size_t wantedLen, Util::DataCallback &cb){ if (isEOF()){return;} - if (stateType == URIType::File){ + if (stateType == HTTP::File){ // dataPtr = mapped + curPos; uint64_t dataLen = 0; @@ -199,7 +209,7 @@ namespace HTTP{ curPos += dataLen; - }else if (stateType == URIType::HTTP){ + }else if (stateType == HTTP::HTTP){ bool res = downer.continueNonBlocking(cb); @@ -208,8 +218,8 @@ namespace HTTP{ MEDIUM_MSG("completed"); }else{ if (supportRangeRequest){ - MEDIUM_MSG("do new range request, previous request not completed yet!, curpos: %llu, " - "length: %llu", + MEDIUM_MSG("do new range request, previous request not completed yet!, curpos: %zu, " + "length: %zu", curPos, getSize()); } } @@ -231,7 +241,7 @@ namespace HTTP{ /// Readsome blocking function. void URIReader::readSome(char *&dataPtr, size_t &dataLen, size_t wantedLen){ - if (stateType == URIType::File){ + if (stateType == HTTP::File){ dataPtr = mapped + curPos; @@ -246,7 +256,7 @@ namespace HTTP{ } curPos += dataLen; - }else if (stateType == URIType::HTTP){ + }else if (stateType == HTTP::HTTP){ dataLen = downer.data().size(); curPos += dataLen; @@ -301,15 +311,15 @@ namespace HTTP{ } void URIReader::close(){ - if (stateType == URIType::File){ + if (stateType == HTTP::File){ if (mapped){ munmap(mapped, totalSize); mapped = 0; totalSize = 0; } - }else if (stateType == URIType::Stream){ + }else if (stateType == HTTP::Stream){ downer.getSocket().close(); - }else if (stateType == URIType::HTTP){ + }else if (stateType == HTTP::HTTP){ downer.getSocket().close(); }else{ // INFO_MSG("already closed"); @@ -326,22 +336,22 @@ namespace HTTP{ } bool URIReader::isSeekable(){ - if (stateType == URIType::HTTP){ + if (stateType == HTTP::HTTP){ - if (supportRangeRequest && totalSize > 0){return true;} + if (supportRangeRequest && totalSize != std::string::npos){return true;} } - return (stateType == URIType::File); + return (stateType == HTTP::File); } bool URIReader::isEOF(){ - if (stateType == URIType::File){ + if (stateType == HTTP::File){ return (curPos >= totalSize); - }else if (stateType == URIType::Stream){ + }else if (stateType == HTTP::Stream){ if (!downer.getSocket()){return true;} // if ((totalSize > 0) && (curPos >= totalSize)){return true;} - }else if (stateType == URIType::HTTP){ + }else if (stateType == HTTP::HTTP){ // INFO_MSG("iseof, C: %s, seekable: %s", C?"connected":"disconnected", isSeekable()?"yes":"no"); if (!downer.getSocket() && !downer.getSocket().Received().available(1) && !isSeekable()){ return true; diff --git a/lib/urireader.h b/lib/urireader.h index e643e046..128ab581 100644 --- a/lib/urireader.h +++ b/lib/urireader.h @@ -79,5 +79,6 @@ namespace HTTP{ URIType stateType; /// Holds the type of URI this is, for internal processing purposes. std::ifstream fReader; /// For file-based URIs, the ifstream used for the file. HTTP::Downloader downer; /// For HTTP(S)-based URIs, the Downloader instance used for the download. + void init(); }; }// namespace HTTP