Fixed all semaphore instances not being cleaned up properly.

This commit is contained in:
Thulinma 2016-05-03 12:12:14 +02:00
parent dda9ed54b4
commit f4b296164e
14 changed files with 77 additions and 63 deletions

View file

@ -179,12 +179,16 @@ namespace Mist {
}
}
}
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str());
IPC::semaphore liveMeta(pageName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
liveMeta.unlink();
}
///Cleans up any left-over data for the current stream
void inputBuffer::onCrash(){
WARN_MSG("BUffer crashed. Cleaning.");
WARN_MSG("Buffer crashed. Cleaning.");
streamName = config->getString("streamname");
char pageName[NAME_BUFFER_SIZE];
@ -194,23 +198,12 @@ namespace Mist {
for (long unsigned i = 0; i < 15; ++i){
unsigned int size = std::min(((8192 * 2) << i), (32 * 1024 * 1024));
IPC::sharedPage tmp(std::string(baseName + (char)(i + (int)'A')), size, false, false);
tmp.master = false;
if (tmp.mapped){
tmp.master = true;
WARN_MSG("Wiping %s", std::string(baseName + (char)(i + (int)'A')).c_str());
memset(tmp.mapped, 0xFF, size);
}
}
//Wait five seconds to allow everyone to disconnect gracefully.
Util::wait(5000);
//Now delete those pages
for (long unsigned i = 0; i < 15; ++i){
unsigned int size = std::min(((8192 * 2) << i), (32 * 1024 * 1024));
IPC::sharedPage tmp(std::string(baseName + (char)(i + (int)'A')), size, false, false);
tmp.master = true;
if (tmp.mapped){
WARN_MSG("Wiping %s some more", std::string(baseName + (char)(i + (int)'A')).c_str());
}
}
{
//Delete the live stream semaphore, if any.
@ -859,8 +852,8 @@ namespace Mist {
std::string strName = config->getString("streamname");
Util::sanitizeName(strName);
strName = strName.substr(0, (strName.find_first_of("+ ")));
IPC::sharedPage serverCfg("!mistConfig", DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities
IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1);
IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(strName);
long long tmpNum;

View file

@ -17,7 +17,7 @@
#include <mist/tinythread.h>
#include <sys/stat.h>
#define SEM_TS_CLAIM "/MstTSIN%s"
/// \todo Implement this trigger equivalent...
@ -46,8 +46,9 @@ std::set<unsigned long> claimableThreads;
void parseThread(void * ignored) {
std::string semName = "MstInTSStreamClaim" + globalStreamName;
IPC::semaphore lock(semName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str());
IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
int tid = -1;
lock.wait();
@ -151,12 +152,14 @@ namespace Mist {
fclose(inFile);
}
#ifdef TSLIVE_INPUT
std::string semName = "MstInTSStreamClaim" + globalStreamName;
IPC::semaphore lock(semName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str());
IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
lock.wait();
threadTimer.clear();
claimableThreads.clear();
lock.post();
lock.unlink();
#endif
}
@ -394,8 +397,9 @@ namespace Mist {
//Check for and spawn threads here.
if (Util::bootSecs() - threadCheckTimer > 2) {
std::set<unsigned long> activeTracks = liveStream.getActiveTracks();
std::string semName = "MstInTSStreamClaim" + globalStreamName;
IPC::semaphore lock(semName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str());
IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
lock.wait();
for (std::set<unsigned long>::iterator it = activeTracks.begin(); it != activeTracks.end(); it++) {
if (threadTimer.count(*it) && ((Util::bootSecs() - threadTimer[*it]) > (2 * THREAD_TIMEOUT))) {
@ -422,8 +426,9 @@ namespace Mist {
}
void inputTS::finish() {
std::string semName = "MstInTSStreamClaim" + globalStreamName;
IPC::semaphore lock(semName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str());
IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
int threadCount = 0;

View file

@ -20,7 +20,9 @@ int main(int argc, char * argv[]) {
#ifndef INPUT_NOLOCK
IPC::semaphore playerLock;
if (streamName.size()){
playerLock.open(std::string("/lock_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamName.c_str());
playerLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (!playerLock.tryWait()){
DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", streamName.c_str());
return 1;
@ -71,6 +73,7 @@ int main(int argc, char * argv[]) {
}
#ifndef INPUT_NOLOCK
playerLock.post();
playerLock.unlink();
playerLock.close();
#endif
}