From 49aa497a6648b345c8f4415716fd598a7622403f Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sun, 29 Nov 2020 01:59:23 +0100 Subject: [PATCH] Added no_unconfigured_streams API call, which nukes streams that are not supposed to be running. --- src/controller/controller_api.cpp | 51 ++++++++++++++++++++++++++----- 1 file changed, 43 insertions(+), 8 deletions(-) diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index 48c97688..ebfcebed 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -485,6 +485,18 @@ static void removeDuplicateProtocols(){ } } +/// Helper function for nuke_stream and related calls +static void nukeStream(const std::string & strm){ + std::deque command; + command.push_back(Util::getMyPath() + "MistUtilNuke"); + command.push_back(strm); + int stdIn = 0; + int stdOut = 1; + int stdErr = 2; + Util::Procs::StartPiped(command, &stdIn, &stdOut, &stdErr); +} + + void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){ /*LTS-START*/ // These are only used internally. We abort further processing if encountered. @@ -964,14 +976,37 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){ } if (Request.isMember("nuke_stream") && Request["nuke_stream"].isString() && Request["nuke_stream"].asStringRef().size()){ - std::string strm = Request["nuke_stream"].asStringRef(); - std::deque command; - command.push_back(Util::getMyPath() + "MistUtilNuke"); - command.push_back(strm); - int stdIn = 0; - int stdOut = 1; - int stdErr = 2; - Util::Procs::StartPiped(command, &stdIn, &stdOut, &stdErr); + nukeStream(Request["nuke_stream"].asStringRef()); + } + + if (Request.isMember("no_unconfigured_streams")){ + JSON::Value emptyRequest; + JSON::Value currStreams; + Controller::fillActive(emptyRequest, currStreams, true); + jsonForEach(currStreams, strm){ + std::string S = strm->asStringRef(); + //Remove wildcard, if any + if (S.find('+') != std::string::npos){S.erase(S.find('+'));} + if (!Controller::Storage["streams"].isMember(S) || !Controller::Storage["streams"][S].isMember("source")){ + WARN_MSG("Shutting down unconfigured stream %s", strm->asStringRef().c_str()); + nukeStream(strm->asStringRef()); + continue; + } + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_META, strm->asStringRef().c_str()); + IPC::sharedPage streamPage(pageName, 0, false, false); + if (streamPage){ + Util::RelAccX rlxStrm(streamPage.mapped, false); + if (rlxStrm.isReady()){ + std::string source = rlxStrm.getPointer("source"); + const std::string & oriSource = Controller::Storage["streams"][S]["source"].asStringRef(); + if (source != oriSource){ + WARN_MSG("Source for %s is %s instead of %s; shutting it down", strm->asStringRef().c_str(), source.c_str(), oriSource.c_str()); + nukeStream(strm->asStringRef()); + } + } + } + } } if (Request.isMember("invalidate_sessions")){