From dcde0501430ce65cc4e75b7b7c97e719ed33fd90 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 21 Jul 2015 22:23:47 +0200 Subject: [PATCH 1/4] Improved debug messages for all MistIn processes, added ability to manually kill MistIn processes. --- src/input/input.cpp | 6 +++--- src/input/mist_in.cpp | 25 +++++++++++++++++-------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/src/input/input.cpp b/src/input/input.cpp index 4155f5f2..8a0e1550 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -154,10 +154,10 @@ namespace Mist { } } - DEBUG_MSG(DLVL_DONTEVEN,"Pre-While"); + DEBUG_MSG(DLVL_DEVEL,"Input for stream %s started", streamName.c_str()); long long int activityCounter = Util::bootSecs(); - while ((Util::bootSecs() - activityCounter) < 10){//10 second timeout + while ((Util::bootSecs() - activityCounter) < 10 && config->is_active){//10 second timeout Util::wait(1000); removeUnused(); userPage.parseEach(callbackWrapper); @@ -169,7 +169,7 @@ namespace Mist { } } finish(); - DEBUG_MSG(DLVL_DEVEL,"Closing clean"); + DEBUG_MSG(DLVL_DEVEL,"Input for stream %s closing clean", streamName.c_str()); //end player functionality } return 0; diff --git a/src/input/mist_in.cpp b/src/input/mist_in.cpp index 1f04fa15..a222f0cf 100644 --- a/src/input/mist_in.cpp +++ b/src/input/mist_in.cpp @@ -9,22 +9,24 @@ #include INPUTTYPE #include #include +#include int main(int argc, char * argv[]) { Util::Config conf(argv[0], PACKAGE_VERSION); mistIn conv(&conf); if (conf.parseArgs(argc, argv)) { + std::string streamName = conf.getString("streamname"); IPC::semaphore playerLock; - if(conf.getString("streamname").size()){ - playerLock.open(std::string("/lock_" + conf.getString("streamname")).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (streamName.size()){ + playerLock.open(std::string("/lock_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); if (!playerLock.tryWait()){ - DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", conf.getString("streamname").c_str()); + DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", streamName.c_str()); return 1; } } conf.activate(); while (conf.is_active){ - int pid = fork(); + pid_t pid = fork(); if (pid == 0){ playerLock.close(); return conv.run(); @@ -36,15 +38,23 @@ int main(int argc, char * argv[]) { } //wait for the process to exit int status; - while (waitpid(pid, &status, 0) != pid && errno == EINTR) continue; + while (waitpid(pid, &status, 0) != pid && errno == EINTR){ + if (!conf.is_active){ + DEBUG_MSG(DLVL_DEVEL, "Shutting down input for stream %s because of signal interrupt...", streamName.c_str()); + Util::Procs::Stop(pid); + } + continue; + } //if the exit was clean, don't restart it if (WIFEXITED(status) && (WEXITSTATUS(status) == 0)){ - DEBUG_MSG(DLVL_MEDIUM, "Finished player succesfully"); + DEBUG_MSG(DLVL_MEDIUM, "Input for stream %s shut down cleanly", streamName.c_str()); break; } if (DEBUG >= DLVL_DEVEL){ - DEBUG_MSG(DLVL_DEVEL, "Player exited with errors - stopping because this is a development build."); + DEBUG_MSG(DLVL_DEVEL, "Input for stream %s uncleanly shut down! Aborting restart; this is a development build.", streamName.c_str()); break; + }else{ + DEBUG_MSG(DLVL_DEVEL, "Input for stream %s uncleanly shut down! Restarting...", streamName.c_str()); } } playerLock.post(); @@ -53,4 +63,3 @@ int main(int argc, char * argv[]) { return 0; } - From c3efc1001fc1655de0f0594120006c9f8f33a640 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 21 Jul 2015 23:07:10 +0200 Subject: [PATCH 2/4] Improved startInput function behaviour and reliability, added streamAlive function. --- lib/stream.cpp | 77 ++++++++++++++++++++++++++++--------------- lib/stream.h | 1 + src/output/output.cpp | 2 +- 3 files changed, 53 insertions(+), 27 deletions(-) diff --git a/lib/stream.cpp b/lib/stream.cpp index 7c8551c3..27422962 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -73,50 +73,75 @@ void Util::sanitizeName(std::string & streamname) { } } -/// Starts a process for a VoD stream. +/// Checks if the given streamname has an active input serving it. Returns true if this is the case. +/// Assumes the streamname has already been through sanitizeName()! +bool Util::streamAlive(std::string & streamname){ + IPC::semaphore playerLock(std::string("/lock_" + streamname).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!playerLock.tryWait()) { + playerLock.close(); + return true; + }else{ + playerLock.post(); + playerLock.close(); + return false; + } +} + +/// Assures the input for the given stream name is active. +/// Does stream name sanitizion first, followed by a stream name length check (<= 100 chars). +/// Then, checks if an input is already active by running streamAlive(). If yes, aborts. +/// If no, loads up the server configuration and attempts to start the given stream according to current config. +/// At this point, fails and aborts if MistController isn't running. bool Util::startInput(std::string streamname, std::string filename, bool forkFirst) { + sanitizeName(streamname); if (streamname.size() > 100){ FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size()); return false; } - IPC::sharedPage mistConfOut("!mistConfig", DEFAULT_CONF_PAGE_SIZE); - IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); - - sanitizeName(streamname); - std::string smp = streamname.substr(0, streamname.find_first_of("+ ")); - //check if smp (everything before + or space) exists - DTSC::Scan stream_cfg = config.getMember("streams").getMember(smp); - if (!stream_cfg){ - DEBUG_MSG(DLVL_MEDIUM, "Stream %s not configured", streamname.c_str()); - configLock.post();//unlock the config semaphore - return false; - } - - //If starting without filename parameter, check if the stream is already active. + //Check if the stream is already active. //If yes, don't activate again to prevent duplicate inputs. //It's still possible a duplicate starts anyway, this is caught in the inputs initializer. //Note: this uses the _whole_ stream name, including + (if any). //This means "test+a" and "test+b" have separate locks and do not interact with each other. + if (streamAlive(streamname)){ + DEBUG_MSG(DLVL_MEDIUM, "Stream %s already active; continuing", streamname.c_str()); + return true; + } + + //Attempt to load up configuration and find this stream + IPC::sharedPage mistConfOut("!mistConfig", DEFAULT_CONF_PAGE_SIZE); + IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); + //Lock the config to prevent race conditions and corruption issues while reading + configLock.wait(); + DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); + //Abort if no config available + if (!config){ + FAIL_MSG("Configuration not available, aborting! Is MistController running?"); + configLock.post();//unlock the config semaphore + return false; + } + //Find stream base name + std::string smp = streamname.substr(0, streamname.find_first_of("+ ")); + //check if base name (everything before + or space) exists + DTSC::Scan stream_cfg = config.getMember("streams").getMember(smp); + if (!stream_cfg){ + DEBUG_MSG(DLVL_HIGH, "Stream %s not configured - attempting to ignore", streamname.c_str()); + } + + //Only use configured source if not manually overridden. Abort if no config is available. if (!filename.size()){ - IPC::semaphore playerLock(std::string("/lock_" + streamname).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (!playerLock.tryWait()) { - playerLock.close(); - DEBUG_MSG(DLVL_MEDIUM, "Stream %s already active - not activating again", streamname.c_str()); + if (!stream_cfg){ + DEBUG_MSG(DLVL_MEDIUM, "Stream %s not configured, no source manually given, cannot start", streamname.c_str()); configLock.post();//unlock the config semaphore - return true; + return false; } - playerLock.post(); - playerLock.close(); filename = stream_cfg.getMember("source").asString(); } - + //check in curConf for capabilities-inputs--priority/source_match std::string player_bin; bool selected = false; long long int curPrio = -1; - //check in curConf for capabilities-inputs--priority/source_match DTSC::Scan inputs = config.getMember("capabilities").getMember("inputs"); DTSC::Scan input; unsigned int input_size = inputs.getSize(); diff --git a/lib/stream.h b/lib/stream.h index 1db6c471..07c935ea 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -8,5 +8,6 @@ namespace Util { std::string getTmpFolder(); void sanitizeName(std::string & streamname); + bool streamAlive(std::string & streamname); bool startInput(std::string streamname, std::string filename = "", bool forkFirst = true); } diff --git a/src/output/output.cpp b/src/output/output.cpp index 6640ea36..0ee166ad 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -97,7 +97,7 @@ namespace Mist { return; //abort - no stream to initialize... } if (!Util::startInput(streamName)){ - DEBUG_MSG(DLVL_FAIL, "Opening stream disallowed - aborting initalization"); + DEBUG_MSG(DLVL_FAIL, "Opening stream failed - aborting initalization"); onFail(); return; } From fce83c903b97a604a93951196f0f9ef22cf8de87 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 21 Jul 2015 23:07:41 +0200 Subject: [PATCH 3/4] Make internal HTTP output correctly serve embed codes for newer stream types. --- src/output/output_http_internal.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/output/output_http_internal.cpp b/src/output/output_http_internal.cpp index 936dd016..9e4f05b3 100644 --- a/src/output/output_http_internal.cpp +++ b/src/output/output_http_internal.cpp @@ -232,7 +232,7 @@ namespace Mist { H.Clean(); H.SetHeader("Content-Type", "application/smil"); - H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); + H.SetHeader("Server", "MistServer/" PACKAGE_VERSION); H.setCORSHeaders(); H.SetBody("\n \n \n \n \n \n"+trackSources+" \n \n"); H.SendResponse("200", "OK", myConn); @@ -247,7 +247,7 @@ namespace Mist { host.resize(host.find(':')); } H.Clean(); - H.SetHeader("Server", "mistserver/" PACKAGE_VERSION); + H.SetHeader("Server", "MistServer/" PACKAGE_VERSION); H.setCORSHeaders(); if (rURL.substr(0, 6) != "/json_"){ H.SetHeader("Content-Type", "application/javascript"); @@ -267,7 +267,9 @@ namespace Mist { configLock.post(); //Stream metadata not found - attempt to start it if (Util::startInput(streamName)){ - streamIndex.init(streamName, DEFAULT_META_PAGE_SIZE); + char pageId[NAME_BUFFER_SIZE]; + snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str()); + streamIndex.init(pageId, DEFAULT_META_PAGE_SIZE); if (streamIndex.mapped){ metaLock = true; metaLocker.wait(); From dc6b4ca0b9f0670a7c9927f78afb65a558d13ae0 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 21 Jul 2015 23:36:32 +0200 Subject: [PATCH 4/4] Gave MistOut processes the ability to restart MistIn processes and/or reconnect to them as/if needed. --- src/output/output.cpp | 30 ++++++++++++++++++++++-------- src/output/output.h | 1 + 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/src/output/output.cpp b/src/output/output.cpp index 0ee166ad..effe8489 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -96,14 +96,26 @@ namespace Mist { if (streamName.size() < 1){ return; //abort - no stream to initialize... } + isInitialized = true; + reconnect(); + selectDefaultTracks(); + sought = false; + } + + /// Connects or reconnects to the stream. + /// Assumes streamName class member has been set already. + /// Will start input if not currently active, calls onFail() if this does not succeed. + /// After assuring stream is online, clears metaPages, then sets metaPages[0], statsPage and userClient to (hopefully) valid handles. + /// Finally, calls updateMeta() + void Output::reconnect(){ if (!Util::startInput(streamName)){ DEBUG_MSG(DLVL_FAIL, "Opening stream failed - aborting initalization"); onFail(); return; } - isInitialized = true; char pageId[NAME_BUFFER_SIZE]; snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str()); + metaPages.clear(); metaPages[0].init(pageId, DEFAULT_META_PAGE_SIZE); if (!metaPages[0].mapped){ DEBUG_MSG(DLVL_FAIL, "Could not connect to server for %s\n", streamName.c_str()); @@ -113,14 +125,10 @@ namespace Mist { statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true); char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); - if (!userClient.getData()){ - userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); - } + userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); updateMeta(); - selectDefaultTracks(); - sought = false; } - + void Output::selectDefaultTracks(){ if (!isInitialized){ initialize(); @@ -282,7 +290,13 @@ namespace Mist { if (!timeout){ DEBUG_MSG(DLVL_HIGH, "Requesting page with key %lu:%lld", trackId, keyNum); } - if (timeout++ > 100){ + ++timeout; + //if we've been waiting for this page for 3 seconds, reconnect to the stream - something might be going wrong... + if (timeout == 30){ + DEVEL_MSG("Loading is taking longer than usual, reconnecting to stream %s...", streamName.c_str()); + reconnect(); + } + if (timeout > 100){ DEBUG_MSG(DLVL_FAIL, "Timeout while waiting for requested page %lld for track %lu. Aborting.", keyNum, trackId); curPage.erase(trackId); currKeyOpen.erase(trackId); diff --git a/src/output/output.h b/src/output/output.h index e934213b..f8a67632 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -59,6 +59,7 @@ namespace Mist { virtual bool onFinish() { return false; } + void reconnect(); virtual void initialize(); virtual void sendHeader(); virtual void onFail();