diff --git a/lib/stream.cpp b/lib/stream.cpp index 53eb5cae..f19fee3d 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -73,63 +73,86 @@ void Util::sanitizeName(std::string & streamname) { } } -/// Starts a process for a VoD stream. +/// Checks if the given streamname has an active input serving it. Returns true if this is the case. +/// Assumes the streamname has already been through sanitizeName()! +bool Util::streamAlive(std::string & streamname){ + IPC::semaphore playerLock(std::string("/lock_" + streamname).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!playerLock.tryWait()) { + playerLock.close(); + return true; + }else{ + playerLock.post(); + playerLock.close(); + return false; + } +} + +/// Assures the input for the given stream name is active. +/// Does stream name sanitizion first, followed by a stream name length check (<= 100 chars). +/// Then, checks if an input is already active by running streamAlive(). If yes, aborts. +/// If no, loads up the server configuration and attempts to start the given stream according to current config. +/// At this point, fails and aborts if MistController isn't running. bool Util::startInput(std::string streamname, std::string filename, bool forkFirst) { + sanitizeName(streamname); if (streamname.size() > 100){ FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size()); return false; } + //Check if the stream is already active. + //If yes, don't activate again to prevent duplicate inputs. + //It's still possible a duplicate starts anyway, this is caught in the inputs initializer. + //Note: this uses the _whole_ stream name, including + (if any). + //This means "test+a" and "test+b" have separate locks and do not interact with each other. + if (streamAlive(streamname)){ + DEBUG_MSG(DLVL_MEDIUM, "Stream %s already active; continuing", streamname.c_str()); + return true; + } + + //Attempt to load up configuration and find this stream IPC::sharedPage mistConfOut("!mistConfig", DEFAULT_CONF_PAGE_SIZE); IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); + //Lock the config to prevent race conditions and corruption issues while reading configLock.wait(); DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); - + //Abort if no config available + if (!config){ + FAIL_MSG("Configuration not available, aborting! Is MistController running?"); + configLock.post();//unlock the config semaphore + return false; + } /*LTS-START*/ if (config.getMember("hardlimit_active")) { return false; } /*LTS-END*/ - - sanitizeName(streamname); + //Find stream base name std::string smp = streamname.substr(0, streamname.find_first_of("+ ")); - //check if smp (everything before + or space) exists + //check if base name (everything before + or space) exists DTSC::Scan stream_cfg = config.getMember("streams").getMember(smp); if (!stream_cfg){ - DEBUG_MSG(DLVL_MEDIUM, "Stream %s not configured", streamname.c_str()); - configLock.post();//unlock the config semaphore - return false; + DEBUG_MSG(DLVL_HIGH, "Stream %s not configured - attempting to ignore", streamname.c_str()); } - /*LTS-START*/ - if (stream_cfg.getMember("hardlimit_active")) { + if (stream_cfg && stream_cfg.getMember("hardlimit_active")) { return false; } /*LTS-END*/ - //If starting without filename parameter, check if the stream is already active. - //If yes, don't activate again to prevent duplicate inputs. - //It's still possible a duplicate starts anyway, this is caught in the inputs initializer. - //Note: this uses the _whole_ stream name, including + (if any). - //This means "test+a" and "test+b" have separate locks and do not interact with each other. + //Only use configured source if not manually overridden. Abort if no config is available. if (!filename.size()){ - IPC::semaphore playerLock(std::string("/lock_" + streamname).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (!playerLock.tryWait()) { - playerLock.close(); - DEBUG_MSG(DLVL_MEDIUM, "Stream %s already active - not activating again", streamname.c_str()); + if (!stream_cfg){ + DEBUG_MSG(DLVL_MEDIUM, "Stream %s not configured, no source manually given, cannot start", streamname.c_str()); configLock.post();//unlock the config semaphore - return true; + return false; } - playerLock.post(); - playerLock.close(); filename = stream_cfg.getMember("source").asString(); } - + //check in curConf for capabilities-inputs--priority/source_match std::string player_bin; bool selected = false; long long int curPrio = -1; - //check in curConf for capabilities-inputs--priority/source_match DTSC::Scan inputs = config.getMember("capabilities").getMember("inputs"); DTSC::Scan input; unsigned int input_size = inputs.getSize(); diff --git a/lib/stream.h b/lib/stream.h index 1db6c471..07c935ea 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -8,5 +8,6 @@ namespace Util { std::string getTmpFolder(); void sanitizeName(std::string & streamname); + bool streamAlive(std::string & streamname); bool startInput(std::string streamname, std::string filename = "", bool forkFirst = true); } diff --git a/src/input/input.cpp b/src/input/input.cpp index 4155f5f2..8a0e1550 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -154,10 +154,10 @@ namespace Mist { } } - DEBUG_MSG(DLVL_DONTEVEN,"Pre-While"); + DEBUG_MSG(DLVL_DEVEL,"Input for stream %s started", streamName.c_str()); long long int activityCounter = Util::bootSecs(); - while ((Util::bootSecs() - activityCounter) < 10){//10 second timeout + while ((Util::bootSecs() - activityCounter) < 10 && config->is_active){//10 second timeout Util::wait(1000); removeUnused(); userPage.parseEach(callbackWrapper); @@ -169,7 +169,7 @@ namespace Mist { } } finish(); - DEBUG_MSG(DLVL_DEVEL,"Closing clean"); + DEBUG_MSG(DLVL_DEVEL,"Input for stream %s closing clean", streamName.c_str()); //end player functionality } return 0; diff --git a/src/input/mist_in.cpp b/src/input/mist_in.cpp index 1f04fa15..a222f0cf 100644 --- a/src/input/mist_in.cpp +++ b/src/input/mist_in.cpp @@ -9,22 +9,24 @@ #include INPUTTYPE #include #include +#include int main(int argc, char * argv[]) { Util::Config conf(argv[0], PACKAGE_VERSION); mistIn conv(&conf); if (conf.parseArgs(argc, argv)) { + std::string streamName = conf.getString("streamname"); IPC::semaphore playerLock; - if(conf.getString("streamname").size()){ - playerLock.open(std::string("/lock_" + conf.getString("streamname")).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (streamName.size()){ + playerLock.open(std::string("/lock_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); if (!playerLock.tryWait()){ - DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", conf.getString("streamname").c_str()); + DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", streamName.c_str()); return 1; } } conf.activate(); while (conf.is_active){ - int pid = fork(); + pid_t pid = fork(); if (pid == 0){ playerLock.close(); return conv.run(); @@ -36,15 +38,23 @@ int main(int argc, char * argv[]) { } //wait for the process to exit int status; - while (waitpid(pid, &status, 0) != pid && errno == EINTR) continue; + while (waitpid(pid, &status, 0) != pid && errno == EINTR){ + if (!conf.is_active){ + DEBUG_MSG(DLVL_DEVEL, "Shutting down input for stream %s because of signal interrupt...", streamName.c_str()); + Util::Procs::Stop(pid); + } + continue; + } //if the exit was clean, don't restart it if (WIFEXITED(status) && (WEXITSTATUS(status) == 0)){ - DEBUG_MSG(DLVL_MEDIUM, "Finished player succesfully"); + DEBUG_MSG(DLVL_MEDIUM, "Input for stream %s shut down cleanly", streamName.c_str()); break; } if (DEBUG >= DLVL_DEVEL){ - DEBUG_MSG(DLVL_DEVEL, "Player exited with errors - stopping because this is a development build."); + DEBUG_MSG(DLVL_DEVEL, "Input for stream %s uncleanly shut down! Aborting restart; this is a development build.", streamName.c_str()); break; + }else{ + DEBUG_MSG(DLVL_DEVEL, "Input for stream %s uncleanly shut down! Restarting...", streamName.c_str()); } } playerLock.post(); @@ -53,4 +63,3 @@ int main(int argc, char * argv[]) { return 0; } - diff --git a/src/output/output.cpp b/src/output/output.cpp index 8eba1934..3cf83e61 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -107,14 +107,26 @@ namespace Mist { if (streamName.size() < 1){ return; //abort - no stream to initialize... } + isInitialized = true; + reconnect(); + selectDefaultTracks(); + sought = false; + } + + /// Connects or reconnects to the stream. + /// Assumes streamName class member has been set already. + /// Will start input if not currently active, calls onFail() if this does not succeed. + /// After assuring stream is online, clears metaPages, then sets metaPages[0], statsPage and userClient to (hopefully) valid handles. + /// Finally, calls updateMeta() + void Output::reconnect(){ if (!Util::startInput(streamName)){ - DEBUG_MSG(DLVL_FAIL, "Opening stream disallowed - aborting initalization"); + DEBUG_MSG(DLVL_FAIL, "Opening stream failed - aborting initalization"); onFail(); return; } - isInitialized = true; char pageId[NAME_BUFFER_SIZE]; snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str()); + metaPages.clear(); metaPages[0].init(pageId, DEFAULT_META_PAGE_SIZE); if (!metaPages[0].mapped){ DEBUG_MSG(DLVL_FAIL, "Could not connect to server for %s\n", streamName.c_str()); @@ -124,14 +136,10 @@ namespace Mist { statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true); char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); - if (!userClient.getData()){ - userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); - } + userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); updateMeta(); - selectDefaultTracks(); - sought = false; } - + void Output::selectDefaultTracks(){ if (!isInitialized){ initialize(); @@ -293,7 +301,13 @@ namespace Mist { if (!timeout){ DEBUG_MSG(DLVL_HIGH, "Requesting page with key %lu:%lld", trackId, keyNum); } - if (timeout++ > 100){ + ++timeout; + //if we've been waiting for this page for 3 seconds, reconnect to the stream - something might be going wrong... + if (timeout == 30){ + DEVEL_MSG("Loading is taking longer than usual, reconnecting to stream %s...", streamName.c_str()); + reconnect(); + } + if (timeout > 100){ DEBUG_MSG(DLVL_FAIL, "Timeout while waiting for requested page %lld for track %lu. Aborting.", keyNum, trackId); curPage.erase(trackId); currKeyOpen.erase(trackId); diff --git a/src/output/output.h b/src/output/output.h index 5223e78a..5f546ddd 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -70,6 +70,7 @@ namespace Mist { virtual bool onFinish() { return false; } + void reconnect(); virtual void initialize(); virtual void sendHeader(); virtual void onFail(); diff --git a/src/output/output_http_internal.cpp b/src/output/output_http_internal.cpp index b846390e..a4ea0984 100644 --- a/src/output/output_http_internal.cpp +++ b/src/output/output_http_internal.cpp @@ -249,7 +249,7 @@ namespace Mist { H.Clean(); H.SetHeader("Content-Type", "application/smil"); - H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver); + H.SetHeader("Server", "MistServer/" PACKAGE_VERSION); H.setCORSHeaders(); H.SetBody("\n \n \n \n \n \n"+trackSources+" \n \n"); H.SendResponse("200", "OK", myConn); @@ -264,7 +264,7 @@ namespace Mist { host.resize(host.find(':')); } H.Clean(); - H.SetHeader("Server", "mistserver/" PACKAGE_VERSION); + H.SetHeader("Server", "MistServer/" PACKAGE_VERSION); H.setCORSHeaders(); if (rURL.substr(0, 6) != "/json_"){ H.SetHeader("Content-Type", "application/javascript"); @@ -287,9 +287,9 @@ namespace Mist { configLock.post(); //Stream metadata not found - attempt to start it if (Util::startInput(streamName)){ - char streamPageName[NAME_BUFFER_SIZE]; - snprintf(streamPageName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str()); - streamIndex.init(streamPageName, DEFAULT_META_PAGE_SIZE); + char pageId[NAME_BUFFER_SIZE]; + snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str()); + streamIndex.init(pageId, DEFAULT_META_PAGE_SIZE); if (streamIndex.mapped){ metaLock = true; metaLocker.wait();