From 71dc32d2d381a30bae89367e532b2cb202353b91 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Fri, 28 Aug 2015 21:56:27 +0200 Subject: [PATCH] Added ability to break through semaphore locks to the controller monitoring thread. --- lib/shared_memory.cpp | 26 ++++++++++++++++++++++-- lib/shared_memory.h | 1 + src/controller/controller.cpp | 20 +++++++++++++++--- src/controller/controller_api.cpp | 1 - src/controller/controller_connectors.cpp | 7 ++++++- src/controller/controller_connectors.h | 2 +- src/controller/controller_streams.cpp | 17 +++++++--------- src/controller/controller_streams.h | 2 +- 8 files changed, 57 insertions(+), 19 deletions(-) diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index e7d1f2aa..d2c34934 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -181,16 +181,38 @@ namespace IPC { } } - ///\brief Tries to wait for the semaphore, returns true if successfull, false otherwise + ///\brief Tries to wait for the semaphore, returns true if successful, false otherwise bool semaphore::tryWait() { - bool result; + int result; #if defined(__CYGWIN__) || defined(_WIN32) result = WaitForSingleObject(mySem, 0);//wait at most 1ms + if (result == 0x80){ + WARN_MSG("Consistency error caught on semaphore %s", myName); + result = 0; + } #else result = sem_trywait(mySem); #endif return (result == 0); } + + ///\brief Tries to wait for the semaphore for a single second, returns true if successful, false otherwise + bool semaphore::tryWaitOneSecond() { + int result; +#if defined(__CYGWIN__) || defined(_WIN32) + result = WaitForSingleObject(mySem, 1000);//wait at most 1s + if (result == 0x80){ + WARN_MSG("Consistency error caught on semaphore %s", myName); + result = 0; + } +#else + struct timespec wt; + wt.tv_sec = 1; + wt.tv_nsec = 0; + result = sem_timedwait(mySem, &wt); +#endif + return (result == 0); + } ///\brief Closes the currently opened semaphore void semaphore::close() { diff --git a/lib/shared_memory.h b/lib/shared_memory.h index 0ec1c796..81b2b162 100644 --- a/lib/shared_memory.h +++ b/lib/shared_memory.h @@ -64,6 +64,7 @@ namespace IPC { void post(); void wait(); bool tryWait(); + bool tryWaitOneSecond(); void close(); void unlink(); private: diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index 7329e0e9..063c28ce 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -92,9 +92,23 @@ void statusMonitor(void * np){ //this scope prevents the configMutex from being locked constantly { tthread::lock_guard guard(Controller::configMutex); - Controller::CheckProtocols(Controller::Storage["config"]["protocols"], Controller::capabilities); - Controller::CheckAllStreams(Controller::Storage["streams"]); - //Controller::myConverter.updateStatus(); + bool changed = false; + //checks online protocols, reports changes to status + changed |= Controller::CheckProtocols(Controller::Storage["config"]["protocols"], Controller::capabilities); + //checks stream statuses, reports changes to status + changed |= Controller::CheckAllStreams(Controller::Storage["streams"]); + + //check if the config semaphore is stuck, by trying to lock it for 5 attempts of 1 second... + IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!configLock.tryWaitOneSecond() && !configLock.tryWaitOneSecond() && !configLock.tryWaitOneSecond() && !configLock.tryWaitOneSecond()){ + //that failed. We now unlock it, no matter what - and print a warning that it was stuck. + WARN_MSG("Configuration semaphore was stuck. Force-unlocking it and re-writing config."); + changed = true; + } + configLock.post(); + if (changed){ + Controller::writeConfig(); + } } Util::wait(5000);//wait at least 5 seconds } diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index 0bd6eaf6..5de70999 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -186,7 +186,6 @@ int Controller::handleAPIConnection(Socket::Connection & conn){ //Parse config and streams from the request. if (Request.isMember("config")){ Controller::checkConfig(Request["config"], Controller::Storage["config"]); - Controller::CheckProtocols(Controller::Storage["config"]["protocols"], capabilities); } if (Request.isMember("streams")){ Controller::CheckStreams(Request["streams"], Controller::Storage["streams"]); diff --git a/src/controller/controller_connectors.cpp b/src/controller/controller_connectors.cpp index 80b59c4c..4351b78e 100644 --- a/src/controller/controller_connectors.cpp +++ b/src/controller/controller_connectors.cpp @@ -81,7 +81,8 @@ namespace Controller { ///\brief Checks current protocol configuration, updates state of enabled connectors if neccessary. ///\param p An object containing all protocols. ///\param capabilities An object containing the detected capabilities. - void CheckProtocols(JSON::Value & p, JSON::Value & capabilities){ + ///\returns True if any action was taken + bool CheckProtocols(JSON::Value & p, JSON::Value & capabilities){ std::set runningConns; // used for building args @@ -145,12 +146,14 @@ namespace Controller { } } + bool action = false; //shut down deleted/changed connectors std::map::iterator it; for (it = currentConnectors.begin(); it != currentConnectors.end(); it++){ if (!runningConns.count(it->first)){ if (Util::Procs::isActive(it->second)){ Log("CONF", "Stopping connector " + it->first); + action = true; Util::Procs::Stop(it->second); } currentConnectors.erase(it); @@ -162,6 +165,7 @@ namespace Controller { while (runningConns.size() && conf.is_active){ if (!currentConnectors.count(*runningConns.begin()) || !Util::Procs::isActive(currentConnectors[*runningConns.begin()])){ Log("CONF", "Starting connector: " + *runningConns.begin()); + action = true; // clear out old args for (i=0; i<15; i++){argarr[i] = 0;} // get args for this connector @@ -171,6 +175,7 @@ namespace Controller { } runningConns.erase(runningConns.begin()); } + return action; } } diff --git a/src/controller/controller_connectors.h b/src/controller/controller_connectors.h index a90164f7..7e8b4c18 100644 --- a/src/controller/controller_connectors.h +++ b/src/controller/controller_connectors.h @@ -6,6 +6,6 @@ namespace Controller { void UpdateProtocol(std::string protocol); /// Checks current protocol configuration, updates state of enabled connectors if neccesary. - void CheckProtocols(JSON::Value & p, JSON::Value & capabilities); + bool CheckProtocols(JSON::Value & p, JSON::Value & capabilities); } diff --git a/src/controller/controller_streams.cpp b/src/controller/controller_streams.cpp index 78a6e947..01023c1e 100644 --- a/src/controller/controller_streams.cpp +++ b/src/controller/controller_streams.cpp @@ -91,7 +91,8 @@ namespace Controller { ///\brief Checks all streams, restoring if needed. ///\param data The stream configuration for the server. - void CheckAllStreams(JSON::Value & data){ + ///\returns True if the server status changed + bool CheckAllStreams(JSON::Value & data){ long long int currTime = Util::epoch(); for (JSON::ObjIter jit = data.ObjBegin(); jit != data.ObjEnd(); jit++){ checkStream(jit->first, jit->second); @@ -114,19 +115,15 @@ namespace Controller { jit->second["online"] = 1; } } + + //check for changes in config or streams static JSON::Value strlist; - bool changed = false; - if (strlist["config"] != Storage["config"]){ + if (strlist["config"] != Storage["config"] || strlist["streams"] != Storage["streams"]){ strlist["config"] = Storage["config"]; - changed = true; - } - if (strlist["streams"] != Storage["streams"]){ strlist["streams"] = Storage["streams"]; - changed = true; - } - if (changed){ - writeConfig(); + return true; } + return false; } void AddStreams(JSON::Value & in, JSON::Value & out){ diff --git a/src/controller/controller_streams.h b/src/controller/controller_streams.h index c9d3bb91..3a1c9abc 100644 --- a/src/controller/controller_streams.h +++ b/src/controller/controller_streams.h @@ -3,7 +3,7 @@ namespace Controller { bool streamsEqual(JSON::Value & one, JSON::Value & two); void checkStream(std::string name, JSON::Value & data); - void CheckAllStreams(JSON::Value & data); + bool CheckAllStreams(JSON::Value & data); void CheckStreams(JSON::Value & in, JSON::Value & out); void AddStreams(JSON::Value & in, JSON::Value & out);