From c3efc1001fc1655de0f0594120006c9f8f33a640 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 21 Jul 2015 23:07:10 +0200 Subject: [PATCH] Improved startInput function behaviour and reliability, added streamAlive function. --- lib/stream.cpp | 77 ++++++++++++++++++++++++++++--------------- lib/stream.h | 1 + src/output/output.cpp | 2 +- 3 files changed, 53 insertions(+), 27 deletions(-) diff --git a/lib/stream.cpp b/lib/stream.cpp index 7c8551c3..27422962 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -73,50 +73,75 @@ void Util::sanitizeName(std::string & streamname) { } } -/// Starts a process for a VoD stream. +/// Checks if the given streamname has an active input serving it. Returns true if this is the case. +/// Assumes the streamname has already been through sanitizeName()! +bool Util::streamAlive(std::string & streamname){ + IPC::semaphore playerLock(std::string("/lock_" + streamname).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!playerLock.tryWait()) { + playerLock.close(); + return true; + }else{ + playerLock.post(); + playerLock.close(); + return false; + } +} + +/// Assures the input for the given stream name is active. +/// Does stream name sanitizion first, followed by a stream name length check (<= 100 chars). +/// Then, checks if an input is already active by running streamAlive(). If yes, aborts. +/// If no, loads up the server configuration and attempts to start the given stream according to current config. +/// At this point, fails and aborts if MistController isn't running. bool Util::startInput(std::string streamname, std::string filename, bool forkFirst) { + sanitizeName(streamname); if (streamname.size() > 100){ FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size()); return false; } - IPC::sharedPage mistConfOut("!mistConfig", DEFAULT_CONF_PAGE_SIZE); - IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); - - sanitizeName(streamname); - std::string smp = streamname.substr(0, streamname.find_first_of("+ ")); - //check if smp (everything before + or space) exists - DTSC::Scan stream_cfg = config.getMember("streams").getMember(smp); - if (!stream_cfg){ - DEBUG_MSG(DLVL_MEDIUM, "Stream %s not configured", streamname.c_str()); - configLock.post();//unlock the config semaphore - return false; - } - - //If starting without filename parameter, check if the stream is already active. + //Check if the stream is already active. //If yes, don't activate again to prevent duplicate inputs. //It's still possible a duplicate starts anyway, this is caught in the inputs initializer. //Note: this uses the _whole_ stream name, including + (if any). //This means "test+a" and "test+b" have separate locks and do not interact with each other. + if (streamAlive(streamname)){ + DEBUG_MSG(DLVL_MEDIUM, "Stream %s already active; continuing", streamname.c_str()); + return true; + } + + //Attempt to load up configuration and find this stream + IPC::sharedPage mistConfOut("!mistConfig", DEFAULT_CONF_PAGE_SIZE); + IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); + //Lock the config to prevent race conditions and corruption issues while reading + configLock.wait(); + DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); + //Abort if no config available + if (!config){ + FAIL_MSG("Configuration not available, aborting! Is MistController running?"); + configLock.post();//unlock the config semaphore + return false; + } + //Find stream base name + std::string smp = streamname.substr(0, streamname.find_first_of("+ ")); + //check if base name (everything before + or space) exists + DTSC::Scan stream_cfg = config.getMember("streams").getMember(smp); + if (!stream_cfg){ + DEBUG_MSG(DLVL_HIGH, "Stream %s not configured - attempting to ignore", streamname.c_str()); + } + + //Only use configured source if not manually overridden. Abort if no config is available. if (!filename.size()){ - IPC::semaphore playerLock(std::string("/lock_" + streamname).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (!playerLock.tryWait()) { - playerLock.close(); - DEBUG_MSG(DLVL_MEDIUM, "Stream %s already active - not activating again", streamname.c_str()); + if (!stream_cfg){ + DEBUG_MSG(DLVL_MEDIUM, "Stream %s not configured, no source manually given, cannot start", streamname.c_str()); configLock.post();//unlock the config semaphore - return true; + return false; } - playerLock.post(); - playerLock.close(); filename = stream_cfg.getMember("source").asString(); } - + //check in curConf for capabilities-inputs--priority/source_match std::string player_bin; bool selected = false; long long int curPrio = -1; - //check in curConf for capabilities-inputs--priority/source_match DTSC::Scan inputs = config.getMember("capabilities").getMember("inputs"); DTSC::Scan input; unsigned int input_size = inputs.getSize(); diff --git a/lib/stream.h b/lib/stream.h index 1db6c471..07c935ea 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -8,5 +8,6 @@ namespace Util { std::string getTmpFolder(); void sanitizeName(std::string & streamname); + bool streamAlive(std::string & streamname); bool startInput(std::string streamname, std::string filename = "", bool forkFirst = true); } diff --git a/src/output/output.cpp b/src/output/output.cpp index 6640ea36..0ee166ad 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -97,7 +97,7 @@ namespace Mist { return; //abort - no stream to initialize... } if (!Util::startInput(streamName)){ - DEBUG_MSG(DLVL_FAIL, "Opening stream disallowed - aborting initalization"); + DEBUG_MSG(DLVL_FAIL, "Opening stream failed - aborting initalization"); onFail(); return; }