Prevent simultaneous access to the push list

Thulinma 2024-05-19 13:28:51 +02:00
parent aca4623a8a
commit cf475a7c07
3 changed files with 97 additions and 64 deletions
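
The pattern applied throughout the diff below: every read or write of the shared activePushes map now happens under a scoped guard on a new recursive mutex (actPushMut), and work that does not need the map, such as firing the PUSH_END trigger, runs after the guard's scope has closed. A minimal sketch of that pattern, assuming std::recursive_mutex / std::lock_guard in place of the tthread wrappers the codebase uses, with illustrative names rather than MistServer's API:

#include <cstdint>
#include <map>
#include <mutex>
#include <string>

// Illustrative stand-ins; MistServer stores JSON::Value entries keyed by pid_t.
struct Push { std::string stream, target; };

static std::map<uint64_t, Push> activePushes; // shared between threads
static std::recursive_mutex actPushMut;       // guards activePushes

// Mutator: the guard is released automatically when the function returns.
void setPushTarget(uint64_t id, const std::string &target){
  std::lock_guard<std::recursive_mutex> actGuard(actPushMut);
  if (!activePushes.count(id)){return;}
  activePushes[id].target = target;
}

// Reader that does slow work afterwards: copy what is needed inside a
// narrow scope, then act on the copy with the mutex already released.
void endPush(uint64_t id){
  Push p;
  {
    std::lock_guard<std::recursive_mutex> actGuard(actPushMut);
    if (!activePushes.count(id)){return;}
    p = activePushes[id];
    activePushes.erase(id);
  }
  // e.g. fire a PUSH_END-style notification here, without holding the lock
}

Copying the entry out inside the narrow scope keeps the mutex hold time short and avoids invoking external handlers while the list is locked, which is how the reworked removeActivePush() below is structured.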

View file

@@ -347,6 +347,10 @@ int main_loop(int argc, char **argv){
                << "!----" APPNAME " Started at " << buffer << " ----!" << std::endl;
       }
     }
+    // We need to do this before we start the log reader, since the log reader might parse messages
+    // from pushes, which block if this list is not read yet.
+    Controller::readPushList();
     {// spawn thread that reads stderr of process
       std::string logPipe = Util::getTmpFolder() + "MstLog";

View file

@ -14,6 +14,7 @@ namespace Controller{
/// Internal list of currently active pushes /// Internal list of currently active pushes
std::map<pid_t, JSON::Value> activePushes; std::map<pid_t, JSON::Value> activePushes;
tthread::recursive_mutex actPushMut;
/// Internal list of waiting pushes /// Internal list of waiting pushes
std::map<std::string, std::map<std::string, unsigned int> > waitingPushes; std::map<std::string, std::map<std::string, unsigned int> > waitingPushes;
@@ -24,6 +25,7 @@ namespace Controller{
   /// Immediately starts a push for the given stream to the given target.
   /// Simply calls Util::startPush and stores the resulting PID in the local activePushes map.
   void startPush(const std::string &stream, std::string &target){
+    tthread::lock_guard<tthread::recursive_mutex> actGuard(actPushMut);
     // Cancel if already active
     if (isPushActive(stream, target)){return;}
     std::string originalTarget = target;
@@ -40,11 +42,14 @@ namespace Controller{
   }
   void setPushStatus(uint64_t id, const JSON::Value & status){
+    tthread::lock_guard<tthread::recursive_mutex> actGuard(actPushMut);
     if (!activePushes.count(id)){return;}
     activePushes[id][5].extend(status);
   }
   void pushLogMessage(uint64_t id, const JSON::Value & msg){
+    tthread::lock_guard<tthread::recursive_mutex> actGuard(actPushMut);
+    if (!activePushes.count(id)){return;}
     JSON::Value &log = activePushes[id][4];
     log.append(msg);
     log.shrink(10);
@@ -52,48 +57,59 @@ namespace Controller{
   bool isPushActive(uint64_t id){
     while (Controller::conf.is_active && !pushListRead){Util::sleep(100);}
-    return activePushes.count(id);
+    {
+      tthread::lock_guard<tthread::recursive_mutex> actGuard(actPushMut);
+      return activePushes.count(id);
+    }
   }
   /// Only used internally, to remove pushes
   static void removeActivePush(pid_t id){
-    //ignore if the push does not exist
-    if (!activePushes.count(id)){return;}
-    JSON::Value p = activePushes[id];
+    JSON::Value p;
+    {
+      tthread::lock_guard<tthread::recursive_mutex> actGuard(actPushMut);
+      //ignore if the push does not exist
+      if (!activePushes.count(id)){return;}
+      p = activePushes[id];
+      //actually remove, make sure next pass the new list is written out too
+      activePushes.erase(id);
+      mustWritePushList = true;
+    }
     if (Triggers::shouldTrigger("PUSH_END", p[1].asStringRef())){
       std::string payload = p[0u].asString() + "\n" + p[1u].asString() + "\n" + p[2u].asString() + "\n" + p[3u].asString() + "\n" + p[4u].toString() + "\n" + p[5u].toString();
       Triggers::doTrigger("PUSH_END", payload, p[1].asStringRef());
     }
-    //actually remove, make sure next pass the new list is written out too
-    activePushes.erase(id);
-    mustWritePushList = true;
   }
   /// Returns true if the push is currently active, false otherwise.
   bool isPushActive(const std::string &streamname, const std::string &target){
     while (Controller::conf.is_active && !pushListRead){Util::sleep(100);}
-    std::set<pid_t> toWipe;
-    for (std::map<pid_t, JSON::Value>::iterator it = activePushes.begin(); it != activePushes.end(); ++it){
-      if (Util::Procs::isActive(it->first)){
-        // Apply variable substitution to make sure another push target does not resolve to the same target
-        if (it->second[1u].asStringRef() == streamname){
-          std::string activeTarget = it->second[2u].asStringRef();
-          std::string cmpTarget = target;
-          Util::streamVariables(activeTarget, streamname);
-          Util::streamVariables(cmpTarget, streamname);
-          if (activeTarget == cmpTarget){
-            return true;
-          }
-        }
-      }else{
-        toWipe.insert(it->first);
-      }
-    }
-    while (toWipe.size()){
-      removeActivePush(*toWipe.begin());
-      toWipe.erase(toWipe.begin());
+    {
+      tthread::lock_guard<tthread::recursive_mutex> actGuard(actPushMut);
+      std::set<pid_t> toWipe;
+      for (std::map<pid_t, JSON::Value>::iterator it = activePushes.begin(); it != activePushes.end(); ++it){
+        if (Util::Procs::isActive(it->first)){
+          // Apply variable substitution to make sure another push target does not resolve to the same target
+          if (it->second[1u].asStringRef() == streamname){
+            std::string activeTarget = it->second[2u].asStringRef();
+            std::string cmpTarget = target;
+            Util::streamVariables(activeTarget, streamname);
+            Util::streamVariables(cmpTarget, streamname);
+            if (activeTarget == cmpTarget){
+              return true;
+            }
+          }
+        }else{
+          toWipe.insert(it->first);
+        }
+      }
+      while (toWipe.size()){
+        removeActivePush(*toWipe.begin());
+        toWipe.erase(toWipe.begin());
+      }
     }
     return false;
   }
@@ -101,31 +117,36 @@ namespace Controller{
   /// Stops any pushes matching the stream name (pattern) and target
   void stopActivePushes(const std::string &streamname, const std::string &target){
     while (Controller::conf.is_active && !pushListRead){Util::sleep(100);}
-    std::set<pid_t> toWipe;
-    for (std::map<pid_t, JSON::Value>::iterator it = activePushes.begin(); it != activePushes.end(); ++it){
-      if (Util::Procs::isActive(it->first)){
-        if (it->second[2u].asStringRef() == target &&
-            (it->second[1u].asStringRef() == streamname ||
-             (*streamname.rbegin() == '+' && it->second[1u].asStringRef().substr(0, streamname.size()) == streamname))){
-          Util::Procs::Stop(it->first);
-        }
-      }else{
-        toWipe.insert(it->first);
-      }
-    }
-    while (toWipe.size()){
-      removeActivePush(*toWipe.begin());
-      toWipe.erase(toWipe.begin());
+    {
+      tthread::lock_guard<tthread::recursive_mutex> actGuard(actPushMut);
+      std::set<pid_t> toWipe;
+      for (std::map<pid_t, JSON::Value>::iterator it = activePushes.begin(); it != activePushes.end(); ++it){
+        if (Util::Procs::isActive(it->first)){
+          if (it->second[2u].asStringRef() == target &&
+              (it->second[1u].asStringRef() == streamname ||
+               (*streamname.rbegin() == '+' && it->second[1u].asStringRef().substr(0, streamname.size()) == streamname))){
+            Util::Procs::Stop(it->first);
+          }
+        }else{
+          toWipe.insert(it->first);
+        }
+      }
+      while (toWipe.size()){
+        removeActivePush(*toWipe.begin());
+        toWipe.erase(toWipe.begin());
+      }
     }
   }
   /// Immediately stops a push with the given ID
   void stopPush(unsigned int ID){
+    tthread::lock_guard<tthread::recursive_mutex> actGuard(actPushMut);
     if (ID > 1 && activePushes.count(ID)){Util::Procs::Stop(ID);}
   }
   /// Compactly writes the list of pushes to a pointer, assumed to be 8MiB in size
   static void writePushList(char *pwo){
+    tthread::lock_guard<tthread::recursive_mutex> actGuard(actPushMut);
     char *max = pwo + 8 * 1024 * 1024 - 4;
     for (std::map<pid_t, JSON::Value>::iterator it = activePushes.begin(); it != activePushes.end(); ++it){
       // check if the whole entry will fit
@@ -148,25 +169,35 @@ namespace Controller{
   }
   /// Reads the list of pushes from a pointer, assumed to end in four zeroes
-  static void readPushList(char *pwo){
-    activePushes.clear();
-    pid_t p = Bit::btohl(pwo);
-    HIGH_MSG("Recovering pushes: %" PRIu32, (uint32_t)p);
-    while (p > 1){
-      JSON::Value push;
-      push.append(p);
-      pwo += 4;
-      for (uint8_t i = 0; i < 3; ++i){
-        uint16_t l = Bit::btohs(pwo);
-        push.append(std::string(pwo + 2, l));
-        pwo += 2 + l;
-      }
-      INFO_MSG("Recovered push: %s", push.toString().c_str());
-      Util::Procs::remember(p);
-      mustWritePushList = true;
-      activePushes[p] = push;
-      p = Bit::btohl(pwo);
-    }
-  }
+  void readPushList(){
+    size_t recoverCount = 0;
+    {
+      tthread::lock_guard<tthread::recursive_mutex> actGuard(actPushMut);
+      IPC::sharedPage pushReadPage("MstPush", 8 * 1024 * 1024, false, false);
+      char * pwo = pushReadPage.mapped;
+      if (pwo){
+        pushReadPage.master = true;
+        activePushes.clear();
+        uint32_t p = Bit::btohl(pwo);
+        while (p > 1){
+          JSON::Value push;
+          push.append(p);
+          pwo += 4;
+          for (uint8_t i = 0; i < 3; ++i){
+            uint16_t l = Bit::btohs(pwo);
+            push.append(std::string(pwo + 2, l));
+            pwo += 2 + l;
+          }
+          Util::Procs::remember(p);
+          mustWritePushList = true;
+          activePushes[p] = push;
+          ++recoverCount;
+          p = Bit::btohl(pwo);
+        }
+      }
+      pushListRead = true;
+    }
+    INFO_MSG("Recovered %zu pushes:", recoverCount);
+  }
   /// \brief Evaluates <value of currentVariable> <operator> <matchedValue>
@@ -288,14 +319,6 @@ namespace Controller{
   /// Loops, checking every second if any pushes need restarting.
   void pushCheckLoop(void *np){
-    {
-      IPC::sharedPage pushReadPage("MstPush", 8 * 1024 * 1024, false, false);
-      if (pushReadPage.mapped){
-        readPushList(pushReadPage.mapped);
-        pushReadPage.master = true;
-      }
-    }
-    pushListRead = true;
     IPC::sharedPage pushPage("MstPush", 8 * 1024 * 1024, true, false);
     while (Controller::conf.is_active){
       // this scope prevents the configMutex from being locked constantly
@@ -353,8 +376,11 @@ namespace Controller{
       }
       //Check if any pushes have ended, clean them up
       std::set<pid_t> toWipe;
-      for (std::map<pid_t, JSON::Value>::iterator it = activePushes.begin(); it != activePushes.end(); ++it){
-        if (!Util::Procs::isActive(it->first)){toWipe.insert(it->first);}
+      {
+        tthread::lock_guard<tthread::recursive_mutex> actGuard(actPushMut);
+        for (std::map<pid_t, JSON::Value>::iterator it = activePushes.begin(); it != activePushes.end(); ++it){
+          if (!Util::Procs::isActive(it->first)){toWipe.insert(it->first);}
+        }
       }
       while (toWipe.size()){
         removeActivePush(*toWipe.begin());
@@ -373,6 +399,7 @@ namespace Controller{
     if (Util::Config::is_restarting){
       pushPage.master = false;
       // forget about all pushes, so they keep running
+      tthread::lock_guard<tthread::recursive_mutex> actGuard(actPushMut);
       for (std::map<pid_t, JSON::Value>::iterator it = activePushes.begin(); it != activePushes.end(); ++it){
         Util::Procs::forget(it->first);
       }
@@ -381,6 +408,7 @@ namespace Controller{
   /// Gives a list of all currently active pushes
   void listPush(JSON::Value &output){
+    tthread::lock_guard<tthread::recursive_mutex> actGuard(actPushMut);
     output.null();
     std::set<pid_t> toWipe;
     for (std::map<pid_t, JSON::Value>::iterator it = activePushes.begin(); it != activePushes.end(); ++it){
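
Why the new mutex is recursive rather than a plain one: removeActivePush() locks actPushMut itself, and in this commit it is called from isPushActive(streamname, target) and stopActivePushes() while those callers already hold the guard. With a non-recursive mutex that nested lock would deadlock. A small self-contained illustration (hypothetical names, std primitives instead of the tthread wrappers):

#include <iostream>
#include <mutex>

std::recursive_mutex listMut; // a plain std::mutex would deadlock below

void removeEntry(int id){
  std::lock_guard<std::recursive_mutex> inner(listMut); // re-locks the same mutex
  std::cout << "removed " << id << "\n";
}

void sweepList(){
  std::lock_guard<std::recursive_mutex> outer(listMut); // outer lock held here...
  removeEntry(42); // ...and the nested lock succeeds only because it is recursive
}

int main(){ sweepList(); }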

View file

@@ -7,6 +7,7 @@ namespace Controller{
   // Functions for current pushes, start/stop/list
   void startPush(const std::string &streamname, std::string &target);
   void stopPush(unsigned int ID);
+  void readPushList();
   void listPush(JSON::Value &output);
   void pushLogMessage(uint64_t id, const JSON::Value & msg);
   void setPushStatus(uint64_t id, const JSON::Value & status);
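
For reference, the record layout that readPushList() parses out of the 8 MiB MstPush shared page, as visible in this diff: a 32-bit big-endian PID followed by three 16-bit-length-prefixed strings, repeated until a PID value of 1 or less (the page is assumed to end in four zero bytes). A hedged, standalone sketch of a reader for that layout, without MistServer's Bit::, IPC:: and JSON:: helpers; the field meanings are assumptions, not taken from the diff:

#include <cstdint>
#include <string>
#include <vector>

// Assumed record layout: [u32 pid][3 x (u16 len, bytes)]..., terminated when
// the next u32 is <= 1. All integers big-endian, matching Bit::btohl/btohs.
struct PushRecord {
  uint32_t pid;
  std::string fields[3]; // assumed: stream name, original target, resolved target
};

static uint32_t readU32be(const char *p){
  return (uint32_t(uint8_t(p[0])) << 24) | (uint32_t(uint8_t(p[1])) << 16) |
         (uint32_t(uint8_t(p[2])) << 8) | uint32_t(uint8_t(p[3]));
}
static uint16_t readU16be(const char *p){
  return uint16_t((uint16_t(uint8_t(p[0])) << 8) | uint16_t(uint8_t(p[1])));
}

// A real reader would also bound-check against the 8 MiB page size,
// as writePushList() does when serializing.
std::vector<PushRecord> parsePushList(const char *pwo){
  std::vector<PushRecord> out;
  uint32_t pid = readU32be(pwo);
  while (pid > 1){
    PushRecord rec;
    rec.pid = pid;
    pwo += 4;
    for (int i = 0; i < 3; ++i){
      uint16_t len = readU16be(pwo);
      rec.fields[i].assign(pwo + 2, len);
      pwo += 2 + len;
    }
    out.push_back(rec);
    pid = readU32be(pwo);
  }
  return out;
}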