mistserver/lib/stream.cpp
2016-05-05 16:05:12 +02:00

542 lines
20 KiB
C++

/// \file stream.cpp
/// Utilities for handling streams.
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <semaphore.h>
#include "json.h"
#include "stream.h"
#include "procs.h"
#include "config.h"
#include "socket.h"
#include "defines.h"
#include "shared_memory.h"
#include "dtsc.h"
#include "triggers.h"//LTS
/* roxlu-begin */
static std::string strftime_now(const std::string& format);
static void replace(std::string& str, const std::string& from, const std::string& to);
static void replace_variables(std::string& str);
/* roxlu-end */
std::string Util::getTmpFolder() {
std::string dir;
char * tmp_char = 0;
if (!tmp_char) {
tmp_char = getenv("TMP");
}
if (!tmp_char) {
tmp_char = getenv("TEMP");
}
if (!tmp_char) {
tmp_char = getenv("TMPDIR");
}
if (tmp_char) {
dir = tmp_char;
dir += "/mist";
} else {
#if defined(_WIN32) || defined(_CYGWIN_)
dir = "C:/tmp/mist";
#else
dir = "/tmp/mist";
#endif
}
if (access(dir.c_str(), 0) != 0) {
mkdir(dir.c_str(), S_IRWXU | S_IRWXG | S_IRWXO); //attempt to create mist folder - ignore failures
}
return dir + "/";
}
/// Filters the streamname, removing invalid characters and converting all
/// letters to lowercase. If a '?' character is found, everything following
/// that character is deleted. The original string is modified. If a '+' or space
/// exists, then only the part before that is sanitized.
void Util::sanitizeName(std::string & streamname) {
//strip anything that isn't numbers, digits or underscores
size_t index = streamname.find_first_of("+ ");
if(index != std::string::npos){
std::string preplus = streamname.substr(0,index);
sanitizeName(preplus);
std::string postplus = streamname.substr(index+1);
if (postplus.find('?') != std::string::npos){
postplus = postplus.substr(0, (postplus.find('?')));
}
streamname = preplus+"+"+postplus;
return;
}
for (std::string::iterator i = streamname.end() - 1; i >= streamname.begin(); --i) {
if (*i == '?') {
streamname.erase(i, streamname.end());
break;
}
if ( !isalpha( *i) && !isdigit( *i) && *i != '_' && *i != '.'){
streamname.erase(i);
} else {
*i = tolower(*i);
}
}
}
JSON::Value Util::getStreamConfig(std::string streamname){
JSON::Value result;
if (streamname.size() > 100){
FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size());
return result;
}
IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE);
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len);
sanitizeName(streamname);
std::string smp = streamname.substr(0, streamname.find_first_of("+ "));
//check if smp (everything before + or space) exists
DTSC::Scan stream_cfg = config.getMember("streams").getMember(smp);
if (!stream_cfg){
DEBUG_MSG(DLVL_MEDIUM, "Stream %s not configured", streamname.c_str());
}else{
result = stream_cfg.asJSON();
}
configLock.post();//unlock the config semaphore
return result;
}
/// Checks if the given streamname has an active input serving it. Returns true if this is the case.
/// Assumes the streamname has already been through sanitizeName()!
bool Util::streamAlive(std::string & streamname){
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamname.c_str());
IPC::semaphore playerLock(semName, O_RDWR, ACCESSPERMS, 1, true);
if (!playerLock){return false;}
if (!playerLock.tryWait()) {
playerLock.close();
return true;
}else{
playerLock.post();
playerLock.close();
return false;
}
}
/// Assures the input for the given stream name is active.
/// Does stream name sanitizion first, followed by a stream name length check (<= 100 chars).
/// Then, checks if an input is already active by running streamAlive(). If yes, aborts.
/// If no, loads up the server configuration and attempts to start the given stream according to current config.
/// At this point, fails and aborts if MistController isn't running.
/// \triggers
/// The `"STREAM_LOAD"` trigger is stream-specific, and is ran right before launching an input for an inactive stream. If cancelled, the input is not launched. Its payload is:
/// ~~~~~~~~~~~~~~~
/// streamname
/// ~~~~~~~~~~~~~~~
/// The `"STREAM_SOURCE"` trigger is stream-specific, and is ran right before launching an input for an inactive stream. It cannot be cancelled, but an invalid source can be returned; which is effectively equivalent to cancelling.
/// This trigger is special: the response is used as source override for this stream, and not handled as normal. If used, the handler for this trigger MUST return a valid source to allow the stream input to load up at all. If used multiple times, the last defined handler overrides any and all previous handlers.
/// Its payload is:
/// ~~~~~~~~~~~~~~~
/// streamname
/// ~~~~~~~~~~~~~~~
bool Util::startInput(std::string streamname, std::string filename, bool forkFirst) {
sanitizeName(streamname);
if (streamname.size() > 100){
FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size());
return false;
}
//Check if the stream is already active.
//If yes, don't activate again to prevent duplicate inputs.
//It's still possible a duplicate starts anyway, this is caught in the inputs initializer.
//Note: this uses the _whole_ stream name, including + (if any).
//This means "test+a" and "test+b" have separate locks and do not interact with each other.
if (streamAlive(streamname)){
DEBUG_MSG(DLVL_MEDIUM, "Stream %s already active; continuing", streamname.c_str());
return true;
}
//Attempt to load up configuration and find this stream
IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE);
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
//Lock the config to prevent race conditions and corruption issues while reading
configLock.wait();
DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len);
//Abort if no config available
if (!config){
FAIL_MSG("Configuration not available, aborting! Is MistController running?");
configLock.post();//unlock the config semaphore
return false;
}
/*LTS-START*/
if (config.getMember("hardlimit_active")) {
return false;
}
/*LTS-END*/
//Find stream base name
std::string smp = streamname.substr(0, streamname.find_first_of("+ "));
//check if base name (everything before + or space) exists
DTSC::Scan stream_cfg = config.getMember("streams").getMember(smp);
if (!stream_cfg){
DEBUG_MSG(DLVL_HIGH, "Stream %s not configured - attempting to ignore", streamname.c_str());
}
/*LTS-START*/
if (stream_cfg && stream_cfg.getMember("hardlimit_active")) {
return false;
}
if(Triggers::shouldTrigger("STREAM_LOAD", smp)){
if (!Triggers::doTrigger("STREAM_LOAD", streamname, smp)){
return false;
}
}
if(Triggers::shouldTrigger("STREAM_SOURCE", smp)){
Triggers::doTrigger("STREAM_SOURCE", streamname, smp, false, filename);
}
/*LTS-END*/
//Only use configured source if not manually overridden. Abort if no config is available.
if (!filename.size()){
if (!stream_cfg){
DEBUG_MSG(DLVL_MEDIUM, "Stream %s not configured, no source manually given, cannot start", streamname.c_str());
configLock.post();//unlock the config semaphore
return false;
}
filename = stream_cfg.getMember("source").asString();
}
//check in curConf for capabilities-inputs-<naam>-priority/source_match
std::string player_bin;
bool selected = false;
long long int curPrio = -1;
DTSC::Scan inputs = config.getMember("capabilities").getMember("inputs");
DTSC::Scan input;
unsigned int input_size = inputs.getSize();
for (unsigned int i = 0; i < input_size; ++i){
input = inputs.getIndice(i);
//if match voor current stream && priority is hoger dan wat we al hebben
if (input.getMember("source_match") && curPrio < input.getMember("priority").asInt()){
if (input.getMember("source_match").getSize()){
for(unsigned int j = 0; j < input.getMember("source_match").getSize(); ++j){
std::string source = input.getMember("source_match").getIndice(j).asString();
std::string front = source.substr(0,source.find('*'));
std::string back = source.substr(source.find('*')+1);
DEBUG_MSG(DLVL_MEDIUM, "Checking input %s: %s (%s)", inputs.getIndiceName(i).c_str(), input.getMember("name").asString().c_str(), source.c_str());
if (filename.substr(0,front.size()) == front && filename.substr(filename.size()-back.size()) == back){
player_bin = Util::getMyPath() + "MistIn" + input.getMember("name").asString();
curPrio = input.getMember("priority").asInt();
selected = true;
}
}
}else{
std::string source = input.getMember("source_match").asString();
std::string front = source.substr(0,source.find('*'));
std::string back = source.substr(source.find('*')+1);
DEBUG_MSG(DLVL_MEDIUM, "Checking input %s: %s (%s)", inputs.getIndiceName(i).c_str(), input.getMember("name").asString().c_str(), source.c_str());
if (filename.substr(0,front.size()) == front && filename.substr(filename.size()-back.size()) == back){
player_bin = Util::getMyPath() + "MistIn" + input.getMember("name").asString();
curPrio = input.getMember("priority").asInt();
selected = true;
}
}
}
}
if (!selected){
configLock.post();//unlock the config semaphore
FAIL_MSG("No compatible input found for stream %s: %s", streamname.c_str(), filename.c_str());
return false;
}
//copy the neccessary arguments to separate storage so we can unlock the config semaphore safely
std::map<std::string, std::string> str_args;
//check required parameters
DTSC::Scan required = input.getMember("required");
unsigned int req_size = required.getSize();
for (unsigned int i = 0; i < req_size; ++i){
std::string opt = required.getIndiceName(i);
if (!stream_cfg.getMember(opt)){
configLock.post();//unlock the config semaphore
FAIL_MSG("Required parameter %s for stream %s missing", opt.c_str(), streamname.c_str());
return false;
}
str_args[required.getIndice(i).getMember("option").asString()] = stream_cfg.getMember(opt).asString();
}
//check optional parameters
DTSC::Scan optional = input.getMember("optional");
unsigned int opt_size = optional.getSize();
for (unsigned int i = 0; i < opt_size; ++i){
std::string opt = optional.getIndiceName(i);
DEBUG_MSG(DLVL_VERYHIGH, "Checking optional %u: %s", i, opt.c_str());
if (stream_cfg.getMember(opt)){
str_args[optional.getIndice(i).getMember("option").asString()] = stream_cfg.getMember(opt).asString();
}
}
//finally, unlock the config semaphore
configLock.post();
DEBUG_MSG(DLVL_MEDIUM, "Starting %s -s %s %s", player_bin.c_str(), streamname.c_str(), filename.c_str());
char * argv[30] = {(char *)player_bin.c_str(), (char *)"-s", (char *)streamname.c_str(), (char *)filename.c_str()};
int argNum = 3;
std::string debugLvl;
if (Util::Config::printDebugLevel != DEBUG && !str_args.count("--debug")){
debugLvl = JSON::Value((long long)Util::Config::printDebugLevel).asString();
argv[++argNum] = (char *)"--debug";
argv[++argNum] = (char *)debugLvl.c_str();
}
for (std::map<std::string, std::string>::iterator it = str_args.begin(); it != str_args.end(); ++it){
argv[++argNum] = (char *)it->first.c_str();
argv[++argNum] = (char *)it->second.c_str();
}
argv[++argNum] = (char *)0;
int pid = 0;
if (forkFirst){
DEBUG_MSG(DLVL_DONTEVEN, "Forking");
pid = fork();
if (pid == -1) {
FAIL_MSG("Forking process for stream %s failed: %s", streamname.c_str(), strerror(errno));
return false;
}
}else{
DEBUG_MSG(DLVL_DONTEVEN, "Not forking");
}
if (pid == 0){
DEBUG_MSG(DLVL_DONTEVEN, "execvp");
execvp(argv[0], argv);
FAIL_MSG("Starting process %s for stream %s failed: %s", argv[0], streamname.c_str(), strerror(errno));
_exit(42);
}
unsigned int waiting = 0;
while (!streamAlive(streamname) && ++waiting < 40){
Util::sleep(250);
}
return streamAlive(streamname);
}
/* roxlu-begin */
int Util::startRecording(std::string streamname) {
sanitizeName(streamname);
if (streamname.size() > 100){
FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size());
return -1;
}
// Attempt to load up configuration and find this stream
IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE);
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
//Lock the config to prevent race conditions and corruption issues while reading
configLock.wait();
DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len);
//Abort if no config available
if (!config){
FAIL_MSG("Configuration not available, aborting! Is MistController running?");
configLock.post();//unlock the config semaphore
return -2;
}
//Find stream base name
std::string smp = streamname.substr(0, streamname.find_first_of("+ "));
DTSC::Scan streamCfg = config.getMember("streams").getMember(smp);
if (!streamCfg){
DEBUG_MSG(DLVL_HIGH, "Stream %s not configured - attempting to ignore", streamname.c_str());
configLock.post();
return -3;
}
// When we have a validate trigger, we execute that first before we continue.
if (Triggers::shouldTrigger("RECORDING_VALIDATE", streamname)) {
std::string validate_result;
Triggers::doTrigger("RECORDING_VALIDATE", streamname, streamname.c_str(), false, validate_result);
INFO_MSG("RECORDING_VALIDATE returned: %s", validate_result.c_str());
if (validate_result == "0") {
INFO_MSG("RECORDING_VALIDATE: the hook returned 0 so we're not going to create a recording.");
configLock.post();
return 0;
}
}
// Should we start an flv output? (We allow hooks to specify custom filenames)
DTSC::Scan recordFilenameConf = streamCfg.getMember("record");
std::string recordFilename;
if (Triggers::shouldTrigger("RECORDING_FILEPATH", streamname)) {
std::string payload = streamname;
std::string filepath_response;
Triggers::doTrigger("RECORDING_FILEPATH", payload, streamname.c_str(), false, filepath_response); /* @todo do we need to handle the return of doTrigger? */
if (filepath_response.size() < 1024) { /* @todo is there a MAX_FILEPATH somewhere? */
recordFilename = filepath_response;
}
else {
FAIL_MSG("The RECORDING_FILEPATH trigger returned a filename which is bigger then our allowed max filename size. Not using returned filepath from hook.");
}
}
// No filename set through trigger, so use the one one from the stream config.
if (recordFilename.size() == 0) {
recordFilename = recordFilenameConf.asString();
}
/*if (recordFilename.size() == 0
|| recordFilename.substr(recordFilename.find_last_of(".") + 1) != "flv")
{
configLock.post();
return -4;
}*/
// The filename can hold variables like current time etc..
replace_variables(recordFilename);
replace(recordFilename, "$stream", streamname);
INFO_MSG("Filepath that we use for the recording: %s", recordFilename.c_str());
//to change hardcoding
//determine extension, first find the '.' for extension
size_t pointPlace = recordFilename.rfind(".");
if (pointPlace == std::string::npos){
FAIL_MSG("no extension found in output name. Aborting recording.");
return -1;
}
std::string fileExtension = recordFilename.substr(pointPlace+1);
DTSC::Scan outputs = config.getMember("capabilities").getMember("connectors");
DTSC::Scan output;
std::string output_filepath = "";
unsigned int outputs_size = outputs.getSize();
HIGH_MSG("Recording outputs %d",outputs_size);
for (unsigned int i = 0; i<outputs_size; ++i){
output = outputs.getIndice(i);
HIGH_MSG("Checking output: %s",output.getMember("name").asString().c_str());
if (output.getMember("canRecord")){
HIGH_MSG("Output %s can record!", output.getMember("name").asString().c_str());
DTSC::Scan recTypes = output.getMember("canRecord");
unsigned int recTypesLength = recTypes.getSize();
bool breakOuterLoop = false;
for (unsigned int o = 0; o<recTypesLength; ++o){
if (recTypes.getIndice(o).asString() == fileExtension){
HIGH_MSG("Output %s can record %s!", output.getMember("name").asString().c_str(), fileExtension.c_str());
output_filepath = Util::getMyPath() + "MistOut" + output.getMember("name").asString();
breakOuterLoop = true;
break;
}
}
if (breakOuterLoop) break;
}
}
if (output_filepath == ""){
FAIL_MSG("No output found for filetype %s.", fileExtension.c_str());
return -4;
}
// Start output.
char* argv[] = {
(char*)output_filepath.c_str(),
(char*)"--stream", (char*)streamname.c_str(),
(char*)"--outputFilename", (char*)recordFilename.c_str(),
(char*)NULL
};
int pid = fork();
if (pid == -1) {
FAIL_MSG("Forking process for stream %s failed: %s", streamname.c_str(), strerror(errno));
configLock.post();
return -5;
}
// Child process gets pid == 0
if (pid == 0) {
if (execvp(argv[0], argv) == -1) {
FAIL_MSG("Failed to start MistOutFLV: %s", strerror(errno));
configLock.post();
return -6;
}
}
configLock.post();
return pid;
}
static void replace(std::string& str, const std::string& from, const std::string& to) {
if(from.empty()) {
return;
}
size_t start_pos = 0;
while((start_pos = str.find(from, start_pos)) != std::string::npos) {
str.replace(start_pos, from.length(), to);
start_pos += to.length();
}
}
static void replace_variables(std::string& str) {
char buffer[80] = { 0 };
std::map<std::string, std::string> vars;
std::string day = strftime_now("%d");
std::string month = strftime_now("%m");
std::string year = strftime_now("%Y");
std::string hour = strftime_now("%H");
std::string minute = strftime_now("%M");
std::string seconds = strftime_now("%S");
std::string datetime = year +"." +month +"." +day +"." +hour +"." +minute +"." +seconds;
if (0 == day.size()) {
WARN_MSG("Failed to retrieve the current day with strftime_now().");
}
if (0 == month.size()) {
WARN_MSG("Failed to retrieve the current month with strftime_now().");
}
if (0 == year.size()) {
WARN_MSG("Failed to retrieve the current year with strftime_now().");
}
if (0 == hour.size()) {
WARN_MSG("Failed to retrieve the current hour with strftime_now().");
}
if (0 == minute.size()) {
WARN_MSG("Failed to retrieve the current minute with strftime_now().");
}
if (0 == seconds.size()) {
WARN_MSG("Failed to retrieve the current seconds with strftime_now().");
}
vars.insert(std::pair<std::string, std::string>("$day", day));
vars.insert(std::pair<std::string, std::string>("$month", month));
vars.insert(std::pair<std::string, std::string>("$year", year));
vars.insert(std::pair<std::string, std::string>("$hour", hour));
vars.insert(std::pair<std::string, std::string>("$minute", minute));
vars.insert(std::pair<std::string, std::string>("$second", seconds));
vars.insert(std::pair<std::string, std::string>("$datetime", datetime));
std::map<std::string, std::string>::iterator it = vars.begin();
while (it != vars.end()) {
replace(str, it->first, it->second);
++it;
}
}
static std::string strftime_now(const std::string& format) {
time_t rawtime;
struct tm* timeinfo = NULL;
char buffer [80] = { 0 };
time(&rawtime);
timeinfo = localtime (&rawtime);
if (0 == strftime(buffer, 80, format.c_str(), timeinfo)) {
FAIL_MSG("Call to stftime() failed with format: %s, maybe our buffer is not big enough (80 bytes).", format.c_str());
return "";
}
return buffer;
}
/* roxlu-end */