New input starting method based on JSON capabilities.

This commit is contained in:
Thulinma 2014-10-02 15:38:59 +02:00
parent b4feaebbe8
commit 08953540f6
5 changed files with 159 additions and 119 deletions

View file

@ -13,6 +13,7 @@
#include "socket.h"
#include "defines.h"
#include "shared_memory.h"
#include "dtsc.h"
std::string Util::getTmpFolder() {
std::string dir;
@ -46,7 +47,7 @@ std::string Util::getTmpFolder() {
/// letters to lowercase. If a '?' character is found, everything following
/// that character is deleted. The original string is modified. If a '+'
/// exists, then only the part before the + is sanitized.
void Util::Stream::sanitizeName(std::string & streamname) {
void Util::sanitizeName(std::string & streamname) {
//strip anything that isn't numbers, digits or underscores
size_t index = streamname.find('+');
if(index != std::string::npos){
@ -68,113 +69,127 @@ void Util::Stream::sanitizeName(std::string & streamname) {
}
}
bool Util::Stream::getLive(std::string streamname) {
JSON::Value ServConf = JSON::fromFile(getTmpFolder() + "streamlist");
std::string bufferTime;
std::string debugLvl;
std::string player_bin = Util::getMyPath() + "MistInBuffer";
DEBUG_MSG(DLVL_WARN, "Starting %s -p -s %s", player_bin.c_str(), streamname.c_str());
char * argv[15] = {(char *)player_bin.c_str(), (char *)"-p", (char *)"-s", (char *)streamname.c_str()};
int argNum = 3;
if (ServConf["streams"][streamname].isMember("DVR")) {
bufferTime = ServConf["streams"][streamname]["DVR"].asString();
argv[++argNum] = (char *)"-b";
argv[++argNum] = (char *)bufferTime.c_str();
}
if (Util::Config::printDebugLevel != DEBUG){
debugLvl = JSON::Value((long long)Util::Config::printDebugLevel).asString();
argv[++argNum] = (char *)"--debug";
argv[++argNum] = (char *)debugLvl.c_str();
}
argv[++argNum] = (char *)0;
int pid = fork();
if (pid == -1) {
FAIL_MSG("Forking process for stream %s failed: %s", streamname.c_str(), strerror(errno));
return false;
}
if (pid == 0){
execvp(argv[0], argv);
FAIL_MSG("Starting process %s for stream %s failed: %s", argv[0], streamname.c_str(), strerror(errno));
_exit(42);
}
return true;
}
/// Starts a process for a VoD stream.
bool Util::Stream::getVod(std::string filename, std::string streamname) {
std::string player_bin = Util::getMyPath() + "MistInDTSC";
bool selected = false;
if (filename.substr(filename.size() - 5) == ".dtsc") {
player_bin = Util::getMyPath() + "MistInDTSC";
selected = true;
}
if (filename.substr(filename.size() - 4) == ".flv") {
player_bin = Util::getMyPath() + "MistInFLV";
selected = true;
}
INFO_MSG("Starting %s -p -s %s %s", player_bin.c_str(), streamname.c_str(), filename.c_str());
char * argv[15] = {(char *)player_bin.c_str(), (char *)"-p", (char *)"-s", (char *)streamname.c_str(), (char *)filename.c_str()};
int argNum = 4;
std::string debugLvl;
if (Util::Config::printDebugLevel != DEBUG){
debugLvl = JSON::Value((long long)Util::Config::printDebugLevel).asString();
argv[++argNum] = (char *)"--debug";
argv[++argNum] = (char *)debugLvl.c_str();
}
argv[++argNum] = (char *)0;
bool Util::startInput(std::string streamname, std::string filename, bool forkFirst) {
IPC::sharedPage mistConfOut("!mistConfig", 4*1024*1024);
IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len);
int pid = fork();
if (pid == -1) {
FAIL_MSG("Forking process for stream %s failed: %s", streamname.c_str(), strerror(errno));
sanitizeName(streamname);
std::string smp = streamname.substr(0, streamname.find('+'));
//check if smp (everything before +) exists
DTSC::Scan stream_cfg = config.getMember("streams").getMember(smp);
if (!stream_cfg){
DEBUG_MSG(DLVL_MEDIUM, "Stream %s not configured", streamname.c_str());
configLock.post();//unlock the config semaphore
return false;
}
if (pid == 0){
execvp(argv[0], argv);
FAIL_MSG("Starting process %s for stream %s failed: %s", argv[0], streamname.c_str(), strerror(errno));
_exit(42);
}
return true;
}
/// Probe for available streams. Currently first VoD, then Live.
bool Util::Stream::getStream(std::string streamname) {
sanitizeName(streamname);
JSON::Value ServConf = JSON::fromFile(getTmpFolder() + "streamlist");
std::string smp = streamname.substr(0,(streamname.find('+')));
//check if smp (everything before +) exists
///\todo Check if the input type used for this stream supports + syntax, if not, reject the request if smp != streamname.
if (ServConf["streams"].isMember(smp)){
//Check if the stream is already active, if yes, don't activate again.
//Note: this uses the _whole_ stream name, including + (if any).
//This means "test+a" and "test+b" have separate locks and do not interact with each other.
//If starting without filename parameter, check if the stream is already active.
//If yes, don't activate again to prevent duplicate inputs.
//It's still possible a duplicate starts anyway, this is caught in the inputs initializer.
//Note: this uses the _whole_ stream name, including + (if any).
//This means "test+a" and "test+b" have separate locks and do not interact with each other.
if (!filename.size()){
IPC::semaphore playerLock(std::string("/lock_" + streamname).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (!playerLock.tryWait()) {
playerLock.close();
DEBUG_MSG(DLVL_MEDIUM, "Stream %s already active - not activating again", streamname.c_str());
configLock.post();//unlock the config semaphore
return true;
}
playerLock.post();
playerLock.close();
if (ServConf["streams"][streamname]["source"].asString()[0] == '/') {
DEBUG_MSG(DLVL_MEDIUM, "Activating VoD stream %s", streamname.c_str());
return getVod(ServConf["streams"][streamname]["source"].asString(), streamname);
} else {
DEBUG_MSG(DLVL_MEDIUM, "Activating live stream %s", streamname.c_str());
return getLive(streamname);
filename = stream_cfg.getMember("source").asString();
}
std::string player_bin;
bool selected = false;
long long int curPrio = -1;
//check in curConf for capabilities-inputs-<naam>-priority/source_match
DTSC::Scan inputs = config.getMember("capabilities").getMember("inputs");
DTSC::Scan input;
unsigned int input_size = inputs.getSize();
for (unsigned int i = 0; i < input_size; ++i){
input = inputs.getIndice(i);
//if match voor current stream && priority is hoger dan wat we al hebben
if (curPrio < input.getMember("priority").asInt()){
std::string source = input.getMember("source_match").asString();
std::string front = source.substr(0,source.find('*'));
std::string back = source.substr(source.find('*')+1);
DEBUG_MSG(DLVL_MEDIUM, "Checking input %s: %s (%s)", inputs.getIndiceName(i).c_str(), input.getMember("name").asString().c_str(), source.c_str());
if (filename.substr(0,front.size()) == front && filename.substr(filename.size()-back.size()) == back){
player_bin = Util::getMyPath() + "MistIn" + input.getMember("name").asString();
curPrio = input.getMember("priority").asInt();
selected = true;
}
}
}
DEBUG_MSG(DLVL_ERROR, "Stream not found: %s", streamname.c_str());
return false;
}
if (!selected){
configLock.post();//unlock the config semaphore
FAIL_MSG("No compatible input found for stream %s: %s", streamname.c_str(), filename.c_str());
return false;
}
/// Create a stream on the system.
/// Filters the streamname, removing invalid characters and
/// converting all letters to lowercase.
/// If a '?' character is found, everything following that character is deleted.
Socket::Server Util::Stream::makeLive(std::string streamname) {
sanitizeName(streamname);
std::string loc = getTmpFolder() + "stream_" + streamname;
//create and return the Socket::Server
return Socket::Server(loc);
//copy the neccessary arguments to separate storage so we can unlock the config semaphore safely
std::map<std::string, std::string> str_args;
//check required parameters
DTSC::Scan required = input.getMember("required");
unsigned int req_size = required.getSize();
for (unsigned int i = 0; i < req_size; ++i){
std::string opt = required.getIndiceName(i);
if (!stream_cfg.getMember(opt)){
configLock.post();//unlock the config semaphore
FAIL_MSG("Required parameter %s for stream %s missing", opt.c_str(), streamname.c_str());
return false;
}
str_args[required.getIndice(i).getMember("option").asString()] = stream_cfg.getMember(opt).asString();
}
//check optional parameters
DTSC::Scan optional = input.getMember("optional");
unsigned int opt_size = optional.getSize();
for (unsigned int i = 0; i < opt_size; ++i){
std::string opt = optional.getIndiceName(i);
if (stream_cfg.getMember(opt)){
str_args[optional.getIndice(i).getMember("option").asString()] = stream_cfg.getMember(opt).asString();
}
}
//finally, unlock the config semaphore
configLock.post();
INFO_MSG("Starting %s -p -s %s %s", player_bin.c_str(), streamname.c_str(), filename.c_str());
char * argv[30] = {(char *)player_bin.c_str(), (char *)"-s", (char *)streamname.c_str(), (char *)filename.c_str()};
int argNum = 3;
std::string debugLvl;
if (Util::Config::printDebugLevel != DEBUG && !str_args.count("--debug")){
debugLvl = JSON::Value((long long)Util::Config::printDebugLevel).asString();
argv[++argNum] = (char *)"--debug";
argv[++argNum] = (char *)debugLvl.c_str();
}
for (std::map<std::string, std::string>::iterator it = str_args.begin(); it != str_args.end(); ++it){
argv[++argNum] = (char *)it->first.c_str();
argv[++argNum] = (char *)it->second.c_str();
}
argv[++argNum] = (char *)0;
int pid = 1;
if (forkFirst){
pid = fork();
if (pid == -1) {
FAIL_MSG("Forking process for stream %s failed: %s", streamname.c_str(), strerror(errno));
return false;
}
}
if (pid == 0){
execvp(argv[0], argv);
FAIL_MSG("Starting process %s for stream %s failed: %s", argv[0], streamname.c_str(), strerror(errno));
_exit(42);
}
return true;
}