Merge branch 'development' into LTS_development
# Conflicts: # lib/stream.cpp # src/output/output_http_internal.cpp
This commit is contained in:
commit
569ef07f29
7 changed files with 97 additions and 49 deletions
|
@ -73,63 +73,86 @@ void Util::sanitizeName(std::string & streamname) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Starts a process for a VoD stream.
|
/// Checks if the given streamname has an active input serving it. Returns true if this is the case.
|
||||||
|
/// Assumes the streamname has already been through sanitizeName()!
|
||||||
|
bool Util::streamAlive(std::string & streamname){
|
||||||
|
IPC::semaphore playerLock(std::string("/lock_" + streamname).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
|
||||||
|
if (!playerLock.tryWait()) {
|
||||||
|
playerLock.close();
|
||||||
|
return true;
|
||||||
|
}else{
|
||||||
|
playerLock.post();
|
||||||
|
playerLock.close();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Assures the input for the given stream name is active.
|
||||||
|
/// Does stream name sanitizion first, followed by a stream name length check (<= 100 chars).
|
||||||
|
/// Then, checks if an input is already active by running streamAlive(). If yes, aborts.
|
||||||
|
/// If no, loads up the server configuration and attempts to start the given stream according to current config.
|
||||||
|
/// At this point, fails and aborts if MistController isn't running.
|
||||||
bool Util::startInput(std::string streamname, std::string filename, bool forkFirst) {
|
bool Util::startInput(std::string streamname, std::string filename, bool forkFirst) {
|
||||||
|
sanitizeName(streamname);
|
||||||
if (streamname.size() > 100){
|
if (streamname.size() > 100){
|
||||||
FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size());
|
FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
//Check if the stream is already active.
|
||||||
|
//If yes, don't activate again to prevent duplicate inputs.
|
||||||
|
//It's still possible a duplicate starts anyway, this is caught in the inputs initializer.
|
||||||
|
//Note: this uses the _whole_ stream name, including + (if any).
|
||||||
|
//This means "test+a" and "test+b" have separate locks and do not interact with each other.
|
||||||
|
if (streamAlive(streamname)){
|
||||||
|
DEBUG_MSG(DLVL_MEDIUM, "Stream %s already active; continuing", streamname.c_str());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Attempt to load up configuration and find this stream
|
||||||
IPC::sharedPage mistConfOut("!mistConfig", DEFAULT_CONF_PAGE_SIZE);
|
IPC::sharedPage mistConfOut("!mistConfig", DEFAULT_CONF_PAGE_SIZE);
|
||||||
IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1);
|
IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1);
|
||||||
|
//Lock the config to prevent race conditions and corruption issues while reading
|
||||||
configLock.wait();
|
configLock.wait();
|
||||||
DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len);
|
DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len);
|
||||||
|
//Abort if no config available
|
||||||
|
if (!config){
|
||||||
|
FAIL_MSG("Configuration not available, aborting! Is MistController running?");
|
||||||
|
configLock.post();//unlock the config semaphore
|
||||||
|
return false;
|
||||||
|
}
|
||||||
/*LTS-START*/
|
/*LTS-START*/
|
||||||
if (config.getMember("hardlimit_active")) {
|
if (config.getMember("hardlimit_active")) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
/*LTS-END*/
|
/*LTS-END*/
|
||||||
|
//Find stream base name
|
||||||
sanitizeName(streamname);
|
|
||||||
std::string smp = streamname.substr(0, streamname.find_first_of("+ "));
|
std::string smp = streamname.substr(0, streamname.find_first_of("+ "));
|
||||||
//check if smp (everything before + or space) exists
|
//check if base name (everything before + or space) exists
|
||||||
DTSC::Scan stream_cfg = config.getMember("streams").getMember(smp);
|
DTSC::Scan stream_cfg = config.getMember("streams").getMember(smp);
|
||||||
if (!stream_cfg){
|
if (!stream_cfg){
|
||||||
DEBUG_MSG(DLVL_MEDIUM, "Stream %s not configured", streamname.c_str());
|
DEBUG_MSG(DLVL_HIGH, "Stream %s not configured - attempting to ignore", streamname.c_str());
|
||||||
configLock.post();//unlock the config semaphore
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*LTS-START*/
|
/*LTS-START*/
|
||||||
if (stream_cfg.getMember("hardlimit_active")) {
|
if (stream_cfg && stream_cfg.getMember("hardlimit_active")) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
/*LTS-END*/
|
/*LTS-END*/
|
||||||
|
|
||||||
|
|
||||||
//If starting without filename parameter, check if the stream is already active.
|
//Only use configured source if not manually overridden. Abort if no config is available.
|
||||||
//If yes, don't activate again to prevent duplicate inputs.
|
|
||||||
//It's still possible a duplicate starts anyway, this is caught in the inputs initializer.
|
|
||||||
//Note: this uses the _whole_ stream name, including + (if any).
|
|
||||||
//This means "test+a" and "test+b" have separate locks and do not interact with each other.
|
|
||||||
if (!filename.size()){
|
if (!filename.size()){
|
||||||
IPC::semaphore playerLock(std::string("/lock_" + streamname).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
|
if (!stream_cfg){
|
||||||
if (!playerLock.tryWait()) {
|
DEBUG_MSG(DLVL_MEDIUM, "Stream %s not configured, no source manually given, cannot start", streamname.c_str());
|
||||||
playerLock.close();
|
|
||||||
DEBUG_MSG(DLVL_MEDIUM, "Stream %s already active - not activating again", streamname.c_str());
|
|
||||||
configLock.post();//unlock the config semaphore
|
configLock.post();//unlock the config semaphore
|
||||||
return true;
|
return false;
|
||||||
}
|
}
|
||||||
playerLock.post();
|
|
||||||
playerLock.close();
|
|
||||||
filename = stream_cfg.getMember("source").asString();
|
filename = stream_cfg.getMember("source").asString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//check in curConf for capabilities-inputs-<naam>-priority/source_match
|
||||||
std::string player_bin;
|
std::string player_bin;
|
||||||
bool selected = false;
|
bool selected = false;
|
||||||
long long int curPrio = -1;
|
long long int curPrio = -1;
|
||||||
//check in curConf for capabilities-inputs-<naam>-priority/source_match
|
|
||||||
DTSC::Scan inputs = config.getMember("capabilities").getMember("inputs");
|
DTSC::Scan inputs = config.getMember("capabilities").getMember("inputs");
|
||||||
DTSC::Scan input;
|
DTSC::Scan input;
|
||||||
unsigned int input_size = inputs.getSize();
|
unsigned int input_size = inputs.getSize();
|
||||||
|
|
|
@ -8,5 +8,6 @@
|
||||||
namespace Util {
|
namespace Util {
|
||||||
std::string getTmpFolder();
|
std::string getTmpFolder();
|
||||||
void sanitizeName(std::string & streamname);
|
void sanitizeName(std::string & streamname);
|
||||||
|
bool streamAlive(std::string & streamname);
|
||||||
bool startInput(std::string streamname, std::string filename = "", bool forkFirst = true);
|
bool startInput(std::string streamname, std::string filename = "", bool forkFirst = true);
|
||||||
}
|
}
|
||||||
|
|
|
@ -154,10 +154,10 @@ namespace Mist {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
DEBUG_MSG(DLVL_DONTEVEN,"Pre-While");
|
DEBUG_MSG(DLVL_DEVEL,"Input for stream %s started", streamName.c_str());
|
||||||
|
|
||||||
long long int activityCounter = Util::bootSecs();
|
long long int activityCounter = Util::bootSecs();
|
||||||
while ((Util::bootSecs() - activityCounter) < 10){//10 second timeout
|
while ((Util::bootSecs() - activityCounter) < 10 && config->is_active){//10 second timeout
|
||||||
Util::wait(1000);
|
Util::wait(1000);
|
||||||
removeUnused();
|
removeUnused();
|
||||||
userPage.parseEach(callbackWrapper);
|
userPage.parseEach(callbackWrapper);
|
||||||
|
@ -169,7 +169,7 @@ namespace Mist {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
finish();
|
finish();
|
||||||
DEBUG_MSG(DLVL_DEVEL,"Closing clean");
|
DEBUG_MSG(DLVL_DEVEL,"Input for stream %s closing clean", streamName.c_str());
|
||||||
//end player functionality
|
//end player functionality
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -9,22 +9,24 @@
|
||||||
#include INPUTTYPE
|
#include INPUTTYPE
|
||||||
#include <mist/config.h>
|
#include <mist/config.h>
|
||||||
#include <mist/defines.h>
|
#include <mist/defines.h>
|
||||||
|
#include <mist/procs.h>
|
||||||
|
|
||||||
int main(int argc, char * argv[]) {
|
int main(int argc, char * argv[]) {
|
||||||
Util::Config conf(argv[0], PACKAGE_VERSION);
|
Util::Config conf(argv[0], PACKAGE_VERSION);
|
||||||
mistIn conv(&conf);
|
mistIn conv(&conf);
|
||||||
if (conf.parseArgs(argc, argv)) {
|
if (conf.parseArgs(argc, argv)) {
|
||||||
|
std::string streamName = conf.getString("streamname");
|
||||||
IPC::semaphore playerLock;
|
IPC::semaphore playerLock;
|
||||||
if(conf.getString("streamname").size()){
|
if (streamName.size()){
|
||||||
playerLock.open(std::string("/lock_" + conf.getString("streamname")).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
|
playerLock.open(std::string("/lock_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
|
||||||
if (!playerLock.tryWait()){
|
if (!playerLock.tryWait()){
|
||||||
DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", conf.getString("streamname").c_str());
|
DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", streamName.c_str());
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
conf.activate();
|
conf.activate();
|
||||||
while (conf.is_active){
|
while (conf.is_active){
|
||||||
int pid = fork();
|
pid_t pid = fork();
|
||||||
if (pid == 0){
|
if (pid == 0){
|
||||||
playerLock.close();
|
playerLock.close();
|
||||||
return conv.run();
|
return conv.run();
|
||||||
|
@ -36,15 +38,23 @@ int main(int argc, char * argv[]) {
|
||||||
}
|
}
|
||||||
//wait for the process to exit
|
//wait for the process to exit
|
||||||
int status;
|
int status;
|
||||||
while (waitpid(pid, &status, 0) != pid && errno == EINTR) continue;
|
while (waitpid(pid, &status, 0) != pid && errno == EINTR){
|
||||||
|
if (!conf.is_active){
|
||||||
|
DEBUG_MSG(DLVL_DEVEL, "Shutting down input for stream %s because of signal interrupt...", streamName.c_str());
|
||||||
|
Util::Procs::Stop(pid);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
//if the exit was clean, don't restart it
|
//if the exit was clean, don't restart it
|
||||||
if (WIFEXITED(status) && (WEXITSTATUS(status) == 0)){
|
if (WIFEXITED(status) && (WEXITSTATUS(status) == 0)){
|
||||||
DEBUG_MSG(DLVL_MEDIUM, "Finished player succesfully");
|
DEBUG_MSG(DLVL_MEDIUM, "Input for stream %s shut down cleanly", streamName.c_str());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (DEBUG >= DLVL_DEVEL){
|
if (DEBUG >= DLVL_DEVEL){
|
||||||
DEBUG_MSG(DLVL_DEVEL, "Player exited with errors - stopping because this is a development build.");
|
DEBUG_MSG(DLVL_DEVEL, "Input for stream %s uncleanly shut down! Aborting restart; this is a development build.", streamName.c_str());
|
||||||
break;
|
break;
|
||||||
|
}else{
|
||||||
|
DEBUG_MSG(DLVL_DEVEL, "Input for stream %s uncleanly shut down! Restarting...", streamName.c_str());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
playerLock.post();
|
playerLock.post();
|
||||||
|
@ -53,4 +63,3 @@ int main(int argc, char * argv[]) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -107,14 +107,26 @@ namespace Mist {
|
||||||
if (streamName.size() < 1){
|
if (streamName.size() < 1){
|
||||||
return; //abort - no stream to initialize...
|
return; //abort - no stream to initialize...
|
||||||
}
|
}
|
||||||
|
isInitialized = true;
|
||||||
|
reconnect();
|
||||||
|
selectDefaultTracks();
|
||||||
|
sought = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Connects or reconnects to the stream.
|
||||||
|
/// Assumes streamName class member has been set already.
|
||||||
|
/// Will start input if not currently active, calls onFail() if this does not succeed.
|
||||||
|
/// After assuring stream is online, clears metaPages, then sets metaPages[0], statsPage and userClient to (hopefully) valid handles.
|
||||||
|
/// Finally, calls updateMeta()
|
||||||
|
void Output::reconnect(){
|
||||||
if (!Util::startInput(streamName)){
|
if (!Util::startInput(streamName)){
|
||||||
DEBUG_MSG(DLVL_FAIL, "Opening stream disallowed - aborting initalization");
|
DEBUG_MSG(DLVL_FAIL, "Opening stream failed - aborting initalization");
|
||||||
onFail();
|
onFail();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
isInitialized = true;
|
|
||||||
char pageId[NAME_BUFFER_SIZE];
|
char pageId[NAME_BUFFER_SIZE];
|
||||||
snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str());
|
snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str());
|
||||||
|
metaPages.clear();
|
||||||
metaPages[0].init(pageId, DEFAULT_META_PAGE_SIZE);
|
metaPages[0].init(pageId, DEFAULT_META_PAGE_SIZE);
|
||||||
if (!metaPages[0].mapped){
|
if (!metaPages[0].mapped){
|
||||||
DEBUG_MSG(DLVL_FAIL, "Could not connect to server for %s\n", streamName.c_str());
|
DEBUG_MSG(DLVL_FAIL, "Could not connect to server for %s\n", streamName.c_str());
|
||||||
|
@ -124,12 +136,8 @@ namespace Mist {
|
||||||
statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true);
|
statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true);
|
||||||
char userPageName[NAME_BUFFER_SIZE];
|
char userPageName[NAME_BUFFER_SIZE];
|
||||||
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
|
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
|
||||||
if (!userClient.getData()){
|
userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
|
||||||
userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
|
|
||||||
}
|
|
||||||
updateMeta();
|
updateMeta();
|
||||||
selectDefaultTracks();
|
|
||||||
sought = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Output::selectDefaultTracks(){
|
void Output::selectDefaultTracks(){
|
||||||
|
@ -293,7 +301,13 @@ namespace Mist {
|
||||||
if (!timeout){
|
if (!timeout){
|
||||||
DEBUG_MSG(DLVL_HIGH, "Requesting page with key %lu:%lld", trackId, keyNum);
|
DEBUG_MSG(DLVL_HIGH, "Requesting page with key %lu:%lld", trackId, keyNum);
|
||||||
}
|
}
|
||||||
if (timeout++ > 100){
|
++timeout;
|
||||||
|
//if we've been waiting for this page for 3 seconds, reconnect to the stream - something might be going wrong...
|
||||||
|
if (timeout == 30){
|
||||||
|
DEVEL_MSG("Loading is taking longer than usual, reconnecting to stream %s...", streamName.c_str());
|
||||||
|
reconnect();
|
||||||
|
}
|
||||||
|
if (timeout > 100){
|
||||||
DEBUG_MSG(DLVL_FAIL, "Timeout while waiting for requested page %lld for track %lu. Aborting.", keyNum, trackId);
|
DEBUG_MSG(DLVL_FAIL, "Timeout while waiting for requested page %lld for track %lu. Aborting.", keyNum, trackId);
|
||||||
curPage.erase(trackId);
|
curPage.erase(trackId);
|
||||||
currKeyOpen.erase(trackId);
|
currKeyOpen.erase(trackId);
|
||||||
|
|
|
@ -70,6 +70,7 @@ namespace Mist {
|
||||||
virtual bool onFinish() {
|
virtual bool onFinish() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
void reconnect();
|
||||||
virtual void initialize();
|
virtual void initialize();
|
||||||
virtual void sendHeader();
|
virtual void sendHeader();
|
||||||
virtual void onFail();
|
virtual void onFail();
|
||||||
|
|
|
@ -249,7 +249,7 @@ namespace Mist {
|
||||||
|
|
||||||
H.Clean();
|
H.Clean();
|
||||||
H.SetHeader("Content-Type", "application/smil");
|
H.SetHeader("Content-Type", "application/smil");
|
||||||
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver);
|
H.SetHeader("Server", "MistServer/" PACKAGE_VERSION);
|
||||||
H.setCORSHeaders();
|
H.setCORSHeaders();
|
||||||
H.SetBody("<smil>\n <head>\n <meta base='rtmp://" + host + ":" + port + url_rel + "' />\n </head>\n <body>\n <switch>\n"+trackSources+" </switch>\n </body>\n</smil>");
|
H.SetBody("<smil>\n <head>\n <meta base='rtmp://" + host + ":" + port + url_rel + "' />\n </head>\n <body>\n <switch>\n"+trackSources+" </switch>\n </body>\n</smil>");
|
||||||
H.SendResponse("200", "OK", myConn);
|
H.SendResponse("200", "OK", myConn);
|
||||||
|
@ -264,7 +264,7 @@ namespace Mist {
|
||||||
host.resize(host.find(':'));
|
host.resize(host.find(':'));
|
||||||
}
|
}
|
||||||
H.Clean();
|
H.Clean();
|
||||||
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION);
|
H.SetHeader("Server", "MistServer/" PACKAGE_VERSION);
|
||||||
H.setCORSHeaders();
|
H.setCORSHeaders();
|
||||||
if (rURL.substr(0, 6) != "/json_"){
|
if (rURL.substr(0, 6) != "/json_"){
|
||||||
H.SetHeader("Content-Type", "application/javascript");
|
H.SetHeader("Content-Type", "application/javascript");
|
||||||
|
@ -287,9 +287,9 @@ namespace Mist {
|
||||||
configLock.post();
|
configLock.post();
|
||||||
//Stream metadata not found - attempt to start it
|
//Stream metadata not found - attempt to start it
|
||||||
if (Util::startInput(streamName)){
|
if (Util::startInput(streamName)){
|
||||||
char streamPageName[NAME_BUFFER_SIZE];
|
char pageId[NAME_BUFFER_SIZE];
|
||||||
snprintf(streamPageName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str());
|
snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str());
|
||||||
streamIndex.init(streamPageName, DEFAULT_META_PAGE_SIZE);
|
streamIndex.init(pageId, DEFAULT_META_PAGE_SIZE);
|
||||||
if (streamIndex.mapped){
|
if (streamIndex.mapped){
|
||||||
metaLock = true;
|
metaLock = true;
|
||||||
metaLocker.wait();
|
metaLocker.wait();
|
||||||
|
|
Loading…
Add table
Reference in a new issue