diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index d883c1fe..81a55a39 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -347,6 +347,10 @@ int main_loop(int argc, char **argv){ << "!----" APPNAME " Started at " << buffer << " ----!" << std::endl; } } + + // We need to do this before we start the log reader, since the log reader might parse messages + // from pushes, which block if this list is not read yet. + Controller::readPushList(); {// spawn thread that reads stderr of process std::string logPipe = Util::getTmpFolder() + "MstLog"; diff --git a/src/controller/controller_push.cpp b/src/controller/controller_push.cpp index bae78246..3e00e5ad 100644 --- a/src/controller/controller_push.cpp +++ b/src/controller/controller_push.cpp @@ -14,6 +14,7 @@ namespace Controller{ /// Internal list of currently active pushes std::map activePushes; + tthread::recursive_mutex actPushMut; /// Internal list of waiting pushes std::map > waitingPushes; @@ -24,6 +25,7 @@ namespace Controller{ /// Immediately starts a push for the given stream to the given target. /// Simply calls Util::startPush and stores the resulting PID in the local activePushes map. void startPush(const std::string &stream, std::string &target){ + tthread::lock_guard actGuard(actPushMut); // Cancel if already active if (isPushActive(stream, target)){return;} std::string originalTarget = target; @@ -40,11 +42,14 @@ namespace Controller{ } void setPushStatus(uint64_t id, const JSON::Value & status){ + tthread::lock_guard actGuard(actPushMut); if (!activePushes.count(id)){return;} activePushes[id][5].extend(status); } void pushLogMessage(uint64_t id, const JSON::Value & msg){ + tthread::lock_guard actGuard(actPushMut); + if (!activePushes.count(id)){return;} JSON::Value &log = activePushes[id][4]; log.append(msg); log.shrink(10); @@ -52,48 +57,59 @@ namespace Controller{ bool isPushActive(uint64_t id){ while (Controller::conf.is_active && !pushListRead){Util::sleep(100);} - return activePushes.count(id); + { + tthread::lock_guard actGuard(actPushMut); + return activePushes.count(id); + } } /// Only used internally, to remove pushes static void removeActivePush(pid_t id){ - //ignore if the push does not exist - if (!activePushes.count(id)){return;} + JSON::Value p; + + { + tthread::lock_guard actGuard(actPushMut); + //ignore if the push does not exist + if (!activePushes.count(id)){return;} + p = activePushes[id]; + //actually remove, make sure next pass the new list is written out too + activePushes.erase(id); + mustWritePushList = true; + } - JSON::Value p = activePushes[id]; if (Triggers::shouldTrigger("PUSH_END", p[1].asStringRef())){ std::string payload = p[0u].asString() + "\n" + p[1u].asString() + "\n" + p[2u].asString() + "\n" + p[3u].asString() + "\n" + p[4u].toString() + "\n" + p[5u].toString(); Triggers::doTrigger("PUSH_END", payload, p[1].asStringRef()); } - //actually remove, make sure next pass the new list is written out too - activePushes.erase(id); - mustWritePushList = true; } /// Returns true if the push is currently active, false otherwise. bool isPushActive(const std::string &streamname, const std::string &target){ while (Controller::conf.is_active && !pushListRead){Util::sleep(100);} - std::set toWipe; - for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ - if (Util::Procs::isActive(it->first)){ - // Apply variable substitution to make sure another push target does not resolve to the same target - if (it->second[1u].asStringRef() == streamname){ - std::string activeTarget = it->second[2u].asStringRef(); - std::string cmpTarget = target; - Util::streamVariables(activeTarget, streamname); - Util::streamVariables(cmpTarget, streamname); - if (activeTarget == cmpTarget){ - return true; + { + tthread::lock_guard actGuard(actPushMut); + std::set toWipe; + for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ + if (Util::Procs::isActive(it->first)){ + // Apply variable substitution to make sure another push target does not resolve to the same target + if (it->second[1u].asStringRef() == streamname){ + std::string activeTarget = it->second[2u].asStringRef(); + std::string cmpTarget = target; + Util::streamVariables(activeTarget, streamname); + Util::streamVariables(cmpTarget, streamname); + if (activeTarget == cmpTarget){ + return true; + } } + }else{ + toWipe.insert(it->first); } - }else{ - toWipe.insert(it->first); } - } - while (toWipe.size()){ - removeActivePush(*toWipe.begin()); - toWipe.erase(toWipe.begin()); + while (toWipe.size()){ + removeActivePush(*toWipe.begin()); + toWipe.erase(toWipe.begin()); + } } return false; } @@ -101,31 +117,36 @@ namespace Controller{ /// Stops any pushes matching the stream name (pattern) and target void stopActivePushes(const std::string &streamname, const std::string &target){ while (Controller::conf.is_active && !pushListRead){Util::sleep(100);} - std::set toWipe; - for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ - if (Util::Procs::isActive(it->first)){ - if (it->second[2u].asStringRef() == target && - (it->second[1u].asStringRef() == streamname || - (*streamname.rbegin() == '+' && it->second[1u].asStringRef().substr(0, streamname.size()) == streamname))){ - Util::Procs::Stop(it->first); + { + tthread::lock_guard actGuard(actPushMut); + std::set toWipe; + for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ + if (Util::Procs::isActive(it->first)){ + if (it->second[2u].asStringRef() == target && + (it->second[1u].asStringRef() == streamname || + (*streamname.rbegin() == '+' && it->second[1u].asStringRef().substr(0, streamname.size()) == streamname))){ + Util::Procs::Stop(it->first); + } + }else{ + toWipe.insert(it->first); } - }else{ - toWipe.insert(it->first); } - } - while (toWipe.size()){ - removeActivePush(*toWipe.begin()); - toWipe.erase(toWipe.begin()); + while (toWipe.size()){ + removeActivePush(*toWipe.begin()); + toWipe.erase(toWipe.begin()); + } } } /// Immediately stops a push with the given ID void stopPush(unsigned int ID){ + tthread::lock_guard actGuard(actPushMut); if (ID > 1 && activePushes.count(ID)){Util::Procs::Stop(ID);} } /// Compactly writes the list of pushes to a pointer, assumed to be 8MiB in size static void writePushList(char *pwo){ + tthread::lock_guard actGuard(actPushMut); char *max = pwo + 8 * 1024 * 1024 - 4; for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ // check if the whole entry will fit @@ -148,25 +169,35 @@ namespace Controller{ } /// Reads the list of pushes from a pointer, assumed to end in four zeroes - static void readPushList(char *pwo){ - activePushes.clear(); - pid_t p = Bit::btohl(pwo); - HIGH_MSG("Recovering pushes: %" PRIu32, (uint32_t)p); - while (p > 1){ - JSON::Value push; - push.append(p); - pwo += 4; - for (uint8_t i = 0; i < 3; ++i){ - uint16_t l = Bit::btohs(pwo); - push.append(std::string(pwo + 2, l)); - pwo += 2 + l; + void readPushList(){ + size_t recoverCount = 0; + { + tthread::lock_guard actGuard(actPushMut); + IPC::sharedPage pushReadPage("MstPush", 8 * 1024 * 1024, false, false); + char * pwo = pushReadPage.mapped; + if (pwo){ + pushReadPage.master = true; + activePushes.clear(); + uint32_t p = Bit::btohl(pwo); + while (p > 1){ + JSON::Value push; + push.append(p); + pwo += 4; + for (uint8_t i = 0; i < 3; ++i){ + uint16_t l = Bit::btohs(pwo); + push.append(std::string(pwo + 2, l)); + pwo += 2 + l; + } + Util::Procs::remember(p); + mustWritePushList = true; + activePushes[p] = push; + ++recoverCount; + p = Bit::btohl(pwo); + } } - INFO_MSG("Recovered push: %s", push.toString().c_str()); - Util::Procs::remember(p); - mustWritePushList = true; - activePushes[p] = push; - p = Bit::btohl(pwo); + pushListRead = true; } + INFO_MSG("Recovered %zu pushes:", recoverCount); } /// \brief Evaluates @@ -288,14 +319,6 @@ namespace Controller{ /// Loops, checking every second if any pushes need restarting. void pushCheckLoop(void *np){ - { - IPC::sharedPage pushReadPage("MstPush", 8 * 1024 * 1024, false, false); - if (pushReadPage.mapped){ - readPushList(pushReadPage.mapped); - pushReadPage.master = true; - } - } - pushListRead = true; IPC::sharedPage pushPage("MstPush", 8 * 1024 * 1024, true, false); while (Controller::conf.is_active){ // this scope prevents the configMutex from being locked constantly @@ -353,8 +376,11 @@ namespace Controller{ } //Check if any pushes have ended, clean them up std::set toWipe; - for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ - if (!Util::Procs::isActive(it->first)){toWipe.insert(it->first);} + { + tthread::lock_guard actGuard(actPushMut); + for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ + if (!Util::Procs::isActive(it->first)){toWipe.insert(it->first);} + } } while (toWipe.size()){ removeActivePush(*toWipe.begin()); @@ -373,6 +399,7 @@ namespace Controller{ if (Util::Config::is_restarting){ pushPage.master = false; // forget about all pushes, so they keep running + tthread::lock_guard actGuard(actPushMut); for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ Util::Procs::forget(it->first); } @@ -381,6 +408,7 @@ namespace Controller{ /// Gives a list of all currently active pushes void listPush(JSON::Value &output){ + tthread::lock_guard actGuard(actPushMut); output.null(); std::set toWipe; for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ diff --git a/src/controller/controller_push.h b/src/controller/controller_push.h index 8c3f64e2..f78a65b8 100644 --- a/src/controller/controller_push.h +++ b/src/controller/controller_push.h @@ -7,6 +7,7 @@ namespace Controller{ // Functions for current pushes, start/stop/list void startPush(const std::string &streamname, std::string &target); void stopPush(unsigned int ID); + void readPushList(); void listPush(JSON::Value &output); void pushLogMessage(uint64_t id, const JSON::Value & msg); void setPushStatus(uint64_t id, const JSON::Value & status);