Backported Pro shared_memory edits to OS edition

This commit is contained in:
Thulinma 2016-02-15 14:42:19 +01:00
parent 243595ff4d
commit 0d9108d1d6
2 changed files with 165 additions and 66 deletions

View file

@ -12,16 +12,24 @@
#include "shared_memory.h" #include "shared_memory.h"
#include "stream.h" #include "stream.h"
#include "procs.h" #include "procs.h"
#include "bitfields.h"
#include "timing.h" #include "timing.h"
#if defined(__CYGWIN__) || defined(_WIN32)
#include <windows.h>
#include <aclapi.h>
#include <accctrl.h>
#endif
namespace IPC { namespace IPC {
#if defined(__CYGWIN__) || defined(_WIN32) #if defined(__CYGWIN__) || defined(_WIN32)
static std::map<std::string, sharedPage> preservedPages; static std::map<std::string, sharedPage> preservedPages;
void preservePage(std::string p){ void preservePage(std::string p) {
preservedPages[p].init(p, 0, false, false); preservedPages[p].init(p, 0, false, false);
} }
void releasePage(std::string p){ void releasePage(std::string p) {
preservedPages.erase(p); preservedPages.erase(p);
} }
#endif #endif
@ -107,7 +115,7 @@ namespace IPC {
#if defined(__CYGWIN__) || defined(_WIN32) #if defined(__CYGWIN__) || defined(_WIN32)
return mySem != 0; return mySem != 0;
#else #else
return mySem != SEM_FAILED; return mySem && mySem != SEM_FAILED;
#endif #endif
} }
@ -121,23 +129,34 @@ namespace IPC {
void semaphore::open(const char * name, int oflag, mode_t mode, unsigned int value) { void semaphore::open(const char * name, int oflag, mode_t mode, unsigned int value) {
close(); close();
int timer = 0; int timer = 0;
while (!(*this) && timer++ < 10){ while (!(*this) && timer++ < 10) {
#if defined(__CYGWIN__) || defined(_WIN32) #if defined(__CYGWIN__) || defined(_WIN32)
std::string semaName = "Global\\"; std::string semaName = "Global\\";
semaName += name; semaName += name;
if (oflag & O_CREAT){ if (oflag & O_CREAT) {
if (oflag & O_EXCL){ if (oflag & O_EXCL) {
//attempt opening, if succes, close handle and return false; //attempt opening, if succes, close handle and return false;
HANDLE tmpSem = OpenSemaphore(0, false, semaName.c_str()); HANDLE tmpSem = OpenMutex(SYNCHRONIZE, false, semaName.c_str());
if (tmpSem){ if (tmpSem) {
CloseHandle(tmpSem); CloseHandle(tmpSem);
mySem = 0; mySem = 0;
break; break;
} }
} }
mySem = CreateSemaphore(0, value, 1 , semaName.c_str()); SECURITY_ATTRIBUTES security = getSecurityAttributes();
}else{ mySem = CreateMutex(&security, true, semaName.c_str());
mySem = OpenSemaphore(0, false, semaName.c_str()); if (value){
ReleaseMutex(mySem);
}
} else {
mySem = OpenMutex(SYNCHRONIZE, false, semaName.c_str());
}
if (!(*this)) {
if (GetLastError() == ERROR_FILE_NOT_FOUND){//Error code 2
Util::wait(500);
} else {
break;
}
} }
#else #else
if (oflag & O_CREAT) { if (oflag & O_CREAT) {
@ -145,17 +164,16 @@ namespace IPC {
} else { } else {
mySem = sem_open(name, oflag); mySem = sem_open(name, oflag);
} }
#endif if (!(*this)) {
if (!(*this)){ if (errno == ENOENT) {
if (errno == ENOENT){
Util::wait(500); Util::wait(500);
}else{ } else {
break; break;
} }
} }
#endif
} }
if (!(*this)){ if (!(*this)) {
DEBUG_MSG(DLVL_VERYHIGH, "Attempt to open semaphore %s: %s", name, strerror(errno));
} }
myName = (char *)name; myName = (char *)name;
} }
@ -176,7 +194,7 @@ namespace IPC {
void semaphore::post() { void semaphore::post() {
if (*this) { if (*this) {
#if defined(__CYGWIN__) || defined(_WIN32) #if defined(__CYGWIN__) || defined(_WIN32)
ReleaseSemaphore(mySem, 1, 0); ReleaseMutex(mySem);
#else #else
sem_post(mySem); sem_post(mySem);
#endif #endif
@ -202,7 +220,7 @@ namespace IPC {
int result; int result;
#if defined(__CYGWIN__) || defined(_WIN32) #if defined(__CYGWIN__) || defined(_WIN32)
result = WaitForSingleObject(mySem, 0);//wait at most 1ms result = WaitForSingleObject(mySem, 0);//wait at most 1ms
if (result == 0x80){ if (result == 0x80) {
WARN_MSG("Consistency error caught on semaphore %s", myName.c_str()); WARN_MSG("Consistency error caught on semaphore %s", myName.c_str());
result = 0; result = 0;
} }
@ -217,7 +235,7 @@ namespace IPC {
int result; int result;
#if defined(__CYGWIN__) || defined(_WIN32) #if defined(__CYGWIN__) || defined(_WIN32)
result = WaitForSingleObject(mySem, 1000);//wait at most 1s result = WaitForSingleObject(mySem, 1000);//wait at most 1s
if (result == 0x80){ if (result == 0x80) {
WARN_MSG("Consistency error caught on semaphore %s", myName.c_str()); WARN_MSG("Consistency error caught on semaphore %s", myName.c_str());
result = 0; result = 0;
} }
@ -267,12 +285,38 @@ namespace IPC {
} }
#if defined(__CYGWIN__) || defined(_WIN32)
SECURITY_ATTRIBUTES semaphore::getSecurityAttributes() {
///\todo We really should clean this up sometime probably
///We currently have everything static, because the result basically depends on everything
static SECURITY_ATTRIBUTES result;
static bool resultValid = false;
static SECURITY_DESCRIPTOR securityDescriptor;
if (resultValid) {
return result;
}
InitializeSecurityDescriptor(&securityDescriptor, SECURITY_DESCRIPTOR_REVISION);
if (!SetSecurityDescriptorDacl(&securityDescriptor, TRUE, NULL, FALSE)){
FAIL_MSG("Failed to set pSecurityDescriptor: %u", GetLastError());
return result;
}
result.nLength = sizeof(SECURITY_ATTRIBUTES);
result.lpSecurityDescriptor = &securityDescriptor;
result.bInheritHandle = FALSE;
resultValid = true;
return result;
}
#endif
///brief Creates a shared page ///brief Creates a shared page
///\param name_ The name of the page to be created ///\param name_ The name of the page to be created
///\param len_ The size to make the page ///\param len_ The size to make the page
///\param master_ Whether to create or merely open the page ///\param master_ Whether to create or merely open the page
///\param autoBackoff When only opening the page, wait for it to appear or fail ///\param autoBackoff When only opening the page, wait for it to appear or fail
sharedPage::sharedPage(std::string name_, unsigned int len_, bool master_, bool autoBackoff){ sharedPage::sharedPage(std::string name_, unsigned int len_, bool master_, bool autoBackoff) {
handle = 0; handle = 0;
len = 0; len = 0;
master = false; master = false;
@ -301,7 +345,7 @@ namespace IPC {
if (mapped && len) { if (mapped && len) {
#if defined(__CYGWIN__) || defined(_WIN32) #if defined(__CYGWIN__) || defined(_WIN32)
//under Cygwin, the mapped location is shifted by 4 to contain the page size. //under Cygwin, the mapped location is shifted by 4 to contain the page size.
UnmapViewOfFile(mapped-4); UnmapViewOfFile(mapped - 4);
#else #else
munmap(mapped, len); munmap(mapped, len);
#endif #endif
@ -314,7 +358,7 @@ namespace IPC {
void sharedPage::close() { void sharedPage::close() {
unmap(); unmap();
if (handle > 0) { if (handle > 0) {
INSANE_MSG("Closing page %s in %s mode", name.c_str(), master?"master":"client"); INSANE_MSG("Closing page %s in %s mode", name.c_str(), master ? "master" : "client");
#if defined(__CYGWIN__) || defined(_WIN32) #if defined(__CYGWIN__) || defined(_WIN32)
CloseHandle(handle); CloseHandle(handle);
#else #else
@ -352,11 +396,11 @@ namespace IPC {
master = master_; master = master_;
mapped = 0; mapped = 0;
if (name.size()) { if (name.size()) {
INSANE_MSG("Opening page %s in %s mode %s auto-backoff", name.c_str(), master?"master":"client", autoBackoff?"with":"without"); INSANE_MSG("Opening page %s in %s mode %s auto-backoff", name.c_str(), master ? "master" : "client", autoBackoff ? "with" : "without");
#if defined(__CYGWIN__) || defined(_WIN32) #if defined(__CYGWIN__) || defined(_WIN32)
if (master) { if (master) {
//Under cygwin, all pages are 4 bytes longer than claimed. //Under cygwin, all pages are 4 bytes longer than claimed.
handle = CreateFileMappingA(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, len+4, name.c_str()); handle = CreateFileMappingA(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, len + 4, name.c_str());
} else { } else {
int i = 0; int i = 0;
do { do {
@ -377,10 +421,10 @@ namespace IPC {
return; return;
} }
//Under cygwin, the extra 4 bytes contain the real size of the page. //Under cygwin, the extra 4 bytes contain the real size of the page.
if (master){ if (master) {
((unsigned int*)mapped)[0] = len_; ((unsigned int *)mapped)[0] = len_;
}else{ } else {
len = ((unsigned int*)mapped)[0]; len = ((unsigned int *)mapped)[0];
} }
//Now shift by those 4 bytes. //Now shift by those 4 bytes.
mapped += 4; mapped += 4;
@ -400,7 +444,7 @@ namespace IPC {
} }
} }
if (handle == -1) { if (handle == -1) {
if (!master_ && autoBackoff){ if (!master_ && autoBackoff) {
FAIL_MSG("shm_open for page %s failed: %s", name.c_str(), strerror(errno)); FAIL_MSG("shm_open for page %s failed: %s", name.c_str(), strerror(errno));
} }
return; return;
@ -615,8 +659,8 @@ namespace IPC {
///\brief Sets the host of this connection ///\brief Sets the host of this connection
void statExchange::host(std::string name) { void statExchange::host(std::string name) {
if (name.size() < 16){ if (name.size() < 16) {
memset(data+32, 0, 16); memset(data + 32, 0, 16);
} }
memcpy(data + 32, name.c_str(), std::min((int)name.size(), 16)); memcpy(data + 32, name.c_str(), std::min((int)name.size(), 16));
} }
@ -629,7 +673,7 @@ namespace IPC {
///\brief Sets the name of the stream this user is viewing ///\brief Sets the name of the stream this user is viewing
void statExchange::streamName(std::string name) { void statExchange::streamName(std::string name) {
size_t splitChar = name.find_first_of("+ "); size_t splitChar = name.find_first_of("+ ");
if (splitChar != std::string::npos){ if (splitChar != std::string::npos) {
name[splitChar] = '+'; name[splitChar] = '+';
} }
memcpy(data + 48, name.c_str(), std::min((int)name.size(), 100)); memcpy(data + 48, name.c_str(), std::min((int)name.size(), 100));
@ -729,7 +773,7 @@ namespace IPC {
///\brief Creates the next page with the correct size ///\brief Creates the next page with the correct size
void sharedServer::newPage() { void sharedServer::newPage() {
sharedPage tmp(std::string(baseName.substr(1) + (char)(myPages.size() + (int)'A')), std::min(((8192 * 2)<< myPages.size()), (32 * 1024 * 1024)), true); sharedPage tmp(std::string(baseName.substr(1) + (char)(myPages.size() + (int)'A')), std::min(((8192 * 2) << myPages.size()), (32 * 1024 * 1024)), true);
myPages.insert(tmp); myPages.insert(tmp);
tmp.master = false; tmp.master = false;
DEBUG_MSG(DLVL_VERYHIGH, "Created a new page: %s", tmp.name.c_str()); DEBUG_MSG(DLVL_VERYHIGH, "Created a new page: %s", tmp.name.c_str());
@ -783,7 +827,7 @@ namespace IPC {
} }
semGuard tmpGuard(&mySemaphore); semGuard tmpGuard(&mySemaphore);
unsigned int id = 0; unsigned int id = 0;
unsigned int userCount=0; unsigned int userCount = 0;
unsigned int emptyCount = 0; unsigned int emptyCount = 0;
for (std::set<sharedPage>::iterator it = myPages.begin(); it != myPages.end(); it++) { for (std::set<sharedPage>::iterator it = myPages.begin(); it != myPages.end(); it++) {
if (!it->mapped || !it->len) { if (!it->mapped || !it->len) {
@ -795,15 +839,15 @@ namespace IPC {
while (offset + payLen + (hasCounter ? 1 : 0) <= it->len) { while (offset + payLen + (hasCounter ? 1 : 0) <= it->len) {
if (hasCounter) { if (hasCounter) {
if (it->mapped[offset] != 0) { if (it->mapped[offset] != 0) {
char * counter = it->mapped+offset; char * counter = it->mapped + offset;
//increase the count if needed //increase the count if needed
++userCount; ++userCount;
if (id >= amount) { if (id >= amount) {
amount = id + 1; amount = id + 1;
DEBUG_MSG(DLVL_VERYHIGH, "Shared memory %s is now at count %u", baseName.c_str(), amount); DEBUG_MSG(DLVL_VERYHIGH, "Shared memory %s is now at count %u", baseName.c_str(), amount);
} }
unsigned short tmpPID = *((unsigned short *)(it->mapped+1+offset+payLen-2)); unsigned short tmpPID = *((unsigned short *)(it->mapped + 1 + offset + payLen - 2));
if(!Util::Procs::isRunning(tmpPID) && !(*counter == 126 || *counter == 127 || *counter == 254 || *counter == 255)){ if (!Util::Procs::isRunning(tmpPID) && !(*counter == 126 || *counter == 127 || *counter == 254 || *counter == 255)) {
WARN_MSG("process disappeared, timing out. (pid %d)", tmpPID); WARN_MSG("process disappeared, timing out. (pid %d)", tmpPID);
*counter = 126; //if process is already dead, instant timeout. *counter = 126; //if process is already dead, instant timeout.
} }
@ -822,12 +866,12 @@ namespace IPC {
DEBUG_MSG(DLVL_WARN, "Client %u disconnect timed out", id); DEBUG_MSG(DLVL_WARN, "Client %u disconnect timed out", id);
break; break;
default: default:
#ifndef NOCRASHCHECK #ifndef NOCRASHCHECK
if (tmpPID){ if (tmpPID) {
if(*counter > 10 && *counter < 126 ){ if (*counter > 10 && *counter < 126) {
if(*counter < 30){ if (*counter < 30) {
if (*counter > 15){ if (*counter > 15) {
WARN_MSG("Process %d is unresponsive",tmpPID); WARN_MSG("Process %d is unresponsive", tmpPID);
} }
Util::Procs::Stop(tmpPID); //soft kill Util::Procs::Stop(tmpPID); //soft kill
} else { } else {
@ -836,7 +880,7 @@ namespace IPC {
} }
} }
} }
#endif #endif
break; break;
} }
if (*counter == 127 || *counter == 126 || *counter == 255 || *counter == 254) { if (*counter == 127 || *counter == 126 || *counter == 255 || *counter == 254) {
@ -885,16 +929,16 @@ namespace IPC {
offset += payLen + (hasCounter ? 1 : 0); offset += payLen + (hasCounter ? 1 : 0);
id ++; id ++;
} }
if(userCount==0) { if (userCount == 0) {
++emptyCount; ++emptyCount;
} else { } else {
emptyCount=0; emptyCount = 0;
} }
} }
if( emptyCount > 1){ if (emptyCount > 1) {
deletePage(); deletePage();
} else if( !emptyCount ){ } else if (!emptyCount) {
newPage(); newPage();
} }
@ -910,6 +954,7 @@ namespace IPC {
offsetOnPage = 0; offsetOnPage = 0;
} }
///\brief Copy constructor for sharedClients ///\brief Copy constructor for sharedClients
///\param rhs The client ro copy ///\param rhs The client ro copy
sharedClient::sharedClient(const sharedClient & rhs) { sharedClient::sharedClient(const sharedClient & rhs) {
@ -955,7 +1000,7 @@ namespace IPC {
///\param name The basename of the server to connect to ///\param name The basename of the server to connect to
///\param len The size of the payload to allocate ///\param len The size of the payload to allocate
///\param withCounter Whether or not this payload has a counter ///\param withCounter Whether or not this payload has a counter
sharedClient::sharedClient(std::string name, int len, bool withCounter) : baseName("/"+name), payLen(len), offsetOnPage(-1), hasCounter(withCounter) { sharedClient::sharedClient(std::string name, int len, bool withCounter) : baseName("/" + name), payLen(len), offsetOnPage(-1), hasCounter(withCounter) {
#ifdef __APPLE__ #ifdef __APPLE__
//note: O_CREAT is only needed for mac, probably //note: O_CREAT is only needed for mac, probably
mySemaphore.open(baseName.c_str(), O_RDWR | O_CREAT, 0); mySemaphore.open(baseName.c_str(), O_RDWR | O_CREAT, 0);
@ -966,6 +1011,7 @@ namespace IPC {
DEBUG_MSG(DLVL_FAIL, "Creating semaphore %s failed: %s", baseName.c_str(), strerror(errno)); DEBUG_MSG(DLVL_FAIL, "Creating semaphore %s failed: %s", baseName.c_str(), strerror(errno));
return; return;
} }
//Empty is used to compare for emptyness. This is not needed when the page uses a counter
char * empty = 0; char * empty = 0;
if (!hasCounter) { if (!hasCounter) {
empty = (char *)malloc(payLen * sizeof(char)); empty = (char *)malloc(payLen * sizeof(char));
@ -975,12 +1021,12 @@ namespace IPC {
} }
memset(empty, 0, payLen); memset(empty, 0, payLen);
} }
while (offsetOnPage == -1){ while (offsetOnPage == -1) {
{ {
semGuard tmpGuard(&mySemaphore); semGuard tmpGuard(&mySemaphore);
for (char i = 'A'; i <= 'Z'; i++) { for (char i = 'A'; i <= 'Z'; i++) {
myPage.init(baseName.substr(1) + i, (4096 << (i - 'A')), false, false); myPage.init(baseName.substr(1) + i, (4096 << (i - 'A')), false, false);
if (!myPage.mapped){ if (!myPage.mapped) {
break; break;
} }
int offset = 0; int offset = 0;
@ -989,7 +1035,7 @@ namespace IPC {
offsetOnPage = offset; offsetOnPage = offset;
if (hasCounter) { if (hasCounter) {
myPage.mapped[offset] = 1; myPage.mapped[offset] = 1;
*((unsigned short *)(myPage.mapped+1+offset+len-2))=getpid(); *((unsigned short *)(myPage.mapped + 1 + offset + len - 2)) = getpid();
} }
break; break;
} }
@ -1000,12 +1046,14 @@ namespace IPC {
} }
} }
} }
if (offsetOnPage == -1){ if (offsetOnPage == -1) {
Util::wait(500); Util::wait(500);
} }
} }
if (empty) {
free(empty); free(empty);
} }
}
///\brief The deconstructor ///\brief The deconstructor
sharedClient::~sharedClient() { sharedClient::~sharedClient() {
@ -1055,5 +1103,43 @@ namespace IPC {
} }
return (myPage.mapped + offsetOnPage + (hasCounter ? 1 : 0)); return (myPage.mapped + offsetOnPage + (hasCounter ? 1 : 0));
} }
userConnection::userConnection(char * _data) {
data = _data;
}
unsigned long userConnection::getTrackId(size_t offset) const {
if (offset >= SIMUL_TRACKS) {
WARN_MSG("Trying to get track id for entry %lu, while there are only %d entries allowed", offset, SIMUL_TRACKS);
return 0;
}
return Bit::btohl(data + (offset * 6));
}
void userConnection::setTrackId(size_t offset, unsigned long trackId) const {
if (offset >= SIMUL_TRACKS) {
WARN_MSG("Trying to set track id for entry %lu, while there are only %d entries allowed", offset, SIMUL_TRACKS);
return;
}
Bit::htobl(data + (offset * 6), trackId);
}
unsigned long userConnection::getKeynum(size_t offset) const {
if (offset >= SIMUL_TRACKS) {
WARN_MSG("Trying to get keynum for entry %lu, while there are only %d entries allowed", offset, SIMUL_TRACKS);
return 0;
}
return Bit::btohs(data + (offset * 6) + 4);
}
void userConnection::setKeynum(size_t offset, unsigned long keynum) {
if (offset >= SIMUL_TRACKS) {
WARN_MSG("Trying to set keynum for entry %lu, while there are only %d entries allowed", offset, SIMUL_TRACKS);
return;
}
Bit::htobs(data + (offset * 6) + 4, keynum);
}
} }

View file

@ -69,6 +69,8 @@ namespace IPC {
void unlink(); void unlink();
private: private:
#if defined(__CYGWIN__) || defined(_WIN32) #if defined(__CYGWIN__) || defined(_WIN32)
///\todo Maybe sometime implement anything else than 777
static SECURITY_ATTRIBUTES getSecurityAttributes();
HANDLE mySem; HANDLE mySem;
#else #else
sem_t * mySem; sem_t * mySem;
@ -228,4 +230,15 @@ namespace IPC {
///\brief Whether the payload has a counter, if so, it is added in front of the payload ///\brief Whether the payload has a counter, if so, it is added in front of the payload
bool hasCounter; bool hasCounter;
}; };
class userConnection {
public:
userConnection(char * _data);
unsigned long getTrackId(size_t offset) const;
void setTrackId(size_t offset, unsigned long trackId) const;
unsigned long getKeynum(size_t offset) const;
void setKeynum(size_t offset, unsigned long keynum);
private:
char * data;
};
} }