Added onCrash handler for inputs, buffer now does proper cleanup when crashing.

This commit is contained in:
Thulinma 2016-04-20 14:33:51 +02:00
parent babbcf706a
commit 61b66e349e
5 changed files with 81 additions and 0 deletions

View file

@ -20,6 +20,7 @@ namespace Mist {
public:
Input(Util::Config * cfg);
virtual int run();
virtual void onCrash(){}
virtual void argumentsParsed(){}
virtual ~Input() {};
protected:

View file

@ -180,6 +180,76 @@ namespace Mist {
}
}
///Cleans up any left-over data for the current stream
void inputBuffer::onCrash(){
WARN_MSG("BUffer crashed. Cleaning.");
streamName = config->getString("streamname");
char pageName[NAME_BUFFER_SIZE];
//Set userpage to all 0xFF's, will disconnect all current clients.
snprintf(pageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
std::string baseName = pageName;
for (long unsigned i = 0; i < 15; ++i){
unsigned int size = std::min(((8192 * 2) << i), (32 * 1024 * 1024));
IPC::sharedPage tmp(std::string(baseName + (char)(i + (int)'A')), size, false, false);
tmp.master = false;
if (tmp.mapped){
WARN_MSG("Wiping %s", std::string(baseName + (char)(i + (int)'A')).c_str());
memset(tmp.mapped, 0xFF, size);
}
}
//Wait five seconds to allow everyone to disconnect gracefully.
Util::wait(5000);
//Now delete those pages
for (long unsigned i = 0; i < 15; ++i){
unsigned int size = std::min(((8192 * 2) << i), (32 * 1024 * 1024));
IPC::sharedPage tmp(std::string(baseName + (char)(i + (int)'A')), size, false, false);
tmp.master = true;
if (tmp.mapped){
WARN_MSG("Wiping %s some more", std::string(baseName + (char)(i + (int)'A')).c_str());
}
}
{
//Delete the live stream semaphore, if any.
snprintf(pageName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str());
IPC::semaphore liveMeta(pageName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
liveMeta.unlink();
}{
//Delete the stream index metapage.
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str());
IPC::sharedPage erasePage(pageName, 1024, false, false);
erasePage.master = true;
}
//Delete most if not all temporary track metadata pages.
for (long unsigned i = 1001; i <= 1024; ++i){
snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_META, streamName.c_str(), i);
IPC::sharedPage erasePage(pageName, 1024, false, false);
erasePage.master = true;
}
//Delete most if not all track indexes and data pages.
for (long unsigned i = 1; i <= 24; ++i){
snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, streamName.c_str(), i);
IPC::sharedPage indexPage(pageName, 8192, false, false);
indexPage.master = true;
if (indexPage.mapped){
char * mappedPointer = indexPage.mapped;
for (int j = 0; j < 8192; j += 8) {
int * tmpOffset = (int *)(mappedPointer + j);
if (tmpOffset[0] == 0 && tmpOffset[1] == 0){
continue;
}
unsigned long keyNum = ntohl(tmpOffset[0]);
snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), i, keyNum);
IPC::sharedPage erasePage(pageName, 1024, false, false);
erasePage.master = true;
}
}
}
}
/// \triggers
/// The `"STREAM_BUFFER"` trigger is stream-specific, and is ran whenever the buffer changes state between playable (FULL) or not (EMPTY). It cannot be cancelled. It is possible to receive multiple EMPTY calls without FULL calls in between, as EMPTY is always generated when a stream is unloaded from memory, even if this stream never reached playable state in the first place (e.g. a broadcast was cancelled before filling enough buffer to be playable). Its payload is:
/// ~~~~~~~~~~~~~~~

View file

@ -9,6 +9,7 @@ namespace Mist {
public:
inputBuffer(Util::Config * cfg);
~inputBuffer();
void onCrash();
private:
unsigned int bufferTime;
unsigned int cutTime;

View file

@ -57,6 +57,11 @@ int main(int argc, char * argv[]) {
DEBUG_MSG(DLVL_MEDIUM, "Input for stream %s shut down cleanly", streamName.c_str());
break;
}
#if DEBUG >= DLVL_DEVEL
WARN_MSG("Aborting autoclean; this is a development build.");
#else
conv.onCrash();
#endif
if (DEBUG >= DLVL_DEVEL){
DEBUG_MSG(DLVL_DEVEL, "Input for stream %s uncleanly shut down! Aborting restart; this is a development build.", streamName.c_str());
break;

View file

@ -1167,6 +1167,10 @@ namespace Mist {
return;
}
}
if (nProxy.userClient.getData()[-1] > 127){
myConn.close();
return;
}
if (!nProxy.trackMap.size()){
IPC::userConnection userConn(nProxy.userClient.getData());
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end() && tNum < SIMUL_TRACKS; it++){