From 15c132f6f8955d49074ba1349d1fa2ea3a8a40c7 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 21 Apr 2021 18:12:11 +0200 Subject: [PATCH] Added MistUtilNuke, added input PID and pull PID fields, added nuke_stream API call --- CMakeLists.txt | 1 + lib/defines.h | 2 + src/controller/controller_api.cpp | 11 ++ src/input/input.cpp | 46 ++++++++- src/input/input.h | 2 + src/utils/util_nuke.cpp | 166 ++++++++++++++++++++++++++++++ 6 files changed, 223 insertions(+), 5 deletions(-) create mode 100644 src/utils/util_nuke.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 2b9e7f74..1848dad2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -372,6 +372,7 @@ makeUtil(META meta) makeUtil(RAX rax) makeUtil(AMF amf) makeUtil(Certbot certbot) +makeUtil(Nuke nuke) if (DEFINED LOAD_BALANCE ) makeUtil(Load load) endif() diff --git a/lib/defines.h b/lib/defines.h index 2568d047..f2dfe10d 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -210,6 +210,8 @@ static inline void show_stackframe(){} #define SHM_STREAM_STATE "MstSTATE%s" //%s stream name #define SHM_STREAM_CONF "MstSCnf%s" //%s stream name +#define SHM_STREAM_IPID "MstIPID%s" //%s stream name +#define SHM_STREAM_PPID "MstPPID%s" //%s stream name #define SHM_GLOBAL_CONF "MstGlobalConfig" #define STRMSTAT_OFF 0 #define STRMSTAT_INIT 1 diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index 4b8ad984..d0121412 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -936,6 +936,17 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){ } } + if (Request.isMember("nuke_stream") && Request["nuke_stream"].isString() && Request["nuke_stream"].asStringRef().size()){ + std::string strm = Request["nuke_stream"].asStringRef(); + std::deque command; + command.push_back(Util::getMyPath() + "MistUtilNuke"); + command.push_back(strm); + int stdIn = 0; + int stdOut = 1; + int stdErr = 2; + Util::Procs::StartPiped(command, &stdIn, &stdOut, &stdErr); + } + if (Request.isMember("invalidate_sessions")){ if (Request["invalidate_sessions"].isArray()){ for (unsigned int i = 0; i < Request["invalidate_sessions"].size(); ++i){ diff --git a/src/input/input.cpp b/src/input/input.cpp index b08bbe7f..a7b8f08c 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -335,12 +335,19 @@ namespace Mist{ playerLock.close(); return 1; } + //Set stream status to STRMSTAT_INIT, then close the page in non-master mode to keep it around char pageName[NAME_BUFFER_SIZE]; snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); streamStatus.init(pageName, 1, true, false); if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INIT;} streamStatus.master = false; streamStatus.close(); + //Set stream input PID to current PID + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_IPID, streamName.c_str()); + pidPage.init(pageName, 8, true, false); + if (pidPage){(*(uint64_t*)(pidPage.mapped)) = getpid();} + pidPage.master = false; + pidPage.close(); }else{ // needsLock() == false means this binary will itself start the sole responsible input // So, we definitely do NOT lock SEM_INPUT, since the child process will do that later. @@ -359,6 +366,13 @@ namespace Mist{ pullLock.close(); return 1; } + //Set stream pull PID to current PID + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_PPID, streamName.c_str()); + pidPage.init(pageName, 8, true, false); + if (pidPage){(*(uint64_t*)(pidPage.mapped)) = getpid();} + pidPage.master = false; + pidPage.close(); } } } @@ -449,21 +463,43 @@ namespace Mist{ streamName.c_str()); break; #else - WARN_MSG("Input for stream %s uncleanly shut down! Restarting...", streamName.c_str()); + if (config->is_active){ + WARN_MSG("Input for stream %s uncleanly shut down! Cleaning and restarting...", streamName.c_str()); + }else{ + WARN_MSG("Input for stream %s uncleanly killed, cleaning up...", streamName.c_str()); + } onCrash(); - Util::wait(reTimer); - reTimer += 1000; + if (config->is_active){ + Util::wait(reTimer); + reTimer += 1000; + } #endif } if (playerLock){ - playerLock.unlink(); + //Clear stream input PID char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_IPID, streamName.c_str()); + pidPage.init(pageName, 8, false, false); + pidPage.master = true; + pidPage.close(); + //Clear stream state snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); streamStatus.init(pageName, 1, true, false); streamStatus.close(); + //Delete lock + playerLock.unlink(); + } + if (pullLock){ + //Clear stream pull PID + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_PPID, streamName.c_str()); + pidPage.init(pageName, 8, false, false); + pidPage.master = true; + pidPage.close(); + //Delete lock + pullLock.unlink(); } - pullLock.unlink(); HIGH_MSG("Angel process for %s exiting", streamName.c_str()); return 0; diff --git a/src/input/input.h b/src/input/input.h index a2ba4258..e0957fb8 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -101,6 +101,8 @@ namespace Mist{ uint64_t simStartTime; + IPC::sharedPage pidPage; ///Stores responsible input process PID + void handleBuyDRM(); }; }// namespace Mist diff --git a/src/utils/util_nuke.cpp b/src/utils/util_nuke.cpp new file mode 100644 index 00000000..c338f6a3 --- /dev/null +++ b/src/utils/util_nuke.cpp @@ -0,0 +1,166 @@ +#include +#include +#include +#include +#include +#include + +const char * getStateString(uint8_t state){ + switch (state){ + case STRMSTAT_OFF: return "Stream is offline"; + case STRMSTAT_INIT: return "Stream is initializing"; + case STRMSTAT_BOOT: return "Stream is booting"; + case STRMSTAT_WAIT: return "Stream is waiting for data"; + case STRMSTAT_READY: return "Stream is online"; + case STRMSTAT_SHUTDOWN: return "Stream is shutting down"; + case STRMSTAT_INVALID: return "Stream status is invalid?!"; + default: return "Stream status is unknown?!"; + } +} + +/// Gets a PID from a shared memory page, if it exists +uint64_t getPidFromPage(const char * pagePattern){ + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, pagePattern, Util::streamName.c_str()); + IPC::sharedPage pidPage(pageName, 8, false, false); + if (pidPage){ + return *(uint64_t*)(pidPage.mapped); + } + return 0; +} + +/// Deletes a shared memory page, if it exists +void nukePage(const char * pagePattern){ + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, pagePattern, Util::streamName.c_str()); + IPC::sharedPage page(pageName, 0, false, false); + page.master = true; +} + +/// Deletes a semaphore, if it exists +void nukeSem(const char * pagePattern){ + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, pagePattern, Util::streamName.c_str()); + IPC::semaphore sem(pageName, O_RDWR, ACCESSPERMS, 0, true); + if (sem){sem.unlink();} +} + +int main(int argc, char **argv){ + Util::redirectLogsIfNeeded(); + if (argc < 1){ + FAIL_MSG("Usage: %s STREAM_NAME", argv[0]); + return 1; + } + Util::streamName = argv[1]; + uint8_t state = Util::getStreamStatus(Util::streamName); + INFO_MSG("Current stream status: %s", getStateString(state)); + size_t loops = 0; + if (state != STRMSTAT_OFF){INFO_MSG("Attempting clean shutdown...");} + while (state != STRMSTAT_OFF && loops++ < 40){ + uint64_t pid; + pid = getPidFromPage(SHM_STREAM_IPID); + if (pid > 1){Util::Procs::Stop(pid);} + pid = getPidFromPage(SHM_STREAM_PPID); + if (pid > 1){Util::Procs::Stop(pid);} + Util::wait(250); + uint8_t prevState = state; + state = Util::getStreamStatus(Util::streamName); + if (prevState != state){ + INFO_MSG("Current stream status: %s", getStateString(state)); + } + } + INFO_MSG("Detecting and cleaning up any leftovers..."); + // Scoping to clear up metadata and track providers + { + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_META, Util::streamName.c_str()); + IPC::sharedPage streamPage(pageName, 0, false, false); + if (streamPage.mapped){ + streamPage.master = true; + std::set checkPids; + Util::RelAccX stream(streamPage.mapped, false); + if (stream.isReady()){ + Util::RelAccX trackList(stream.getPointer("tracks"), false); + if (trackList.isReady()){ + for (size_t i = 0; i < trackList.getPresent(); i++){ + IPC::sharedPage trackPage(trackList.getPointer("page", i), SHM_STREAM_TRACK_LEN, false, false); + trackPage.master = true; + pid_t pid = trackList.getInt("pid", i); + if (pid > 1){ + Util::Procs::Stop(pid); + checkPids.insert(pid); + } + if (trackPage){ + Util::RelAccX track(trackPage.mapped, false); + if (track.isReady()){ + Util::RelAccX pages(track.getPointer("pages"), false); + if (pages.isReady()){ + for (uint64_t j = pages.getDeleted(); j < pages.getEndPos(); j++){ + char thisPageName[NAME_BUFFER_SIZE]; + snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, + Util::streamName.c_str(), i, pages.getInt("firstkey", j)); + IPC::sharedPage p(thisPageName, 0); + p.master = true; + } + } + } + } + } + } + } + // Hard-kill any remaining track providers + if (checkPids.size()){ + //Wait a bit to settle + Util::sleep(1000); + while (checkPids.size()){ + Util::Procs::Murder(*checkPids.begin()); + checkPids.erase(*checkPids.begin()); + } + } + } + } + { // Wipe applications, if any are left over + uint64_t pid; + pid = getPidFromPage(SHM_STREAM_IPID); + if (pid){Util::Procs::Murder(pid);} + pid = getPidFromPage(SHM_STREAM_PPID); + if (pid){Util::Procs::Murder(pid);} + } + //Wipe relevant pages + nukePage(SHM_STREAM_STATE); + nukePage(SHM_STREAM_IPID); + nukePage(SHM_STREAM_PPID); + // Scoping to clear up users page + { + Comms::Users cleanUsers; + cleanUsers.reload(Util::streamName, true); + std::set checkPids; + for (size_t i = cleanUsers.firstValid(); i < cleanUsers.endValid(); ++i){ + uint8_t status = cleanUsers.getStatus(i); + cleanUsers.setStatus(COMM_STATUS_INVALID, i); + if (status != COMM_STATUS_INVALID && status != COMM_STATUS_DISCONNECT && cleanUsers.getTimer(i) < 126){ + pid_t pid = cleanUsers.getPid(i); + if (pid > 1){ + Util::Procs::Stop(pid); + checkPids.insert(pid); + } + } + } + cleanUsers.setMaster(true); + // Hard-kill any remaining clients + if (checkPids.size()){ + //Wait a bit to settle + Util::sleep(1000); + while (checkPids.size()){ + Util::Procs::Murder(*checkPids.begin()); + checkPids.erase(*checkPids.begin()); + } + } + } + nukePage(COMMS_USERS); + nukeSem(SEM_USERS); + nukeSem(SEM_LIVE); + nukeSem(SEM_INPUT); + nukeSem("/MstPull_%s"); + nukeSem(SEM_TRACKLIST); +}