Merge branch 'development' into LTS_development
This commit is contained in:
commit
d6922c92f5
8 changed files with 57 additions and 19 deletions
|
@ -181,17 +181,39 @@ namespace IPC {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
///\brief Tries to wait for the semaphore, returns true if successfull, false otherwise
|
///\brief Tries to wait for the semaphore, returns true if successful, false otherwise
|
||||||
bool semaphore::tryWait() {
|
bool semaphore::tryWait() {
|
||||||
bool result;
|
int result;
|
||||||
#if defined(__CYGWIN__) || defined(_WIN32)
|
#if defined(__CYGWIN__) || defined(_WIN32)
|
||||||
result = WaitForSingleObject(mySem, 0);//wait at most 1ms
|
result = WaitForSingleObject(mySem, 0);//wait at most 1ms
|
||||||
|
if (result == 0x80){
|
||||||
|
WARN_MSG("Consistency error caught on semaphore %s", myName);
|
||||||
|
result = 0;
|
||||||
|
}
|
||||||
#else
|
#else
|
||||||
result = sem_trywait(mySem);
|
result = sem_trywait(mySem);
|
||||||
#endif
|
#endif
|
||||||
return (result == 0);
|
return (result == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
///\brief Tries to wait for the semaphore for a single second, returns true if successful, false otherwise
|
||||||
|
bool semaphore::tryWaitOneSecond() {
|
||||||
|
int result;
|
||||||
|
#if defined(__CYGWIN__) || defined(_WIN32)
|
||||||
|
result = WaitForSingleObject(mySem, 1000);//wait at most 1s
|
||||||
|
if (result == 0x80){
|
||||||
|
WARN_MSG("Consistency error caught on semaphore %s", myName);
|
||||||
|
result = 0;
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
struct timespec wt;
|
||||||
|
wt.tv_sec = 1;
|
||||||
|
wt.tv_nsec = 0;
|
||||||
|
result = sem_timedwait(mySem, &wt);
|
||||||
|
#endif
|
||||||
|
return (result == 0);
|
||||||
|
}
|
||||||
|
|
||||||
///\brief Closes the currently opened semaphore
|
///\brief Closes the currently opened semaphore
|
||||||
void semaphore::close() {
|
void semaphore::close() {
|
||||||
if (*this) {
|
if (*this) {
|
||||||
|
|
|
@ -64,6 +64,7 @@ namespace IPC {
|
||||||
void post();
|
void post();
|
||||||
void wait();
|
void wait();
|
||||||
bool tryWait();
|
bool tryWait();
|
||||||
|
bool tryWaitOneSecond();
|
||||||
void close();
|
void close();
|
||||||
void unlink();
|
void unlink();
|
||||||
private:
|
private:
|
||||||
|
|
|
@ -109,9 +109,23 @@ void statusMonitor(void * np){
|
||||||
//this scope prevents the configMutex from being locked constantly
|
//this scope prevents the configMutex from being locked constantly
|
||||||
{
|
{
|
||||||
tthread::lock_guard<tthread::mutex> guard(Controller::configMutex);
|
tthread::lock_guard<tthread::mutex> guard(Controller::configMutex);
|
||||||
Controller::CheckProtocols(Controller::Storage["config"]["protocols"], Controller::capabilities);
|
bool changed = false;
|
||||||
Controller::CheckAllStreams(Controller::Storage["streams"]);
|
//checks online protocols, reports changes to status
|
||||||
//Controller::myConverter.updateStatus();
|
changed |= Controller::CheckProtocols(Controller::Storage["config"]["protocols"], Controller::capabilities);
|
||||||
|
//checks stream statuses, reports changes to status
|
||||||
|
changed |= Controller::CheckAllStreams(Controller::Storage["streams"]);
|
||||||
|
|
||||||
|
//check if the config semaphore is stuck, by trying to lock it for 5 attempts of 1 second...
|
||||||
|
IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1);
|
||||||
|
if (!configLock.tryWaitOneSecond() && !configLock.tryWaitOneSecond() && !configLock.tryWaitOneSecond() && !configLock.tryWaitOneSecond()){
|
||||||
|
//that failed. We now unlock it, no matter what - and print a warning that it was stuck.
|
||||||
|
WARN_MSG("Configuration semaphore was stuck. Force-unlocking it and re-writing config.");
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
configLock.post();
|
||||||
|
if (changed){
|
||||||
|
Controller::writeConfig();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Util::wait(5000);//wait at least 5 seconds
|
Util::wait(5000);//wait at least 5 seconds
|
||||||
}
|
}
|
||||||
|
|
|
@ -194,7 +194,6 @@ int Controller::handleAPIConnection(Socket::Connection & conn){
|
||||||
//Parse config and streams from the request.
|
//Parse config and streams from the request.
|
||||||
if (Request.isMember("config")){
|
if (Request.isMember("config")){
|
||||||
Controller::checkConfig(Request["config"], Controller::Storage["config"]);
|
Controller::checkConfig(Request["config"], Controller::Storage["config"]);
|
||||||
Controller::CheckProtocols(Controller::Storage["config"]["protocols"], capabilities);
|
|
||||||
}
|
}
|
||||||
if (Request.isMember("streams")){
|
if (Request.isMember("streams")){
|
||||||
Controller::CheckStreams(Request["streams"], Controller::Storage["streams"]);
|
Controller::CheckStreams(Request["streams"], Controller::Storage["streams"]);
|
||||||
|
|
|
@ -81,7 +81,8 @@ namespace Controller {
|
||||||
///\brief Checks current protocol configuration, updates state of enabled connectors if neccessary.
|
///\brief Checks current protocol configuration, updates state of enabled connectors if neccessary.
|
||||||
///\param p An object containing all protocols.
|
///\param p An object containing all protocols.
|
||||||
///\param capabilities An object containing the detected capabilities.
|
///\param capabilities An object containing the detected capabilities.
|
||||||
void CheckProtocols(JSON::Value & p, JSON::Value & capabilities){
|
///\returns True if any action was taken
|
||||||
|
bool CheckProtocols(JSON::Value & p, JSON::Value & capabilities){
|
||||||
std::set<std::string> runningConns;
|
std::set<std::string> runningConns;
|
||||||
|
|
||||||
// used for building args
|
// used for building args
|
||||||
|
@ -145,12 +146,14 @@ namespace Controller {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool action = false;
|
||||||
//shut down deleted/changed connectors
|
//shut down deleted/changed connectors
|
||||||
std::map<std::string, pid_t>::iterator it;
|
std::map<std::string, pid_t>::iterator it;
|
||||||
for (it = currentConnectors.begin(); it != currentConnectors.end(); it++){
|
for (it = currentConnectors.begin(); it != currentConnectors.end(); it++){
|
||||||
if (!runningConns.count(it->first)){
|
if (!runningConns.count(it->first)){
|
||||||
if (Util::Procs::isActive(it->second)){
|
if (Util::Procs::isActive(it->second)){
|
||||||
Log("CONF", "Stopping connector " + it->first);
|
Log("CONF", "Stopping connector " + it->first);
|
||||||
|
action = true;
|
||||||
Util::Procs::Stop(it->second);
|
Util::Procs::Stop(it->second);
|
||||||
}
|
}
|
||||||
currentConnectors.erase(it);
|
currentConnectors.erase(it);
|
||||||
|
@ -162,6 +165,7 @@ namespace Controller {
|
||||||
while (runningConns.size() && conf.is_active){
|
while (runningConns.size() && conf.is_active){
|
||||||
if (!currentConnectors.count(*runningConns.begin()) || !Util::Procs::isActive(currentConnectors[*runningConns.begin()])){
|
if (!currentConnectors.count(*runningConns.begin()) || !Util::Procs::isActive(currentConnectors[*runningConns.begin()])){
|
||||||
Log("CONF", "Starting connector: " + *runningConns.begin());
|
Log("CONF", "Starting connector: " + *runningConns.begin());
|
||||||
|
action = true;
|
||||||
// clear out old args
|
// clear out old args
|
||||||
for (i=0; i<15; i++){argarr[i] = 0;}
|
for (i=0; i<15; i++){argarr[i] = 0;}
|
||||||
// get args for this connector
|
// get args for this connector
|
||||||
|
@ -171,6 +175,7 @@ namespace Controller {
|
||||||
}
|
}
|
||||||
runningConns.erase(runningConns.begin());
|
runningConns.erase(runningConns.begin());
|
||||||
}
|
}
|
||||||
|
return action;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,6 @@ namespace Controller {
|
||||||
void UpdateProtocol(std::string protocol);
|
void UpdateProtocol(std::string protocol);
|
||||||
|
|
||||||
/// Checks current protocol configuration, updates state of enabled connectors if neccesary.
|
/// Checks current protocol configuration, updates state of enabled connectors if neccesary.
|
||||||
void CheckProtocols(JSON::Value & p, JSON::Value & capabilities);
|
bool CheckProtocols(JSON::Value & p, JSON::Value & capabilities);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,7 +93,8 @@ namespace Controller {
|
||||||
|
|
||||||
///\brief Checks all streams, restoring if needed.
|
///\brief Checks all streams, restoring if needed.
|
||||||
///\param data The stream configuration for the server.
|
///\param data The stream configuration for the server.
|
||||||
void CheckAllStreams(JSON::Value & data){
|
///\returns True if the server status changed
|
||||||
|
bool CheckAllStreams(JSON::Value & data){
|
||||||
long long int currTime = Util::epoch();
|
long long int currTime = Util::epoch();
|
||||||
for (JSON::ObjIter jit = data.ObjBegin(); jit != data.ObjEnd(); jit++){
|
for (JSON::ObjIter jit = data.ObjBegin(); jit != data.ObjEnd(); jit++){
|
||||||
checkStream(jit->first, jit->second);
|
checkStream(jit->first, jit->second);
|
||||||
|
@ -117,19 +118,15 @@ namespace Controller {
|
||||||
jit->second["online"] = 1;
|
jit->second["online"] = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//check for changes in config or streams
|
||||||
static JSON::Value strlist;
|
static JSON::Value strlist;
|
||||||
bool changed = false;
|
if (strlist["config"] != Storage["config"] || strlist["streams"] != Storage["streams"]){
|
||||||
if (strlist["config"] != Storage["config"]){
|
|
||||||
strlist["config"] = Storage["config"];
|
strlist["config"] = Storage["config"];
|
||||||
changed = true;
|
|
||||||
}
|
|
||||||
if (strlist["streams"] != Storage["streams"]){
|
|
||||||
strlist["streams"] = Storage["streams"];
|
strlist["streams"] = Storage["streams"];
|
||||||
changed = true;
|
return true;
|
||||||
}
|
|
||||||
if (changed){
|
|
||||||
writeConfig();
|
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void AddStreams(JSON::Value & in, JSON::Value & out){
|
void AddStreams(JSON::Value & in, JSON::Value & out){
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
namespace Controller {
|
namespace Controller {
|
||||||
bool streamsEqual(JSON::Value & one, JSON::Value & two);
|
bool streamsEqual(JSON::Value & one, JSON::Value & two);
|
||||||
void checkStream(std::string name, JSON::Value & data);
|
void checkStream(std::string name, JSON::Value & data);
|
||||||
void CheckAllStreams(JSON::Value & data);
|
bool CheckAllStreams(JSON::Value & data);
|
||||||
void CheckStreams(JSON::Value & in, JSON::Value & out);
|
void CheckStreams(JSON::Value & in, JSON::Value & out);
|
||||||
void AddStreams(JSON::Value & in, JSON::Value & out);
|
void AddStreams(JSON::Value & in, JSON::Value & out);
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue