diff --git a/lib/downloader.cpp b/lib/downloader.cpp index 7d9c39ef..45ed307e 100644 --- a/lib/downloader.cpp +++ b/lib/downloader.cpp @@ -61,6 +61,7 @@ namespace HTTP{ /// Returns a reference to the internal Socket::Connection class instance. Socket::Connection &Downloader::getSocket(){return S;} + const Socket::Connection &Downloader::getSocket() const{return S;} Downloader::~Downloader(){S.close();} @@ -280,7 +281,7 @@ namespace HTTP{ return nbLink; } - // continue handling a request, origininally set up by the getNonBlocking() function + // continue handling a request, originally set up by the getNonBlocking() function // returns true if the request is complete bool Downloader::continueNonBlocking(Util::DataCallback &cb){ while (true){ diff --git a/lib/downloader.h b/lib/downloader.h index 5f49ccdb..34c6ad8e 100644 --- a/lib/downloader.h +++ b/lib/downloader.h @@ -39,9 +39,10 @@ namespace HTTP{ void setHeader(const std::string &name, const std::string &val); void clearHeaders(); bool canRequest(const HTTP::URL &link); - bool completed(){return isComplete;} + bool completed() const{return isComplete;} Parser &getHTTP(); Socket::Connection &getSocket(); + const Socket::Connection &getSocket() const; uint32_t retryCount, dataTimeout; bool isProxied() const; const HTTP::URL & lastURL(); diff --git a/lib/socket.cpp b/lib/socket.cpp index cc98322c..7e2abfae 100644 --- a/lib/socket.cpp +++ b/lib/socket.cpp @@ -411,6 +411,16 @@ bool Socket::Buffer::available(unsigned int count){ return false; } +/// Returns true if at least count bytes are available in this buffer. +bool Socket::Buffer::available(unsigned int count) const{ + unsigned int i = 0; + for (std::deque<std::string>::const_iterator it = data.begin(); it != data.end(); ++it){ + i += (*it).size(); + if (i >= count){return true;} + } + return false; +} + /// Removes count bytes from the buffer, returning them by value. /// Returns an empty string if not all count bytes are available. 
std::string Socket::Buffer::remove(unsigned int count){ @@ -858,7 +868,7 @@ void Socket::Connection::open(std::string host, int port, bool nonblock, bool wi sSend = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); if (sSend < 0){continue;} if (connect(sSend, rp->ai_addr, rp->ai_addrlen) == 0){ - remoteaddr = *((sockaddr_in6 *)rp->ai_addr); + memcpy(&remoteaddr, rp->ai_addr, rp->ai_addrlen); break; } lastErr += strerror(errno); @@ -931,6 +941,11 @@ Socket::Buffer &Socket::Connection::Received(){ return downbuffer; } +/// Returns a reference to the download buffer. +const Socket::Buffer &Socket::Connection::Received() const{ + return downbuffer; +} + /// Will not buffer anything but always send right away. Blocks. /// Any data that could not be send will block until it can be send or the connection is severed. void Socket::Connection::SendNow(const char *data, size_t len){ @@ -1169,7 +1184,9 @@ void Socket::Connection::setHost(std::string host){ hints.ai_next = NULL; int s = getaddrinfo(host.c_str(), 0, &hints, &result); if (s != 0){return;} - if (result){remoteaddr = *((sockaddr_in6 *)result->ai_addr);} + if (result){ + memcpy(&remoteaddr, result->ai_addr, result->ai_addrlen); + } freeaddrinfo(result); } diff --git a/lib/socket.h b/lib/socket.h index f652fbc4..61e58a2f 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -63,6 +63,7 @@ namespace Socket{ void prepend(const char *newdata, const unsigned int newdatasize); std::string &get(); bool available(unsigned int count); + bool available(unsigned int count) const; std::string remove(unsigned int count); std::string copy(unsigned int count); void clear(); @@ -141,6 +142,7 @@ namespace Socket{ bool spool(); ///< Updates the downbufferinternal variables. bool peek(); ///< Clears the downbuffer and fills it with peek Buffer &Received(); ///< Returns a reference to the download buffer. + const Buffer &Received() const; ///< Returns a reference to the download buffer. 
void SendNow(const std::string &data); ///< Will not buffer anything but always send right away. Blocks. void SendNow(const char *data); ///< Will not buffer anything but always send right away. Blocks. void SendNow(const char *data, diff --git a/lib/urireader.cpp b/lib/urireader.cpp index 9d8751ea..b2dbb562 100644 --- a/lib/urireader.cpp +++ b/lib/urireader.cpp @@ -10,6 +10,8 @@ namespace HTTP{ void URIReader::init(){ + handle = -1; + mapped = 0; char workDir[512]; getcwd(workDir, 512); myURI = HTTP::URL(std::string("file://") + workDir + "/"); @@ -22,6 +24,8 @@ namespace HTTP{ totalSize = std::string::npos; stateType = HTTP::Closed; clearPointer = true; + curPos = 0; + bufPos = 0; } URIReader::URIReader(){ @@ -42,13 +46,15 @@ namespace HTTP{ /// Internal callback function, used to buffer data. void URIReader::dataCallback(const char *ptr, size_t size){ - std::string t = std::string(ptr, size); - allData.append(t.c_str(), size); + allData.append(ptr, size); } bool URIReader::open(const HTTP::URL &uri){ + close(); myURI = uri; curPos = 0; + allData.truncate(0); + bufPos = 0; if (!myURI.protocol.size() || myURI.protocol == "file"){ if (!myURI.path.size() || myURI.path == "-"){ @@ -65,7 +71,7 @@ namespace HTTP{ } return true; }else{ - int handle = ::open(myURI.getFilePath().c_str(), O_RDONLY); + handle = ::open(myURI.getFilePath().c_str(), O_RDONLY); if (handle == -1){ FAIL_MSG("Opening file '%s' failed: %s", myURI.getFilePath().c_str(), strerror(errno)); stateType = HTTP::Closed; @@ -101,6 +107,7 @@ namespace HTTP{ stateType = HTTP::HTTP; // Send HEAD request to determine range request is supported, and get total length + downer.clearHeaders(); if (!downer.head(myURI) || !downer.isOk()){ FAIL_MSG("Error getting URI info for '%s': %" PRIu32 " %s", myURI.getUrl().c_str(), downer.getStatusCode(), downer.getStatusText().c_str()); if (!downer.isOk()){ @@ -141,6 +148,8 @@ namespace HTTP{ // seek to pos, return true if succeeded. 
bool URIReader::seek(const uint64_t pos){ if (isSeekable()){ + allData.truncate(0); + bufPos = 0; if (stateType == HTTP::File){ curPos = pos; return true; @@ -166,16 +175,8 @@ namespace HTTP{ /// Read all blocking function, which internally uses the Nonblocking function. void URIReader::readAll(char *&dataPtr, size_t &dataLen){ - size_t s = 0; - char *tmp = 0; - std::string t; - - allData.allocate(68401307); - - while (!isEOF()){ - readSome(10046, *this); - // readSome(1048576, *this); - } + if (getSize() != std::string::npos){allData.allocate(getSize());} + while (!isEOF()){readSome(10046, *this);} dataPtr = allData; dataLen = allData.size(); } @@ -210,22 +211,8 @@ namespace HTTP{ curPos += dataLen; }else if (stateType == HTTP::HTTP){ - bool res = downer.continueNonBlocking(cb); - - if (res){ - if (downer.completed()){ - MEDIUM_MSG("completed"); - }else{ - if (supportRangeRequest){ - MEDIUM_MSG("do new range request, previous request not completed yet!, curpos: %zu, " - "length: %zu", - curPos, getSize()); - } - } - }else{ - Util::sleep(10); - } + curPos = downer.const_data().size(); }else{// streaming mode int s; static int totaal = 0; @@ -241,88 +228,41 @@ namespace HTTP{ /// Readsome blocking function. 
void URIReader::readSome(char *&dataPtr, size_t &dataLen, size_t wantedLen){ - if (stateType == HTTP::File){ - - dataPtr = mapped + curPos; - - if (wantedLen < totalSize){ - if ((wantedLen + curPos) > totalSize){ - dataLen = totalSize - curPos; // restant - }else{ - dataLen = wantedLen; - } - }else{ - dataLen = totalSize; - } - - curPos += dataLen; - }else if (stateType == HTTP::HTTP){ - - dataLen = downer.data().size(); - curPos += dataLen; - dataPtr = (char *)downer.data().data(); - }else{ - if (clearPointer){ - rPtr.assign(0, 0); - clearPointer = false; - dataLen = 0; - rPtr.allocate(wantedLen); - } - - int s; - bool run = true; - while (downer.getSocket() && run){ - if (downer.getSocket().spool()){ - - if (wantedLen < 8000){ - s = downer.getSocket().Received().bytes(wantedLen); - }else{ - s = downer.getSocket().Received().bytes(8000); - } - - std::string buf = downer.getSocket().Received().remove(s); - rPtr.append(buf.c_str(), s); - - dataLen += s; - curPos += s; - - if (rPtr.size() >= wantedLen){ - dataLen = rPtr.size(); - dataPtr = rPtr; - // INFO_MSG("laatste stukje, datalen: %llu, wanted: %llu", dataLen, - // wantedLen); dataCallback(ptr, len); - clearPointer = true; - run = false; - } - //} - }else{ - // INFO_MSG("data not yet available!"); - return; - } - } - - // if (!downer.getSocket()){ - totalSize = curPos; - dataLen = rPtr.size(); - //} - // INFO_MSG("size: %llu, datalen: %llu", totalSize, rPtr.size()); - dataPtr = rPtr; + //Clear the buffer if we're finished with it + if (allData.size() && bufPos == allData.size()){ + allData.truncate(0); + bufPos = 0; } + //Read more data if needed + while (allData.size() < wantedLen + bufPos && *this){ + readSome(wantedLen - (allData.size() - bufPos), *this); + } + //Return wantedLen bytes if we have them + if (allData.size() >= wantedLen + bufPos){ + dataPtr = allData + bufPos; + dataLen = wantedLen; + bufPos += wantedLen; + return; + } + //Ok, we have a short count. Return the amount we actually got. 
+ dataPtr = allData + bufPos; + dataLen = allData.size() - bufPos; + bufPos = allData.size(); } void URIReader::close(){ - if (stateType == HTTP::File){ - if (mapped){ - munmap(mapped, totalSize); - mapped = 0; - totalSize = 0; - } - }else if (stateType == HTTP::Stream){ - downer.getSocket().close(); - }else if (stateType == HTTP::HTTP){ - downer.getSocket().close(); - }else{ - // INFO_MSG("already closed"); + //Close downloader socket if open + downer.getSocket().close(); + //Unmap file if mapped + if (mapped){ + munmap(mapped, totalSize); + mapped = 0; + totalSize = 0; + } + //Close file handle if open + if (handle != -1){ + ::close(handle); + handle = -1; } } @@ -335,45 +275,31 @@ namespace HTTP{ maxLen = newMaxLen; } - bool URIReader::isSeekable(){ + bool URIReader::isSeekable() const{ if (stateType == HTTP::HTTP){ - if (supportRangeRequest && totalSize != std::string::npos){return true;} } - return (stateType == HTTP::File); } - bool URIReader::isEOF(){ + bool URIReader::isEOF() const{ if (stateType == HTTP::File){ return (curPos >= totalSize); }else if (stateType == HTTP::Stream){ - if (!downer.getSocket()){return true;} - - // if ((totalSize > 0) && (curPos >= totalSize)){return true;} + if (!downer.getSocket() && !downer.getSocket().Received().available(1)){return true;} + return false; }else if (stateType == HTTP::HTTP){ - // INFO_MSG("iseof, C: %s, seekable: %s", C?"connected":"disconnected", isSeekable()?"yes":"no"); if (!downer.getSocket() && !downer.getSocket().Received().available(1) && !isSeekable()){ + if (allData.size() && bufPos < allData.size()){return false;} return true; } - if ((totalSize > 0) && (curPos >= totalSize)){return true;} - - // mark as complete if downer reports download is completed, or when socket connection is closed when totalsize is not known. - if (downer.completed() || (!totalSize && !downer.getSocket())){ - // INFO_MSG("complete totalsize: %llu, %s", totalSize, downer.getSocket() ? 
"Connected" : "disconnected"); + if ((totalSize > 0 && curPos >= totalSize) || downer.completed() || (!totalSize && !downer.getSocket())){ + if (allData.size() && bufPos < allData.size()){return false;} return true; } - - }else{ - return true; + return false; } - - return false; - } - - bool URIReader::isGood() const{ return true; - /// TODO: Implement } uint64_t URIReader::getPos(){return curPos;} diff --git a/lib/urireader.h b/lib/urireader.h index 128ab581..816368c8 100644 --- a/lib/urireader.h +++ b/lib/urireader.h @@ -45,40 +45,42 @@ namespace HTTP{ void close(); // Configuration setters + /// Progress callback, called whenever transfer stalls. Not called if unset. void onProgress(bool (*progressCallback)(uint8_t)); /// Sets minimum and maximum buffer size for read calls that use callbacks void setBounds(size_t minLen = 0, size_t maxLen = 0); // Static getters - bool isSeekable(); /// Returns true if seeking is possible in this URI. - bool isEOF(); /// Returns true if the end of the URI has been reached. - bool isGood() const; /// Returns true if more data can still be read. - uint64_t getPos(); /// Returns the current byte position in the URI. - const HTTP::URL &getURI() const; /// Returns the most recently open URI, or the current working directory if not set. - size_t getSize() const; /// Returns the size of the currently open URI, if known. Returns std::string::npos if unknown size. + bool isSeekable() const; ///< Returns true if seeking is possible in this URI. + bool isEOF() const; ///< Returns true if the end of the URI has been reached. + operator bool() const {return !isEOF();} ///< Returns !isEOF() + uint64_t getPos(); ///< Returns the current byte position in the URI. + const HTTP::URL &getURI() const; ///< Returns the most recently open URI, or the current working directory if not set. + size_t getSize() const; ///< Returns the size of the currently open URI, if known. Returns std::string::npos if unknown size. 
void (*httpBodyCallback)(const char *ptr, size_t size); void dataCallback(const char *ptr, size_t size); private: // Internal state variables - bool (*cbProgress)(uint8_t); /// The progress callback, if any. Not called if set to a null pointer. - HTTP::URL myURI; /// The most recently open URI, or the current working directory if nothing has been opened yet. - size_t minLen; /// Minimum buffer size for dataCallback. - size_t maxLen; /// Maximum buffer size for dataCallback. - size_t startPos; /// Start position for byte offsets. - size_t endPos; /// End position for byte offsets. - size_t totalSize; /// Total size in bytes of the current URI. May be incomplete before read finished. - size_t curPos; - char *mapped; + bool (*cbProgress)(uint8_t); ///< The progress callback, if any. Not called if set to a null pointer. + HTTP::URL myURI; ///< The most recently open URI, or the current working directory if nothing has been opened yet. + size_t minLen; ///< Minimum buffer size for dataCallback. + size_t maxLen; ///< Maximum buffer size for dataCallback. + size_t startPos; ///< Start position for byte offsets. + size_t endPos; ///< End position for byte offsets. + size_t totalSize; ///< Total size in bytes of the current URI. May be incomplete before read finished. + size_t curPos; ///< Current read position in source + size_t bufPos; ///< Current read position in buffer + int handle; ///< Open file handle, if file-based. + char *mapped; ///< Memory-map of open file handle, if file-based. bool supportRangeRequest; Util::ResizeablePointer rPtr; Util::ResizeablePointer allData; bool clearPointer; - URIType stateType; /// Holds the type of URI this is, for internal processing purposes. - std::ifstream fReader; /// For file-based URIs, the ifstream used for the file. - HTTP::Downloader downer; /// For HTTP(S)-based URIs, the Downloader instance used for the download. + URIType stateType; ///< Holds the type of URI this is, for internal processing purposes. 
+ HTTP::Downloader downer; ///< For HTTP(S)-based URIs, the Downloader instance used for the download. void init(); }; }// namespace HTTP diff --git a/lib/util.cpp b/lib/util.cpp index aec2668a..2a4402dc 100644 --- a/lib/util.cpp +++ b/lib/util.cpp @@ -184,13 +184,32 @@ namespace Util{ return true; } + bool ResizeablePointer::assign(const std::string & str){ + return assign(str.data(), str.length()); + } + + bool ResizeablePointer::append(const void * p, uint32_t l){ + //We're writing to ourselves or from null pointer - assume outside write (e.g. fread or socket operation) and update the size + if (!p || p == ((char*)ptr)+currSize){ + if (currSize+l > maxSize){ + FAIL_MSG("Pointer write went beyond allocated size! Memory corruption likely."); + BACKTRACE; + return false; + } + currSize += l; + return true; + } if (!allocate(l+currSize)){return false;} memcpy(((char*)ptr)+currSize, p, l); currSize += l; return true; } + bool ResizeablePointer::append(const std::string & str){ + return append(str.data(), str.length()); + } + bool ResizeablePointer::allocate(uint32_t l){ if (l > maxSize){ void *tmp = realloc(ptr, l); @@ -204,6 +223,15 @@ namespace Util{ return true; } + /// Returns amount of space currently reserved for this pointer + uint32_t ResizeablePointer::rsize(){ + return maxSize; + } + + + void ResizeablePointer::truncate(const size_t newLen){ + if (currSize > newLen){currSize = newLen;} + } /// Redirects stderr to log parser, writes log parser to the old stderr. /// Does nothing if the MIST_CONTROL environment variable is set. 
diff --git a/lib/util.h b/lib/util.h index 08d81aaf..1ce47c5c 100644 --- a/lib/util.h +++ b/lib/util.h @@ -37,10 +37,16 @@ namespace Util{ ResizeablePointer(); ~ResizeablePointer(); inline size_t& size(){return currSize;} + inline const size_t size() const{return currSize;} bool assign(const void * p, uint32_t l); + bool assign(const std::string & str); bool append(const void * p, uint32_t l); + bool append(const std::string & str); bool allocate(uint32_t l); + uint32_t rsize(); + void truncate(const size_t newLen); inline operator char*(){return (char*)ptr;} + inline operator const char *() const{return (const char *)ptr;} inline operator void*(){return ptr;} private: void * ptr;