From 68c87a44d0a82ae228aaac4c5850d34b06608724 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 8 Feb 2018 12:40:06 +0100 Subject: [PATCH 1/5] ResizeablePointer const fix --- lib/util.cpp | 4 ++-- lib/util.h | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/util.cpp b/lib/util.cpp index 9f03fa4a..57e71f9a 100644 --- a/lib/util.cpp +++ b/lib/util.cpp @@ -171,14 +171,14 @@ namespace Util{ maxSize = 0; } - bool ResizeablePointer::assign(void * p, uint32_t l){ + bool ResizeablePointer::assign(const void * p, uint32_t l){ if (!allocate(l)){return false;} memcpy(ptr, p, l); currSize = l; return true; } - bool ResizeablePointer::append(void * p, uint32_t l){ + bool ResizeablePointer::append(const void * p, uint32_t l){ if (!allocate(l+currSize)){return false;} memcpy(((char*)ptr)+currSize, p, l); currSize += l; diff --git a/lib/util.h b/lib/util.h index d721885f..212e5e28 100644 --- a/lib/util.h +++ b/lib/util.h @@ -21,8 +21,8 @@ namespace Util{ ResizeablePointer(); ~ResizeablePointer(); inline uint32_t& size(){return currSize;} - bool assign(void * p, uint32_t l); - bool append(void * p, uint32_t l); + bool assign(const void * p, uint32_t l); + bool append(const void * p, uint32_t l); bool allocate(uint32_t l); inline operator char*(){return (char*)ptr;} inline operator void*(){return ptr;} From 7ea42685a65944dab4e30e2b22600eecc14c76fb Mon Sep 17 00:00:00 2001 From: Erik Zandvliet Date: Tue, 27 Feb 2018 16:30:00 +0100 Subject: [PATCH 2/5] Edits to RelAccX library --- lib/defines.h | 3 +- lib/util.cpp | 179 +++++++++++++++++++++++++++++++++++++++----------- lib/util.h | 111 ++++++++++++++++++++----------- 3 files changed, 215 insertions(+), 78 deletions(-) diff --git a/lib/defines.h b/lib/defines.h index 2a2fa24f..5774b6ee 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -17,7 +17,8 @@ #if DEBUG > -1 #include -#include +#include +#include #include "config.h" static const char * DBG_LVL_LIST[] = {"NONE", "FAIL", "ERROR", "WARN", "INFO", "MEDIUM", "HIGH", "VERYHIGH", "EXTREME", "INSANE", "DONTEVEN"}; diff --git a/lib/util.cpp b/lib/util.cpp index 57e71f9a..06f41db5 100644 --- a/lib/util.cpp +++ b/lib/util.cpp @@ -8,6 +8,7 @@ #include "procs.h" #include // errno, ENOENT, EEXIST #include +#include #include #include // stat #if defined(_WIN32) @@ -23,7 +24,8 @@ #define RAXHDR_DELETED *(uint64_t *)(p + 14) #define RAXHDR_PRESENT *(uint32_t *)(p + 22) #define RAXHDR_OFFSET *(uint16_t *)(p + 26) -#define RAX_REQDFIELDS_LEN 28 +#define RAXHDR_ENDPOS *(uint64_t*)(p + 28) +#define RAX_REQDFIELDS_LEN 36 namespace Util{ /// Helper function that cross-platform checks if a given directory exists. @@ -155,6 +157,7 @@ namespace Util{ /// 64-bits version of fseek uint64_t fseek(FILE *stream, uint64_t offset, int whence){ /// \TODO Windows implementation (e.g. _fseeki64 ?) + clearerr(stream); return fseeko(stream, offset, whence); } @@ -189,7 +192,7 @@ namespace Util{ if (l > maxSize){ void *tmp = realloc(ptr, l); if (!tmp){ - FAIL_MSG("Could not allocate %lu bytes of memory", l); + FAIL_MSG("Could not allocate %" PRIu32 " bytes of memory", l); return false; } ptr = tmp; @@ -319,7 +322,7 @@ namespace Util{ if (lineno && strlen(lineno)){ dprintf(out, " (%s) ", lineno); } - dprintf(out, "\n", lineno); + dprintf(out, "\n"); }else{ //could not be parsed as log string - print the whole thing dprintf(out, "%s\n", buf); @@ -329,8 +332,27 @@ namespace Util{ close(in); } + FieldAccX::FieldAccX(RelAccX * _src, RelAccXFieldData _field, char * _data) : src(_src), field(_field), data(_data) {} + + uint64_t FieldAccX::uint(size_t recordNo) const { + return src->getInt(field, recordNo); + } + + std::string FieldAccX::string(size_t recordNo) const { + return std::string(src->getPointer(field, recordNo), field.size) ; + } + + void FieldAccX::set(uint64_t val, size_t recordNo){ + src->setInt(field, val, recordNo); + } + + void FieldAccX::set(const std::string & val, size_t recordNo){ + char * place = src->getPointer(field, recordNo); + memcpy(place, val.data(), std::min((size_t)field.size, val.size())); + } + /// If waitReady is true (default), waits for isReady() to return true in 50ms sleep increments. - RelAccX::RelAccX(char *data, bool waitReady){ + RelAccX::RelAccX(char * data, bool waitReady){ if (!data){ p = 0; return; @@ -346,7 +368,7 @@ namespace Util{ p = 0; return; } - uint32_t dataOffset = 0; + uint64_t dataOffset = 0; while (offset < getOffset()){ const uint8_t sizeByte = p[offset]; const uint8_t nameLen = sizeByte >> 3; @@ -376,8 +398,7 @@ namespace Util{ default: WARN_MSG("Unhandled field data size!"); break; } fields[fieldName] = RelAccXFieldData(fieldType, size, dataOffset); - DONTEVEN_MSG("Field %s: type %u, size %lu, offset %lu", fieldName.c_str(), fieldType, size, - dataOffset); + DONTEVEN_MSG("Field %s: type %u, size %" PRIu32 ", offset %" PRIu64, fieldName.c_str(), fieldType, size, dataOffset); dataOffset += size; offset += nameLen + typeLen + 1; } @@ -396,11 +417,11 @@ namespace Util{ /// Gets the number of deleted records uint64_t RelAccX::getDeleted() const{return RAXHDR_DELETED;} - /// Gets the number of records present - /// Defaults to the record count if set to zero. - uint32_t RelAccX::getPresent() const{ - return (RAXHDR_PRESENT ? RAXHDR_PRESENT : RAXHDR_RECORDCNT); - } + //Gets the number of the last valid index + uint64_t RelAccX::getEndPos() const{return RAXHDR_ENDPOS;} + + ///Gets the number of fields per recrd + uint32_t RelAccX::getFieldCount() const{return fields.size();} /// Gets the offset from the structure start where records begin. uint16_t RelAccX::getOffset() const{return *(uint16_t *)(p + 26);} @@ -419,7 +440,7 @@ namespace Util{ // Check if the record has been deleted if (getDeleted() > recordNo){return false;} // Check if the record hasn't been created yet - if (recordNo - getDeleted() >= getPresent()){return false;} + if (recordNo >= getEndPos()){return false;} return true; } @@ -455,7 +476,10 @@ namespace Util{ /// Returns a null pointer if the field does not exist. char *RelAccX::getPointer(const std::string &name, uint64_t recordNo) const{ if (!fields.count(name)){return 0;} - const RelAccXFieldData &fd = fields.at(name); + return getPointer(fields.at(name), recordNo); + } + + char * RelAccX::getPointer(const RelAccXFieldData & fd, uint64_t recordNo) const{ return RECORD_POINTER; } @@ -463,9 +487,12 @@ namespace Util{ /// Returns 0 if the field does not exist or is not an integer type. uint64_t RelAccX::getInt(const std::string &name, uint64_t recordNo) const{ if (!fields.count(name)){return 0;} - const RelAccXFieldData &fd = fields.at(name); - char *ptr = RECORD_POINTER; - if ((fd.type & 0xF0) == RAX_UINT){// unsigned int + return getInt(fields.at(name), recordNo); + } + + uint64_t RelAccX::getInt(const RelAccXFieldData & fd, uint64_t recordNo) const{ + char * ptr = RECORD_POINTER; + if ((fd.type & 0xF0) == RAX_UINT){//unsigned int switch (fd.size){ case 1: return *(uint8_t *)ptr; case 2: return *(uint16_t *)ptr; @@ -488,28 +515,82 @@ namespace Util{ return 0; // Not an integer type, or not implemented } - std::string RelAccX::toPrettyString() const{ + + std::string RelAccX::toPrettyString(size_t indent) const{ std::stringstream r; uint64_t delled = getDeleted(); - uint64_t max = delled + getRCount(); - r << "RelAccX: " << getRCount() << " x " << getRSize() << "b @" << getOffset() << " (#" - << getDeleted() << " - #" << (getDeleted() + getPresent() - 1) << ")" << std::endl; + uint64_t max = getEndPos(); + r << std::string(indent, ' ') << "RelAccX: " << getRCount() << " x " << getRSize() << "b @" << getOffset() << " (#" << getDeleted() << " - #" << getEndPos()-1 << ")" << std::endl; for (uint64_t i = delled; i < max; ++i){ - r << " #" << i << ":" << std::endl; - for (std::map::const_iterator it = fields.begin(); - it != fields.end(); ++it){ - r << " " << it->first << ": "; + r << std::string(indent + 2, ' ') << "#" << i << ":" << std::endl; + for (std::map::const_iterator it = fields.begin(); it != fields.end(); ++it){ + r << std::string(indent + 4, ' ') << it->first << ": "; switch (it->second.type & 0xF0){ - case RAX_INT: r << (int64_t)getInt(it->first, i) << std::endl; break; - case RAX_UINT: r << getInt(it->first, i) << std::endl; break; - case RAX_STRING: r << getPointer(it->first, i) << std::endl; break; - default: r << "[UNIMPLEMENTED]" << std::endl; break; + case RAX_INT: r << (int64_t)getInt(it->first, i) << std::endl; break; + case RAX_UINT: r << getInt(it->first, i) << std::endl; break; + case RAX_STRING: r << getPointer(it->first, i) << std::endl; break; + case 0: { //RAX_NESTED + RelAccX n(getPointer(it->first, i), false); + if (n.isReady()){ + r << "Nested RelAccX:" << std::endl; + r << (n.getFieldCount() > 6 ? n.toPrettyString(indent + 6) : n.toCompactString(indent + 6)); + }else{ + r << "Nested RelAccX: not ready" << std::endl; + } + break; + } + case RAX_RAW: { + char * ptr = getPointer(it->first, i); + size_t sz = getSize(it->first, i); + size_t zeroCount = 0; + for (size_t j = 0; j < sz && j < 100 && zeroCount < 10; ++j){ + r << "0x" << std::hex << std::setw(2) << std::setfill('0') << (int)ptr[j] << std::dec << " "; + if (ptr[j] == 0x00){ + zeroCount++; + }else{ + zeroCount = 0; + } + } + r << std::endl; + break; + } + default: r << "[UNIMPLEMENTED]" << std::endl; break; } } } return r.str(); } + std::string RelAccX::toCompactString(size_t indent) const{ + std::stringstream r; + uint64_t delled = getDeleted(); + uint64_t max = getEndPos(); + r << std::string(indent, ' ') << "RelAccX: " << getRCount() << " x " << getRSize() << "b @" << getOffset() << " (#" << getDeleted() << " - #" << getEndPos()-1 << ")" << std::endl; + for (uint64_t i = delled; i < max; ++i){ + r << std::string(indent + 2, ' ') << "#" << i << ": "; + for (std::map::const_iterator it = fields.begin(); it != fields.end(); ++it){ + r << it->first << ": "; + switch (it->second.type & 0xF0){ + case RAX_INT: r << (int64_t)getInt(it->first, i) << ", "; break; + case RAX_UINT: r << getInt(it->first, i) << ", "; break; + case RAX_STRING: r << getPointer(it->first, i) << ", "; break; + case 0: { //RAX_NESTED + RelAccX n(getPointer(it->first, i), false); + if (n.isReady()){ + r << (n.getFieldCount() > 6 ? n.toPrettyString(indent + 2) : n.toCompactString(indent + 2)); + }else{ + r << "Nested RelAccX not ready" << std::endl; + } + break; + } + default: r << "[UNIMPLEMENTED], "; break; + } + } + r << std::endl; + } + return r.str(); + } + /// Returns the default size in bytes of the data component of a field type number. /// Returns zero if not implemented, unknown or the type has no default. uint32_t RelAccX::getDefaultSize(uint8_t fType){ @@ -532,8 +613,7 @@ namespace Util{ return; } if (!name.size() || name.size() > 31){ - WARN_MSG("Attempting to add a field with illegal name: %s (%u chars)", name.c_str(), - name.size()); + WARN_MSG("Attempting to add a field with illegal name: %s (%zu chars)", name.c_str(), name.size()); return; } // calculate fLen if missing @@ -636,9 +716,12 @@ namespace Util{ WARN_MSG("Setting non-existent integer %s", name.c_str()); return; } - const RelAccXFieldData &fd = fields.at(name); - char *ptr = RECORD_POINTER; - if ((fd.type & 0xF0) == RAX_UINT){// unsigned int + return setInt(fields.at(name), val, recordNo); + } + + void RelAccX::setInt(const RelAccXFieldData & fd, uint64_t val, uint64_t recordNo){ + char * ptr = RECORD_POINTER; + if ((fd.type & 0xF0) == RAX_UINT){//unsigned int switch (fd.size){ case 1: *(uint8_t *)ptr = val; return; case 2: *(uint16_t *)ptr = val; return; @@ -658,7 +741,18 @@ namespace Util{ default: WARN_MSG("Unimplemented integer size %u", fd.size); return; } } - WARN_MSG("Setting non-integer %s", name.c_str()); + WARN_MSG("Setting non-integer field (%u) to integer value!", fd.type); + } + + void RelAccX::setInts(const std::string & name, uint64_t * values, size_t len){ + if (!fields.count(name)){ + WARN_MSG("Setting non-existent integer %s", name.c_str()); + return; + } + const RelAccXFieldData & fd = fields.at(name); + for (uint64_t recordNo = 0; recordNo < len; recordNo++){ + setInt(fd, values[recordNo], recordNo); + } } /// Updates the deleted record counter, the start position and the present record counter, @@ -674,6 +768,7 @@ namespace Util{ if (recsPresent >= amount){ recsPresent -= amount; // decrease records present }else{ + WARN_MSG("Depleting recordCount!"); recsPresent = 0; } } @@ -683,13 +778,21 @@ namespace Util{ /// If the records present counter would be pushed past the record counter by this function, sets /// it to zero, defaulting it to the record count for all relevant purposes. void RelAccX::addRecords(uint32_t amount){ - uint32_t &recsPresent = RAXHDR_PRESENT; - uint32_t &recordsCount = RAXHDR_RECORDCNT; - if (recsPresent + amount > recordsCount){ + uint32_t & recsPresent = RAXHDR_PRESENT; + uint32_t & recordsCount = RAXHDR_RECORDCNT; + uint64_t & recordEndPos = RAXHDR_ENDPOS; + if (recsPresent+amount > recordsCount){ + BACKTRACE + WARN_MSG("Exceeding recordCount (%d [%d + %d] > %d)", recsPresent + amount, recsPresent, amount, recordsCount); recsPresent = 0; }else{ recsPresent += amount; } + recordEndPos += amount; + } + + FieldAccX RelAccX::getFieldAccX(const std::string & fName){ + return FieldAccX(this, fields.at(fName), p + getOffset()); } } diff --git a/lib/util.h b/lib/util.h index 212e5e28..13bd6f9e 100644 --- a/lib/util.h +++ b/lib/util.h @@ -1,8 +1,11 @@ #pragma once -#include -#include #include #include +#include +#include +#include +#include +#include namespace Util{ bool isDirectory(const std::string &path); @@ -15,6 +18,9 @@ namespace Util{ uint64_t ftell(FILE *stream); uint64_t fseek(FILE *stream, uint64_t offset, int whence); + //Forward declaration + class FieldAccX; + /// Helper class that maintains a resizeable pointer and will free it upon deletion of the class. class ResizeablePointer{ public: @@ -82,6 +88,7 @@ namespace Util{ /// 8 bytes records deleted - amount of records no longer present /// 4 bytes records present - amount of record currently present /// 2 bytes record offset + /// 8 bytes record endpos - index after the last valid record in the buffer /// @field_offset: offset-field_offset bytes fields: /// 5 bits field name len (< 32), 3 bits type len (1-5) /// len bytes field name string (< 32 bytes) @@ -102,44 +109,70 @@ namespace Util{ /// Setting reload means the writer needed to change fields, and the pointer should be closed and /// re-opened through outside means (e.g. closing and re-opening the containing shm page). class RelAccX{ - public: - RelAccX(char *data, bool waitReady = true); - // Read-only functions: - uint32_t getRCount() const; - uint32_t getRSize() const; - uint16_t getOffset() const; - uint32_t getStartPos() const; - uint64_t getDeleted() const; - uint32_t getPresent() const; - bool isReady() const; - bool isExit() const; - bool isReload() const; - bool isRecordAvailable(uint64_t recordNo) const; - uint32_t getRecordPosition(uint64_t recordNo) const; - uint32_t getSize(const std::string &name, uint64_t recordNo = 0) const; - char *getPointer(const std::string &name, uint64_t recordNo = 0) const; - uint64_t getInt(const std::string &name, uint64_t recordNo = 0) const; - std::string toPrettyString() const; - // Read-write functions: - void addField(const std::string &name, uint8_t fType, uint32_t fLen = 0); - void setRCount(uint32_t count); - void setStartPos(uint32_t n); - void setDeleted(uint64_t n); - void setPresent(uint32_t n); - void setReady(); - void setExit(); - void setReload(); - void setString(const std::string &name, const std::string &val, uint64_t recordNo = 0); - void setInt(const std::string &name, uint64_t val, uint64_t recordNo = 0); - void deleteRecords(uint32_t amount); - void addRecords(uint32_t amount); + public: + RelAccX(char * data = NULL, bool waitReady = true); + //Read-only functions: + uint32_t getRCount() const; + uint32_t getRSize() const; + uint16_t getOffset() const; + uint32_t getStartPos() const; + uint64_t getDeleted() const; + uint64_t getEndPos() const; + size_t getPresent() const; + uint32_t getFieldCount() const; + bool isReady() const; + bool isExit() const; + bool isReload() const; + bool isRecordAvailable(uint64_t recordNo) const; + uint32_t getRecordPosition(uint64_t recordNo) const; + uint32_t getSize(const std::string & name, uint64_t recordNo=0) const; - protected: - static uint32_t getDefaultSize(uint8_t fType); + char * getPointer(const std::string & name, uint64_t recordNo=0) const; + char * getPointer(const RelAccXFieldData & fd, uint64_t recordNo=0) const; + + uint64_t getInt(const std::string & name, uint64_t recordNo=0) const; + uint64_t getInt(const RelAccXFieldData & fd, uint64_t recordNo=0) const; - private: - char *p; - std::map fields; + std::string toPrettyString(size_t indent = 0) const; + std::string toCompactString(size_t indent = 0) const; + //Read-write functions: + void addField(const std::string & name, uint8_t fType, uint32_t fLen=0); + void setRCount(uint32_t count); + void setStartPos(uint32_t n); + void setDeleted(uint64_t n); + void setPresent(uint32_t n); + void setReady(); + void setExit(); + void setReload(); + void setString(const std::string & name, const std::string & val, uint64_t recordNo=0); + void setInt(const std::string & name, uint64_t val, uint64_t recordNo=0); + void setInt(const RelAccXFieldData & fd, uint64_t val, uint64_t recordNo=0); + void setInts(const std::string & name, uint64_t * values, size_t len); + void deleteRecords(uint32_t amount); + void addRecords(uint32_t amount); + + void minimalFrom(const RelAccX & src); + void copyFieldsFrom(const RelAccX & src, bool minimal = false); + void flowFrom(const RelAccX & src); + + FieldAccX getFieldAccX(const std::string & fName); + protected: + static uint32_t getDefaultSize(uint8_t fType); + std::map fields; + private: + char * p; + }; + + class FieldAccX { + public: + FieldAccX(RelAccX * _src = NULL, RelAccXFieldData _field = RelAccXFieldData(), char * _data = NULL); + uint64_t uint(size_t recordNo) const; + std::string string(size_t recordNo) const; + void set(uint64_t val, size_t recordNo = 0); + void set(const std::string & val, size_t recordNo = 0); + private: + RelAccX * src; + RelAccXFieldData field; + char * data; }; } - From 12d18bd7c5139db4b8ee00d6c705dc0b1c02e2be Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 13 Mar 2018 22:39:54 +0100 Subject: [PATCH 3/5] RelAccX improvements --- lib/util.cpp | 41 +++++------------------------------------ lib/util.h | 6 ++---- 2 files changed, 7 insertions(+), 40 deletions(-) diff --git a/lib/util.cpp b/lib/util.cpp index 06f41db5..5e2106ab 100644 --- a/lib/util.cpp +++ b/lib/util.cpp @@ -411,13 +411,10 @@ namespace Util{ /// Gets the size in bytes of a single record in the structure. uint32_t RelAccX::getRSize() const{return RAXHDR_RECORDSIZE;} - /// Gets the position in the records where the entries start - uint32_t RelAccX::getStartPos() const{return RAXHDR_STARTPOS;} - /// Gets the number of deleted records uint64_t RelAccX::getDeleted() const{return RAXHDR_DELETED;} - //Gets the number of the last valid index + /// Gets the number of the last valid index uint64_t RelAccX::getEndPos() const{return RAXHDR_ENDPOS;} ///Gets the number of fields per recrd @@ -662,15 +659,11 @@ namespace Util{ /// Sets the record counter to the given value. void RelAccX::setRCount(uint32_t count){RAXHDR_RECORDCNT = count;} - /// Sets the position in the records where the entries start - void RelAccX::setStartPos(uint32_t n){RAXHDR_STARTPOS = n;} - /// Sets the number of deleted records void RelAccX::setDeleted(uint64_t n){RAXHDR_DELETED = n;} - /// Sets the number of records present - /// Defaults to the record count if set to zero. - void RelAccX::setPresent(uint32_t n){RAXHDR_PRESENT = n;} + /// Sets the number of the last valid index + void RelAccX::setEndPos(uint64_t n){RAXHDR_ENDPOS = n;} /// Sets the ready flag. /// After calling this function, addField() may no longer be called. @@ -757,38 +750,14 @@ namespace Util{ /// Updates the deleted record counter, the start position and the present record counter, /// shifting the ring buffer start position forward without moving the ring buffer end position. - /// If the records present counter would be pushed into the negative by this function, sets it to - /// zero, defaulting it to the record count for all relevant purposes. void RelAccX::deleteRecords(uint32_t amount){ - uint32_t &startPos = RAXHDR_STARTPOS; - uint64_t &deletedRecs = RAXHDR_DELETED; - uint32_t &recsPresent = RAXHDR_PRESENT; - startPos += amount; // update start position - deletedRecs += amount; // update deleted record counter - if (recsPresent >= amount){ - recsPresent -= amount; // decrease records present - }else{ - WARN_MSG("Depleting recordCount!"); - recsPresent = 0; - } + RAXHDR_DELETED += amount; // update deleted record counter } /// Updates the present record counter, shifting the ring buffer end position forward without /// moving the ring buffer start position. - /// If the records present counter would be pushed past the record counter by this function, sets - /// it to zero, defaulting it to the record count for all relevant purposes. void RelAccX::addRecords(uint32_t amount){ - uint32_t & recsPresent = RAXHDR_PRESENT; - uint32_t & recordsCount = RAXHDR_RECORDCNT; - uint64_t & recordEndPos = RAXHDR_ENDPOS; - if (recsPresent+amount > recordsCount){ - BACKTRACE - WARN_MSG("Exceeding recordCount (%d [%d + %d] > %d)", recsPresent + amount, recsPresent, amount, recordsCount); - recsPresent = 0; - }else{ - recsPresent += amount; - } - recordEndPos += amount; + RAXHDR_ENDPOS += amount; } FieldAccX RelAccX::getFieldAccX(const std::string & fName){ diff --git a/lib/util.h b/lib/util.h index 13bd6f9e..a3b9835d 100644 --- a/lib/util.h +++ b/lib/util.h @@ -70,6 +70,7 @@ namespace Util{ #define RAX_64STRING 0x32 #define RAX_128STRING 0x33 #define RAX_256STRING 0x34 + #define RAX_512STRING 0x35 #define RAX_RAW 0x40 #define RAX_256RAW 0x44 #define RAX_512RAW 0x45 @@ -115,10 +116,8 @@ namespace Util{ uint32_t getRCount() const; uint32_t getRSize() const; uint16_t getOffset() const; - uint32_t getStartPos() const; uint64_t getDeleted() const; uint64_t getEndPos() const; - size_t getPresent() const; uint32_t getFieldCount() const; bool isReady() const; bool isExit() const; @@ -138,9 +137,8 @@ namespace Util{ //Read-write functions: void addField(const std::string & name, uint8_t fType, uint32_t fLen=0); void setRCount(uint32_t count); - void setStartPos(uint32_t n); void setDeleted(uint64_t n); - void setPresent(uint32_t n); + void setEndPos(uint64_t n); void setReady(); void setExit(); void setReload(); From bea8678df95e9496f26074e570e0e2ec339f44e3 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Fri, 9 Feb 2018 13:44:53 +0100 Subject: [PATCH 4/5] Added sharedPage::exists() call --- lib/shared_memory.cpp | 12 ++++++++++++ lib/shared_memory.h | 2 ++ 2 files changed, 14 insertions(+) diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index cb8e81c6..bcb3405e 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -348,6 +348,18 @@ namespace IPC { close(); } + /// Returns true if the open file still exists. + /// Not implemented under Windows. + bool sharedPage::exists(){ +#ifdef SHM_ENABLED + struct stat sb; + if (fstat(handle, &sb)){return false;} + return (sb.st_nlink > 0); +#else + return true; +#endif + } + #ifdef SHM_ENABLED ///\brief Unmaps a shared page if allowed void sharedPage::unmap() { diff --git a/lib/shared_memory.h b/lib/shared_memory.h index 5e329e68..f615acdd 100644 --- a/lib/shared_memory.h +++ b/lib/shared_memory.h @@ -109,6 +109,7 @@ namespace IPC { bool operator < (const sharedFile & rhs) const { return name < rhs.name; } + bool exists(){return true;} void close(); void unmap(); ///\brief The fd handle of the opened shared file @@ -143,6 +144,7 @@ namespace IPC { } void unmap(); void close(); + bool exists(); #if defined(__CYGWIN__) || defined(_WIN32) ///\brief The handle of the opened shared memory page HANDLE handle; From 74baf8d4a40e1a726fcc03d080e793555edfbd01 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 20 Mar 2018 14:22:04 +0100 Subject: [PATCH 5/5] Added Output::disconnect() call --- src/output/output.cpp | 50 ++++++++++++++++++++++++++++++------------- src/output/output.h | 1 + 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/src/output/output.cpp b/src/output/output.cpp index f79cffce..12da4359 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -123,6 +123,7 @@ namespace Mist{ /// The standard implementation will set isInitialized to false and close the client connection, /// thus causing the process to exit cleanly. void Output::onFail(){ + MEDIUM_MSG("onFail"); isInitialized = false; wantRequest = true; parseData= false; @@ -131,6 +132,7 @@ namespace Mist{ } void Output::initialize(){ + MEDIUM_MSG("initialize"); if (isInitialized){ return; } @@ -140,7 +142,6 @@ namespace Mist{ if (streamName.size() < 1){ return; //abort - no stream to initialize... } - isInitialized = true; reconnect(); //if the connection failed, fail if (streamName.size() < 1){ @@ -176,6 +177,21 @@ namespace Mist{ return false; } + /// Disconnects from all stat/user-related shared structures. + void Output::disconnect(){ + MEDIUM_MSG("disconnect"); + if (statsPage.getData()){ + statsPage.finish(); + myConn.resetCounter(); + } + if (nProxy.userClient.getData()){ + nProxy.userClient.finish(); + } + isInitialized = false; + myMeta.reset(); + nProxy.metaPages.clear(); + } + /// Connects or reconnects to the stream. /// Assumes streamName class member has been set already. /// Will start input if not currently active, calls onFail() if this does not succeed. @@ -197,12 +213,7 @@ namespace Mist{ return; } } - if (statsPage.getData()){ - statsPage.finish(); - } - if (nProxy.userClient.getData()){ - nProxy.userClient.finish(); - } + disconnect(); nProxy.streamName = streamName; char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); @@ -217,13 +228,13 @@ namespace Mist{ } char pageId[NAME_BUFFER_SIZE]; snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str()); - nProxy.metaPages.clear(); nProxy.metaPages[0].init(pageId, DEFAULT_STRM_PAGE_SIZE); if (!nProxy.metaPages[0].mapped){ FAIL_MSG("Could not connect to data for %s", streamName.c_str()); onFail(); return; } + isInitialized = true; statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true); stats(true); updateMeta(); @@ -245,7 +256,7 @@ namespace Mist{ void Output::selectDefaultTracks(){ if (!isInitialized){ initialize(); - return; + if (!isInitialized){return;} } //check which tracks don't actually exist std::set toRemove; @@ -1058,8 +1069,11 @@ namespace Mist{ nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); if (!nProxy.userClient.getData()){ WARN_MSG("Player connection failure - aborting output"); - onFinish(); - myConn.close(); + if (!onFinish()){ + myConn.close(); + }else{ + disconnect(); + } return; } } @@ -1068,14 +1082,20 @@ namespace Mist{ waitForStreamPushReady(); if (!nProxy.userClient.isAlive()){ WARN_MSG("Failed to wait for buffer, aborting incoming push"); - onFinish(); - myConn.close(); + if (!onFinish()){ + myConn.close(); + }else{ + disconnect(); + } return; } }else{ INFO_MSG("Received disconnect request from input"); - onFinish(); - myConn.close(); + if (!onFinish()){ + myConn.close(); + }else{ + disconnect(); + } return; } } diff --git a/src/output/output.h b/src/output/output.h index 1f3a536e..d58cdfae 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -69,6 +69,7 @@ namespace Mist { return false; } void reconnect(); + void disconnect(); virtual void initialize(); virtual void sendHeader(); virtual void onFail();