HTTP::URIReader fixes/improvements
This commit is contained in:
parent
c5d0a9a8ad
commit
679ff219db
2 changed files with 74 additions and 146 deletions
|
@ -10,6 +10,8 @@ namespace HTTP{
|
||||||
|
|
||||||
|
|
||||||
void URIReader::init(){
|
void URIReader::init(){
|
||||||
|
handle = -1;
|
||||||
|
mapped = 0;
|
||||||
char workDir[512];
|
char workDir[512];
|
||||||
getcwd(workDir, 512);
|
getcwd(workDir, 512);
|
||||||
myURI = HTTP::URL(std::string("file://") + workDir + "/");
|
myURI = HTTP::URL(std::string("file://") + workDir + "/");
|
||||||
|
@ -22,6 +24,8 @@ namespace HTTP{
|
||||||
totalSize = std::string::npos;
|
totalSize = std::string::npos;
|
||||||
stateType = HTTP::Closed;
|
stateType = HTTP::Closed;
|
||||||
clearPointer = true;
|
clearPointer = true;
|
||||||
|
curPos = 0;
|
||||||
|
bufPos = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
URIReader::URIReader(){
|
URIReader::URIReader(){
|
||||||
|
@ -42,13 +46,15 @@ namespace HTTP{
|
||||||
|
|
||||||
/// Internal callback function, used to buffer data.
|
/// Internal callback function, used to buffer data.
|
||||||
void URIReader::dataCallback(const char *ptr, size_t size){
|
void URIReader::dataCallback(const char *ptr, size_t size){
|
||||||
std::string t = std::string(ptr, size);
|
allData.append(ptr, size);
|
||||||
allData.append(t.c_str(), size);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool URIReader::open(const HTTP::URL &uri){
|
bool URIReader::open(const HTTP::URL &uri){
|
||||||
|
close();
|
||||||
myURI = uri;
|
myURI = uri;
|
||||||
curPos = 0;
|
curPos = 0;
|
||||||
|
allData.truncate(0);
|
||||||
|
bufPos = 0;
|
||||||
|
|
||||||
if (!myURI.protocol.size() || myURI.protocol == "file"){
|
if (!myURI.protocol.size() || myURI.protocol == "file"){
|
||||||
if (!myURI.path.size() || myURI.path == "-"){
|
if (!myURI.path.size() || myURI.path == "-"){
|
||||||
|
@ -65,7 +71,7 @@ namespace HTTP{
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}else{
|
}else{
|
||||||
int handle = ::open(myURI.getFilePath().c_str(), O_RDONLY);
|
handle = ::open(myURI.getFilePath().c_str(), O_RDONLY);
|
||||||
if (handle == -1){
|
if (handle == -1){
|
||||||
FAIL_MSG("Opening file '%s' failed: %s", myURI.getFilePath().c_str(), strerror(errno));
|
FAIL_MSG("Opening file '%s' failed: %s", myURI.getFilePath().c_str(), strerror(errno));
|
||||||
stateType = HTTP::Closed;
|
stateType = HTTP::Closed;
|
||||||
|
@ -101,6 +107,7 @@ namespace HTTP{
|
||||||
stateType = HTTP::HTTP;
|
stateType = HTTP::HTTP;
|
||||||
|
|
||||||
// Send HEAD request to determine range request is supported, and get total length
|
// Send HEAD request to determine range request is supported, and get total length
|
||||||
|
downer.clearHeaders();
|
||||||
if (!downer.head(myURI) || !downer.isOk()){
|
if (!downer.head(myURI) || !downer.isOk()){
|
||||||
FAIL_MSG("Error getting URI info for '%s': %" PRIu32 " %s", myURI.getUrl().c_str(), downer.getStatusCode(), downer.getStatusText().c_str());
|
FAIL_MSG("Error getting URI info for '%s': %" PRIu32 " %s", myURI.getUrl().c_str(), downer.getStatusCode(), downer.getStatusText().c_str());
|
||||||
if (!downer.isOk()){
|
if (!downer.isOk()){
|
||||||
|
@ -141,6 +148,8 @@ namespace HTTP{
|
||||||
// seek to pos, return true if succeeded.
|
// seek to pos, return true if succeeded.
|
||||||
bool URIReader::seek(const uint64_t pos){
|
bool URIReader::seek(const uint64_t pos){
|
||||||
if (isSeekable()){
|
if (isSeekable()){
|
||||||
|
allData.truncate(0);
|
||||||
|
bufPos = 0;
|
||||||
if (stateType == HTTP::File){
|
if (stateType == HTTP::File){
|
||||||
curPos = pos;
|
curPos = pos;
|
||||||
return true;
|
return true;
|
||||||
|
@ -166,16 +175,8 @@ namespace HTTP{
|
||||||
|
|
||||||
/// Read all blocking function, which internally uses the Nonblocking function.
|
/// Read all blocking function, which internally uses the Nonblocking function.
|
||||||
void URIReader::readAll(char *&dataPtr, size_t &dataLen){
|
void URIReader::readAll(char *&dataPtr, size_t &dataLen){
|
||||||
size_t s = 0;
|
if (getSize() != std::string::npos){allData.allocate(getSize());}
|
||||||
char *tmp = 0;
|
while (!isEOF()){readSome(10046, *this);}
|
||||||
std::string t;
|
|
||||||
|
|
||||||
allData.allocate(68401307);
|
|
||||||
|
|
||||||
while (!isEOF()){
|
|
||||||
readSome(10046, *this);
|
|
||||||
// readSome(1048576, *this);
|
|
||||||
}
|
|
||||||
dataPtr = allData;
|
dataPtr = allData;
|
||||||
dataLen = allData.size();
|
dataLen = allData.size();
|
||||||
}
|
}
|
||||||
|
@ -210,22 +211,8 @@ namespace HTTP{
|
||||||
curPos += dataLen;
|
curPos += dataLen;
|
||||||
|
|
||||||
}else if (stateType == HTTP::HTTP){
|
}else if (stateType == HTTP::HTTP){
|
||||||
|
|
||||||
bool res = downer.continueNonBlocking(cb);
|
bool res = downer.continueNonBlocking(cb);
|
||||||
|
curPos = downer.const_data().size();
|
||||||
if (res){
|
|
||||||
if (downer.completed()){
|
|
||||||
MEDIUM_MSG("completed");
|
|
||||||
}else{
|
|
||||||
if (supportRangeRequest){
|
|
||||||
MEDIUM_MSG("do new range request, previous request not completed yet!, curpos: %zu, "
|
|
||||||
"length: %zu",
|
|
||||||
curPos, getSize());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}else{
|
|
||||||
Util::sleep(10);
|
|
||||||
}
|
|
||||||
}else{// streaming mode
|
}else{// streaming mode
|
||||||
int s;
|
int s;
|
||||||
static int totaal = 0;
|
static int totaal = 0;
|
||||||
|
@ -241,88 +228,41 @@ namespace HTTP{
|
||||||
|
|
||||||
/// Readsome blocking function.
|
/// Readsome blocking function.
|
||||||
void URIReader::readSome(char *&dataPtr, size_t &dataLen, size_t wantedLen){
|
void URIReader::readSome(char *&dataPtr, size_t &dataLen, size_t wantedLen){
|
||||||
if (stateType == HTTP::File){
|
//Clear the buffer if we're finished with it
|
||||||
|
if (allData.size() && bufPos == allData.size()){
|
||||||
dataPtr = mapped + curPos;
|
allData.truncate(0);
|
||||||
|
bufPos = 0;
|
||||||
if (wantedLen < totalSize){
|
|
||||||
if ((wantedLen + curPos) > totalSize){
|
|
||||||
dataLen = totalSize - curPos; // restant
|
|
||||||
}else{
|
|
||||||
dataLen = wantedLen;
|
|
||||||
}
|
|
||||||
}else{
|
|
||||||
dataLen = totalSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
curPos += dataLen;
|
|
||||||
}else if (stateType == HTTP::HTTP){
|
|
||||||
|
|
||||||
dataLen = downer.data().size();
|
|
||||||
curPos += dataLen;
|
|
||||||
dataPtr = (char *)downer.data().data();
|
|
||||||
}else{
|
|
||||||
if (clearPointer){
|
|
||||||
rPtr.assign(0, 0);
|
|
||||||
clearPointer = false;
|
|
||||||
dataLen = 0;
|
|
||||||
rPtr.allocate(wantedLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
int s;
|
|
||||||
bool run = true;
|
|
||||||
while (downer.getSocket() && run){
|
|
||||||
if (downer.getSocket().spool()){
|
|
||||||
|
|
||||||
if (wantedLen < 8000){
|
|
||||||
s = downer.getSocket().Received().bytes(wantedLen);
|
|
||||||
}else{
|
|
||||||
s = downer.getSocket().Received().bytes(8000);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::string buf = downer.getSocket().Received().remove(s);
|
|
||||||
rPtr.append(buf.c_str(), s);
|
|
||||||
|
|
||||||
dataLen += s;
|
|
||||||
curPos += s;
|
|
||||||
|
|
||||||
if (rPtr.size() >= wantedLen){
|
|
||||||
dataLen = rPtr.size();
|
|
||||||
dataPtr = rPtr;
|
|
||||||
// INFO_MSG("laatste stukje, datalen: %llu, wanted: %llu", dataLen,
|
|
||||||
// wantedLen); dataCallback(ptr, len);
|
|
||||||
clearPointer = true;
|
|
||||||
run = false;
|
|
||||||
}
|
|
||||||
//}
|
|
||||||
}else{
|
|
||||||
// INFO_MSG("data not yet available!");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// if (!downer.getSocket()){
|
|
||||||
totalSize = curPos;
|
|
||||||
dataLen = rPtr.size();
|
|
||||||
//}
|
|
||||||
// INFO_MSG("size: %llu, datalen: %llu", totalSize, rPtr.size());
|
|
||||||
dataPtr = rPtr;
|
|
||||||
}
|
}
|
||||||
|
//Read more data if needed
|
||||||
|
while (allData.size() < wantedLen + bufPos && *this){
|
||||||
|
readSome(wantedLen - (allData.size() - bufPos), *this);
|
||||||
|
}
|
||||||
|
//Return wantedLen bytes if we have them
|
||||||
|
if (allData.size() >= wantedLen + bufPos){
|
||||||
|
dataPtr = allData + bufPos;
|
||||||
|
dataLen = wantedLen;
|
||||||
|
bufPos += wantedLen;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
//Ok, we have a short count. Return the amount we actually got.
|
||||||
|
dataPtr = allData + bufPos;
|
||||||
|
dataLen = allData.size() - bufPos;
|
||||||
|
bufPos = allData.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
void URIReader::close(){
|
void URIReader::close(){
|
||||||
if (stateType == HTTP::File){
|
//Close downloader socket if open
|
||||||
if (mapped){
|
downer.getSocket().close();
|
||||||
munmap(mapped, totalSize);
|
//Unmap file if mapped
|
||||||
mapped = 0;
|
if (mapped){
|
||||||
totalSize = 0;
|
munmap(mapped, totalSize);
|
||||||
}
|
mapped = 0;
|
||||||
}else if (stateType == HTTP::Stream){
|
totalSize = 0;
|
||||||
downer.getSocket().close();
|
}
|
||||||
}else if (stateType == HTTP::HTTP){
|
//Close file handle if open
|
||||||
downer.getSocket().close();
|
if (handle != -1){
|
||||||
}else{
|
::close(handle);
|
||||||
// INFO_MSG("already closed");
|
handle = -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -335,45 +275,31 @@ namespace HTTP{
|
||||||
maxLen = newMaxLen;
|
maxLen = newMaxLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool URIReader::isSeekable(){
|
bool URIReader::isSeekable() const{
|
||||||
if (stateType == HTTP::HTTP){
|
if (stateType == HTTP::HTTP){
|
||||||
|
|
||||||
if (supportRangeRequest && totalSize != std::string::npos){return true;}
|
if (supportRangeRequest && totalSize != std::string::npos){return true;}
|
||||||
}
|
}
|
||||||
|
|
||||||
return (stateType == HTTP::File);
|
return (stateType == HTTP::File);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool URIReader::isEOF(){
|
bool URIReader::isEOF() const{
|
||||||
if (stateType == HTTP::File){
|
if (stateType == HTTP::File){
|
||||||
return (curPos >= totalSize);
|
return (curPos >= totalSize);
|
||||||
}else if (stateType == HTTP::Stream){
|
}else if (stateType == HTTP::Stream){
|
||||||
if (!downer.getSocket()){return true;}
|
if (!downer.getSocket() && !downer.getSocket().Received().available(1)){return true;}
|
||||||
|
return false;
|
||||||
// if ((totalSize > 0) && (curPos >= totalSize)){return true;}
|
|
||||||
}else if (stateType == HTTP::HTTP){
|
}else if (stateType == HTTP::HTTP){
|
||||||
// INFO_MSG("iseof, C: %s, seekable: %s", C?"connected":"disconnected", isSeekable()?"yes":"no");
|
|
||||||
if (!downer.getSocket() && !downer.getSocket().Received().available(1) && !isSeekable()){
|
if (!downer.getSocket() && !downer.getSocket().Received().available(1) && !isSeekable()){
|
||||||
|
if (allData.size() && bufPos < allData.size()){return false;}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if ((totalSize > 0) && (curPos >= totalSize)){return true;}
|
if ((totalSize > 0 && curPos >= totalSize) || downer.completed() || (!totalSize && !downer.getSocket())){
|
||||||
|
if (allData.size() && bufPos < allData.size()){return false;}
|
||||||
// mark as complete if downer reports download is completed, or when socket connection is closed when totalsize is not known.
|
|
||||||
if (downer.completed() || (!totalSize && !downer.getSocket())){
|
|
||||||
// INFO_MSG("complete totalsize: %llu, %s", totalSize, downer.getSocket() ? "Connected" : "disconnected");
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}else{
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool URIReader::isGood() const{
|
|
||||||
return true;
|
return true;
|
||||||
/// TODO: Implement
|
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t URIReader::getPos(){return curPos;}
|
uint64_t URIReader::getPos(){return curPos;}
|
||||||
|
|
|
@ -45,40 +45,42 @@ namespace HTTP{
|
||||||
void close();
|
void close();
|
||||||
|
|
||||||
// Configuration setters
|
// Configuration setters
|
||||||
|
|
||||||
/// Progress callback, called whenever transfer stalls. Not called if unset.
|
/// Progress callback, called whenever transfer stalls. Not called if unset.
|
||||||
void onProgress(bool (*progressCallback)(uint8_t));
|
void onProgress(bool (*progressCallback)(uint8_t));
|
||||||
/// Sets minimum and maximum buffer size for read calls that use callbacks
|
/// Sets minimum and maximum buffer size for read calls that use callbacks
|
||||||
void setBounds(size_t minLen = 0, size_t maxLen = 0);
|
void setBounds(size_t minLen = 0, size_t maxLen = 0);
|
||||||
|
|
||||||
// Static getters
|
// Static getters
|
||||||
bool isSeekable(); /// Returns true if seeking is possible in this URI.
|
bool isSeekable() const; ///< Returns true if seeking is possible in this URI.
|
||||||
bool isEOF(); /// Returns true if the end of the URI has been reached.
|
bool isEOF() const; ///< Returns true if the end of the URI has been reached.
|
||||||
bool isGood() const; /// Returns true if more data can still be read.
|
operator bool() const {return !isEOF();} ///< Returns !isEOF()
|
||||||
uint64_t getPos(); /// Returns the current byte position in the URI.
|
uint64_t getPos(); ///< Returns the current byte position in the URI.
|
||||||
const HTTP::URL &getURI() const; /// Returns the most recently open URI, or the current working directory if not set.
|
const HTTP::URL &getURI() const; ///< Returns the most recently open URI, or the current working directory if not set.
|
||||||
size_t getSize() const; /// Returns the size of the currently open URI, if known. Returns std::string::npos if unknown size.
|
size_t getSize() const; ///< Returns the size of the currently open URI, if known. Returns std::string::npos if unknown size.
|
||||||
|
|
||||||
void (*httpBodyCallback)(const char *ptr, size_t size);
|
void (*httpBodyCallback)(const char *ptr, size_t size);
|
||||||
void dataCallback(const char *ptr, size_t size);
|
void dataCallback(const char *ptr, size_t size);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// Internal state variables
|
// Internal state variables
|
||||||
bool (*cbProgress)(uint8_t); /// The progress callback, if any. Not called if set to a null pointer.
|
bool (*cbProgress)(uint8_t); ///< The progress callback, if any. Not called if set to a null pointer.
|
||||||
HTTP::URL myURI; /// The most recently open URI, or the current working directory if nothing has been opened yet.
|
HTTP::URL myURI; ///< The most recently open URI, or the current working directory if nothing has been opened yet.
|
||||||
size_t minLen; /// Minimum buffer size for dataCallback.
|
size_t minLen; ///< Minimum buffer size for dataCallback.
|
||||||
size_t maxLen; /// Maximum buffer size for dataCallback.
|
size_t maxLen; ///< Maximum buffer size for dataCallback.
|
||||||
size_t startPos; /// Start position for byte offsets.
|
size_t startPos; ///< Start position for byte offsets.
|
||||||
size_t endPos; /// End position for byte offsets.
|
size_t endPos; ///< End position for byte offsets.
|
||||||
size_t totalSize; /// Total size in bytes of the current URI. May be incomplete before read finished.
|
size_t totalSize; ///< Total size in bytes of the current URI. May be incomplete before read finished.
|
||||||
size_t curPos;
|
size_t curPos; ///< Current read position in source
|
||||||
char *mapped;
|
size_t bufPos; ///< Current read position in buffer
|
||||||
|
int handle; ///< Open file handle, if file-based.
|
||||||
|
char *mapped; ///< Memory-map of open file handle, if file-based.
|
||||||
bool supportRangeRequest;
|
bool supportRangeRequest;
|
||||||
Util::ResizeablePointer rPtr;
|
Util::ResizeablePointer rPtr;
|
||||||
Util::ResizeablePointer allData;
|
Util::ResizeablePointer allData;
|
||||||
bool clearPointer;
|
bool clearPointer;
|
||||||
URIType stateType; /// Holds the type of URI this is, for internal processing purposes.
|
URIType stateType; ///< Holds the type of URI this is, for internal processing purposes.
|
||||||
std::ifstream fReader; /// For file-based URIs, the ifstream used for the file.
|
HTTP::Downloader downer; ///< For HTTP(S)-based URIs, the Downloader instance used for the download.
|
||||||
HTTP::Downloader downer; /// For HTTP(S)-based URIs, the Downloader instance used for the download.
|
|
||||||
void init();
|
void init();
|
||||||
};
|
};
|
||||||
}// namespace HTTP
|
}// namespace HTTP
|
||||||
|
|
Loading…
Add table
Reference in a new issue