diff --git a/src/controller/controller_push.cpp b/src/controller/controller_push.cpp index 92676833..b8a91c97 100644 --- a/src/controller/controller_push.cpp +++ b/src/controller/controller_push.cpp @@ -1,6 +1,7 @@ #include "controller_push.h" #include "controller_statistics.h" #include "controller_storage.h" +#include #include #include #include @@ -16,6 +17,9 @@ namespace Controller{ /// Internal list of waiting pushes std::map> waitingPushes; + static bool mustWritePushList = false; + static bool pushListRead = false; + /// Immediately starts a push for the given stream to the given target. /// Simply calls Util::startPush and stores the resulting PID in the local activePushes map. void startPush(const std::string &stream, std::string &target){ @@ -30,11 +34,15 @@ namespace Controller{ push.append(originalTarget); push.append(target); activePushes[ret] = push; + mustWritePushList = true; } } /// Returns true if the push is currently active, false otherwise. bool isPushActive(const std::string &streamname, const std::string &target){ + while (Controller::conf.is_active && !pushListRead){ + Util::sleep(100); + } std::set toWipe; for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ if (Util::Procs::isActive(it->first)){ @@ -45,6 +53,7 @@ namespace Controller{ } while (toWipe.size()){ activePushes.erase(*toWipe.begin()); + mustWritePushList = true; toWipe.erase(toWipe.begin()); } return false; @@ -55,8 +64,60 @@ namespace Controller{ if (ID > 1 && activePushes.count(ID)){Util::Procs::Stop(ID);} } + /// Compactly writes the list of pushes to a pointer, assumed to be 8MiB in size + static void writePushList(char * pwo){ + char * max = pwo + 8*1024*1024 - 4; + for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ + //check if the whole entry will fit + unsigned int entrylen = 4+2+it->second[1u].asStringRef().size()+2+it->second[2u].asStringRef().size()+2+it->second[3u].asStringRef().size(); + if (pwo+entrylen >= max){return;} + //write the pid as a 32 bits unsigned integer + Bit::htobl(pwo, it->first); + pwo += 4; + //write the streamname, original target and target, 2-byte-size-prepended + for (unsigned int i = 1; i < 4; ++i){ + const std::string &itm = it->second[i].asStringRef(); + Bit::htobs(pwo, itm.size()); + memcpy(pwo+2, itm.data(), itm.size()); + pwo += 2+itm.size(); + } + } + //if it fits, write an ending zero to indicate end of page + if (pwo <= max){ + Bit::htobl(pwo, 0); + } + } + + ///Reads the list of pushes from a pointer, assumed to end in four zeroes + static void readPushList(char * pwo){ + activePushes.clear(); + pid_t p = Bit::btohl(pwo); + HIGH_MSG("Recovering pushes: %lu", (uint32_t)p); + while (p > 1){ + JSON::Value push; + push.append((long long)p); + pwo += 4; + for (uint8_t i = 0; i < 3; ++i){ + uint16_t l = Bit::btohs(pwo); + push.append(std::string(pwo+2, l)); + pwo += 2+l; + } + INFO_MSG("Recovered push: %s", push.toString().c_str()); + Util::Procs::remember(p); + mustWritePushList = true; + activePushes[p] = push; + p = Bit::btohl(pwo); + } + } + /// Loops, checking every second if any pushes need restarting. void pushCheckLoop(void *np){ + { + IPC::sharedPage pushReadPage("MstPush", 8*1024*1024, false, false); + if (pushReadPage.mapped){readPushList(pushReadPage.mapped);} + } + pushListRead = true; + IPC::sharedPage pushPage("MstPush", 8*1024*1024, true, false); while (Controller::conf.is_active){ // this scope prevents the configMutex from being locked constantly { @@ -85,9 +146,21 @@ namespace Controller{ } } } + if (mustWritePushList && pushPage.mapped){ + writePushList(pushPage.mapped); + mustWritePushList = false; + } } Util::wait(1000); // wait at least a second } + //keep the pushPage if we are restarting, so we can restore state from it + if (Controller::restarting){ + pushPage.master = false; + //forget about all pushes, so they keep running + for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ + Util::Procs::forget(it->first); + } + } } /// Gives a list of all currently active pushes @@ -103,6 +176,7 @@ namespace Controller{ } while (toWipe.size()){ activePushes.erase(*toWipe.begin()); + mustWritePushList = true; toWipe.erase(toWipe.begin()); } }