Added auto-recovery of push list on crash and/or restart
This commit is contained in:
parent
6254c285d6
commit
781cad41c8
1 changed files with 74 additions and 0 deletions
|
@ -1,6 +1,7 @@
|
||||||
#include "controller_push.h"
|
#include "controller_push.h"
|
||||||
#include "controller_statistics.h"
|
#include "controller_statistics.h"
|
||||||
#include "controller_storage.h"
|
#include "controller_storage.h"
|
||||||
|
#include <mist/bitfields.h>
|
||||||
#include <mist/config.h>
|
#include <mist/config.h>
|
||||||
#include <mist/json.h>
|
#include <mist/json.h>
|
||||||
#include <mist/procs.h>
|
#include <mist/procs.h>
|
||||||
|
@ -16,6 +17,9 @@ namespace Controller{
|
||||||
/// Internal list of waiting pushes
|
/// Internal list of waiting pushes
|
||||||
std::map<std::string, std::map<std::string, unsigned int>> waitingPushes;
|
std::map<std::string, std::map<std::string, unsigned int>> waitingPushes;
|
||||||
|
|
||||||
|
static bool mustWritePushList = false;
|
||||||
|
static bool pushListRead = false;
|
||||||
|
|
||||||
/// Immediately starts a push for the given stream to the given target.
|
/// Immediately starts a push for the given stream to the given target.
|
||||||
/// Simply calls Util::startPush and stores the resulting PID in the local activePushes map.
|
/// Simply calls Util::startPush and stores the resulting PID in the local activePushes map.
|
||||||
void startPush(const std::string &stream, std::string &target){
|
void startPush(const std::string &stream, std::string &target){
|
||||||
|
@ -30,11 +34,15 @@ namespace Controller{
|
||||||
push.append(originalTarget);
|
push.append(originalTarget);
|
||||||
push.append(target);
|
push.append(target);
|
||||||
activePushes[ret] = push;
|
activePushes[ret] = push;
|
||||||
|
mustWritePushList = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns true if the push is currently active, false otherwise.
|
/// Returns true if the push is currently active, false otherwise.
|
||||||
bool isPushActive(const std::string &streamname, const std::string &target){
|
bool isPushActive(const std::string &streamname, const std::string &target){
|
||||||
|
while (Controller::conf.is_active && !pushListRead){
|
||||||
|
Util::sleep(100);
|
||||||
|
}
|
||||||
std::set<pid_t> toWipe;
|
std::set<pid_t> toWipe;
|
||||||
for (std::map<pid_t, JSON::Value>::iterator it = activePushes.begin(); it != activePushes.end(); ++it){
|
for (std::map<pid_t, JSON::Value>::iterator it = activePushes.begin(); it != activePushes.end(); ++it){
|
||||||
if (Util::Procs::isActive(it->first)){
|
if (Util::Procs::isActive(it->first)){
|
||||||
|
@ -45,6 +53,7 @@ namespace Controller{
|
||||||
}
|
}
|
||||||
while (toWipe.size()){
|
while (toWipe.size()){
|
||||||
activePushes.erase(*toWipe.begin());
|
activePushes.erase(*toWipe.begin());
|
||||||
|
mustWritePushList = true;
|
||||||
toWipe.erase(toWipe.begin());
|
toWipe.erase(toWipe.begin());
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
@ -55,8 +64,60 @@ namespace Controller{
|
||||||
if (ID > 1 && activePushes.count(ID)){Util::Procs::Stop(ID);}
|
if (ID > 1 && activePushes.count(ID)){Util::Procs::Stop(ID);}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Compactly writes the list of pushes to a pointer, assumed to be 8MiB in size
|
||||||
|
static void writePushList(char * pwo){
|
||||||
|
char * max = pwo + 8*1024*1024 - 4;
|
||||||
|
for (std::map<pid_t, JSON::Value>::iterator it = activePushes.begin(); it != activePushes.end(); ++it){
|
||||||
|
//check if the whole entry will fit
|
||||||
|
unsigned int entrylen = 4+2+it->second[1u].asStringRef().size()+2+it->second[2u].asStringRef().size()+2+it->second[3u].asStringRef().size();
|
||||||
|
if (pwo+entrylen >= max){return;}
|
||||||
|
//write the pid as a 32 bits unsigned integer
|
||||||
|
Bit::htobl(pwo, it->first);
|
||||||
|
pwo += 4;
|
||||||
|
//write the streamname, original target and target, 2-byte-size-prepended
|
||||||
|
for (unsigned int i = 1; i < 4; ++i){
|
||||||
|
const std::string &itm = it->second[i].asStringRef();
|
||||||
|
Bit::htobs(pwo, itm.size());
|
||||||
|
memcpy(pwo+2, itm.data(), itm.size());
|
||||||
|
pwo += 2+itm.size();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//if it fits, write an ending zero to indicate end of page
|
||||||
|
if (pwo <= max){
|
||||||
|
Bit::htobl(pwo, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
///Reads the list of pushes from a pointer, assumed to end in four zeroes
|
||||||
|
static void readPushList(char * pwo){
|
||||||
|
activePushes.clear();
|
||||||
|
pid_t p = Bit::btohl(pwo);
|
||||||
|
HIGH_MSG("Recovering pushes: %lu", (uint32_t)p);
|
||||||
|
while (p > 1){
|
||||||
|
JSON::Value push;
|
||||||
|
push.append((long long)p);
|
||||||
|
pwo += 4;
|
||||||
|
for (uint8_t i = 0; i < 3; ++i){
|
||||||
|
uint16_t l = Bit::btohs(pwo);
|
||||||
|
push.append(std::string(pwo+2, l));
|
||||||
|
pwo += 2+l;
|
||||||
|
}
|
||||||
|
INFO_MSG("Recovered push: %s", push.toString().c_str());
|
||||||
|
Util::Procs::remember(p);
|
||||||
|
mustWritePushList = true;
|
||||||
|
activePushes[p] = push;
|
||||||
|
p = Bit::btohl(pwo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Loops, checking every second if any pushes need restarting.
|
/// Loops, checking every second if any pushes need restarting.
|
||||||
void pushCheckLoop(void *np){
|
void pushCheckLoop(void *np){
|
||||||
|
{
|
||||||
|
IPC::sharedPage pushReadPage("MstPush", 8*1024*1024, false, false);
|
||||||
|
if (pushReadPage.mapped){readPushList(pushReadPage.mapped);}
|
||||||
|
}
|
||||||
|
pushListRead = true;
|
||||||
|
IPC::sharedPage pushPage("MstPush", 8*1024*1024, true, false);
|
||||||
while (Controller::conf.is_active){
|
while (Controller::conf.is_active){
|
||||||
// this scope prevents the configMutex from being locked constantly
|
// this scope prevents the configMutex from being locked constantly
|
||||||
{
|
{
|
||||||
|
@ -85,9 +146,21 @@ namespace Controller{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (mustWritePushList && pushPage.mapped){
|
||||||
|
writePushList(pushPage.mapped);
|
||||||
|
mustWritePushList = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Util::wait(1000); // wait at least a second
|
Util::wait(1000); // wait at least a second
|
||||||
}
|
}
|
||||||
|
//keep the pushPage if we are restarting, so we can restore state from it
|
||||||
|
if (Controller::restarting){
|
||||||
|
pushPage.master = false;
|
||||||
|
//forget about all pushes, so they keep running
|
||||||
|
for (std::map<pid_t, JSON::Value>::iterator it = activePushes.begin(); it != activePushes.end(); ++it){
|
||||||
|
Util::Procs::forget(it->first);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Gives a list of all currently active pushes
|
/// Gives a list of all currently active pushes
|
||||||
|
@ -103,6 +176,7 @@ namespace Controller{
|
||||||
}
|
}
|
||||||
while (toWipe.size()){
|
while (toWipe.size()){
|
||||||
activePushes.erase(*toWipe.begin());
|
activePushes.erase(*toWipe.begin());
|
||||||
|
mustWritePushList = true;
|
||||||
toWipe.erase(toWipe.begin());
|
toWipe.erase(toWipe.begin());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue