From 61b66e349ec2ef759651805f1bb1aaa1671633de Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 20 Apr 2016 14:33:51 +0200 Subject: [PATCH] Added onCrash handler for inputs, buffer now does proper cleanup when crashing. --- src/input/input.h | 1 + src/input/input_buffer.cpp | 70 ++++++++++++++++++++++++++++++++++++++ src/input/input_buffer.h | 1 + src/input/mist_in.cpp | 5 +++ src/output/output.cpp | 4 +++ 5 files changed, 81 insertions(+) diff --git a/src/input/input.h b/src/input/input.h index e49ac0cd..b48adbbc 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -20,6 +20,7 @@ namespace Mist { public: Input(Util::Config * cfg); virtual int run(); + virtual void onCrash(){} virtual void argumentsParsed(){} virtual ~Input() {}; protected: diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index c6f73bde..68f76202 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -180,6 +180,76 @@ namespace Mist { } } + + ///Cleans up any left-over data for the current stream + void inputBuffer::onCrash(){ + WARN_MSG("BUffer crashed. Cleaning."); + streamName = config->getString("streamname"); + char pageName[NAME_BUFFER_SIZE]; + + //Set userpage to all 0xFF's, will disconnect all current clients. + snprintf(pageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); + std::string baseName = pageName; + for (long unsigned i = 0; i < 15; ++i){ + unsigned int size = std::min(((8192 * 2) << i), (32 * 1024 * 1024)); + IPC::sharedPage tmp(std::string(baseName + (char)(i + (int)'A')), size, false, false); + tmp.master = false; + if (tmp.mapped){ + WARN_MSG("Wiping %s", std::string(baseName + (char)(i + (int)'A')).c_str()); + memset(tmp.mapped, 0xFF, size); + } + } + //Wait five seconds to allow everyone to disconnect gracefully. + Util::wait(5000); + //Now delete those pages + for (long unsigned i = 0; i < 15; ++i){ + unsigned int size = std::min(((8192 * 2) << i), (32 * 1024 * 1024)); + IPC::sharedPage tmp(std::string(baseName + (char)(i + (int)'A')), size, false, false); + tmp.master = true; + if (tmp.mapped){ + WARN_MSG("Wiping %s some more", std::string(baseName + (char)(i + (int)'A')).c_str()); + } + } + + { + //Delete the live stream semaphore, if any. + snprintf(pageName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str()); + IPC::semaphore liveMeta(pageName, O_CREAT | O_RDWR, ACCESSPERMS, 1); + liveMeta.unlink(); + }{ + //Delete the stream index metapage. + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str()); + IPC::sharedPage erasePage(pageName, 1024, false, false); + erasePage.master = true; + } + //Delete most if not all temporary track metadata pages. + for (long unsigned i = 1001; i <= 1024; ++i){ + snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_META, streamName.c_str(), i); + IPC::sharedPage erasePage(pageName, 1024, false, false); + erasePage.master = true; + } + //Delete most if not all track indexes and data pages. + for (long unsigned i = 1; i <= 24; ++i){ + snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, streamName.c_str(), i); + IPC::sharedPage indexPage(pageName, 8192, false, false); + indexPage.master = true; + if (indexPage.mapped){ + char * mappedPointer = indexPage.mapped; + for (int j = 0; j < 8192; j += 8) { + int * tmpOffset = (int *)(mappedPointer + j); + if (tmpOffset[0] == 0 && tmpOffset[1] == 0){ + continue; + } + unsigned long keyNum = ntohl(tmpOffset[0]); + snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), i, keyNum); + IPC::sharedPage erasePage(pageName, 1024, false, false); + erasePage.master = true; + } + } + + } + } + /// \triggers /// The `"STREAM_BUFFER"` trigger is stream-specific, and is ran whenever the buffer changes state between playable (FULL) or not (EMPTY). It cannot be cancelled. It is possible to receive multiple EMPTY calls without FULL calls in between, as EMPTY is always generated when a stream is unloaded from memory, even if this stream never reached playable state in the first place (e.g. a broadcast was cancelled before filling enough buffer to be playable). Its payload is: /// ~~~~~~~~~~~~~~~ diff --git a/src/input/input_buffer.h b/src/input/input_buffer.h index 6c99f402..b360523a 100644 --- a/src/input/input_buffer.h +++ b/src/input/input_buffer.h @@ -9,6 +9,7 @@ namespace Mist { public: inputBuffer(Util::Config * cfg); ~inputBuffer(); + void onCrash(); private: unsigned int bufferTime; unsigned int cutTime; diff --git a/src/input/mist_in.cpp b/src/input/mist_in.cpp index db0beafe..baeb2ade 100644 --- a/src/input/mist_in.cpp +++ b/src/input/mist_in.cpp @@ -57,6 +57,11 @@ int main(int argc, char * argv[]) { DEBUG_MSG(DLVL_MEDIUM, "Input for stream %s shut down cleanly", streamName.c_str()); break; } +#if DEBUG >= DLVL_DEVEL + WARN_MSG("Aborting autoclean; this is a development build."); +#else + conv.onCrash(); +#endif if (DEBUG >= DLVL_DEVEL){ DEBUG_MSG(DLVL_DEVEL, "Input for stream %s uncleanly shut down! Aborting restart; this is a development build.", streamName.c_str()); break; diff --git a/src/output/output.cpp b/src/output/output.cpp index c880e523..53baaa36 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -1167,6 +1167,10 @@ namespace Mist { return; } } + if (nProxy.userClient.getData()[-1] > 127){ + myConn.close(); + return; + } if (!nProxy.trackMap.size()){ IPC::userConnection userConn(nProxy.userClient.getData()); for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end() && tNum < SIMUL_TRACKS; it++){