diff --git a/CMakeLists.txt b/CMakeLists.txt index a7ec41a6..908ed781 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -475,6 +475,7 @@ makeOutput(WAV wav)#LTS makeOutput(WebRTC webrtc http)#LTS add_executable(MistProcFFMPEG + ${BINARY_DIR}/mist/.headers src/process/process_ffmpeg.cpp src/output/output_ebml.cpp src/input/input_ebml.cpp @@ -482,10 +483,20 @@ add_executable(MistProcFFMPEG src/output/output_http.cpp src/output/output.cpp src/io.cpp - ) -target_link_libraries(MistProcFFMPEG - mist - ) +) +target_link_libraries(MistProcFFMPEG mist) + +add_executable(MistProcMKVExec + ${BINARY_DIR}/mist/.headers + src/process/process_exec.cpp + src/output/output_ebml.cpp + src/input/input_ebml.cpp + src/input/input.cpp + src/output/output_http.cpp + src/output/output.cpp + src/io.cpp +) +target_link_libraries(MistProcMKVExec mist) if (NOT DEFINED NOSSL ) makeOutput(HTTPS https)#LTS @@ -745,7 +756,7 @@ target_link_libraries(urltest mist) add_test(URLTest COMMAND urltest) add_executable(logtest test/log.cpp ${BINARY_DIR}/mist/.headers) target_link_libraries(logtest mist) -add_test(LOGTest COMMAND urltest) +add_test(LOGTest COMMAND logtest) add_executable(downloadertest test/downloader.cpp ${BINARY_DIR}/mist/.headers) target_link_libraries(downloadertest mist) add_test(DownloaderTest COMMAND downloadertest) diff --git a/src/input/input.cpp b/src/input/input.cpp index 30805bcc..caecdff9 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -595,6 +595,10 @@ namespace Mist { std::map overrides; overrides["throughboot"] = ""; if(isSingular()){ + if (Util::streamAlive(streamName)){ + WARN_MSG("Stream already online, cancelling"); + return; + } overrides["singular"] = ""; } if (config->getBool("realtime") || (capa.isMember("hardcoded") && capa["hardcoded"].isMember("resume") && capa["hardcoded"]["resume"])){ diff --git a/src/process/process_exec.cpp b/src/process/process_exec.cpp new file mode 100644 index 00000000..ebea4e7d --- /dev/null +++ b/src/process/process_exec.cpp @@ -0,0 +1,200 @@ +#include "process_exec.h" +#include +#include +#include +#include +#include //for std::find +#include //for stat +#include //for stat +#include //for stat +#include + +int pipein[2], pipeout[2], pipeerr[2]; + +Util::Config co; +Util::Config conf; + +void sinkThread(void *){ + Mist::ProcessSink in(&co); + co.getOption("output", true).append("-"); + co.activate(); + MEDIUM_MSG("Running sink thread..."); + in.setInFile(pipeout[0]); + co.is_active = true; + in.run(); + conf.is_active = false; +} + +void sourceThread(void *){ + Mist::ProcessSource::init(&conf); + conf.getOption("streamname", true).append(Mist::opt["source"].c_str()); + conf.getOption("target",true).append("-?audio=all&video=all"); + if (Mist::opt.isMember("track_select")){ + conf.getOption("target",true).append("-?"+Mist::opt["track_select"].asString()); + } + conf.is_active = true; + Socket::Connection c(pipein[1],0); + Mist::ProcessSource out(c); + MEDIUM_MSG("Running source thread..." ); + out.run(); + co.is_active = false; +} + +int main(int argc, char * argv[]){ + Util::Config config(argv[0]); + JSON::Value capa; + + { + JSON::Value opt; + opt["arg"] = "string"; + opt["default"] = "-"; + opt["arg_num"] = 1; + opt["help"] = "JSON configuration, or - (default) to read from stdin"; + config.addOption("configuration", opt); + opt.null(); + opt["long"] = "json"; + opt["short"] = "j"; + opt["help"] = "Output connector info in JSON format, then exit."; + opt["value"].append(0); + config.addOption("json", opt); + + } + + if (!(config.parseArgs(argc, argv))){return 1;} + if (config.getBool("json")) { + + capa["name"] = "MKVExec"; + capa["desc"] = "Pipe MKV in, expect MKV out. You choose the executable in between yourself."; + + capa["required"]["exec"]["name"] = "Executable"; + capa["required"]["exec"]["help"] = "What to executable to run on the stream data"; + capa["required"]["exec"]["type"] = "string"; + + capa["optional"]["sink"]["name"] = "Target stream"; + capa["optional"]["sink"]["help"] = "What stream the encoded track should be added to. Defaults to source stream. May contain variables."; + capa["optional"]["sink"]["type"] = "string"; + capa["optional"]["sink"]["validate"][0u] = "streamname_with_wildcard_and_variables"; + + capa["optional"]["track_select"]["name"] = "Source selector(s)"; + capa["optional"]["track_select"]["help"] = "What tracks to select for the input. Defaults to audio=all&video=all."; + capa["optional"]["track_select"]["type"] = "string"; + capa["optional"]["track_select"]["validate"][0u] = "track_selector"; + capa["optional"]["track_select"]["default"] = "audio=all&video=all"; + + std::cout << capa.toString() << std::endl; + return -1; + } + + Util::redirectLogsIfNeeded(); + + //read configuration + if(config.getString("configuration") != "-"){ + Mist::opt = JSON::fromString(config.getString("configuration")); + }else{ + std::string json, line; + INFO_MSG("Reading configuration from standard input"); + while (std::getline(std::cin, line)){json.append(line);} + Mist::opt = JSON::fromString(json.c_str()); + } + + //check config for generic options + Mist::ProcMKVExec Enc; + if(!Enc.CheckConfig()){ + FAIL_MSG("Error config syntax error!"); + return 1; + } + + //create pipe pair before thread + pipe(pipein); + pipe(pipeout); + + //stream which connects to input + tthread::thread source(sourceThread, 0); + + //needs to pass through encoder to outputEBML + tthread::thread sink(sinkThread, 0); + + co.is_active = true; + + //run process + Enc.Run(); + + co.is_active = false; + conf.is_active = false; + + //close pipes + close(pipein[0]); + close(pipeout[0]); + close(pipein[1]); + close(pipeout[1]); + + sink.join(); + HIGH_MSG("sink thread joined") + + source.join(); + HIGH_MSG("source thread joined"); + + return 0; +} + +namespace Mist{ + ///check source, sink, source_track, codec, bitrate, flags and process options. + bool ProcMKVExec::CheckConfig(){ + // Check generic configuration variables + if (!opt.isMember("source") || !opt["source"] || !opt["source"].isString()){ + FAIL_MSG("invalid source in config!"); + return false; + } + + if (!opt.isMember("sink") || !opt["sink"] || !opt["sink"].isString()){ + INFO_MSG("No sink explicitly set, using source as sink"); + } + + return true; + } + + void ProcMKVExec::Run(){ + Util::Procs p; + int ffer = 2; + pid_t execd_proc = -1; + + //exec command + char exec_cmd[10240]; + strncpy(exec_cmd, opt["exec"].asString().c_str(), 10240); + MEDIUM_MSG("Executing command: %s", exec_cmd); + uint8_t argCnt = 0; + char *startCh = 0; + char *args[1280]; + for (char *i = exec_cmd; i - exec_cmd < 10240; ++i){ + if (!*i){ + if (startCh){args[argCnt++] = startCh;} + break; + } + if (*i == ' '){ + if (startCh){ + args[argCnt++] = startCh; + startCh = 0; + *i = 0; + } + }else{ + if (!startCh){startCh = i;} + } + } + args[argCnt] = 0; + + execd_proc = p.StartPiped(args, &pipein[0], &pipeout[1], &ffer); + + while(conf.is_active && p.isRunning(execd_proc)){ + Util::sleep(200); + } + + while(p.isRunning(execd_proc)){ + MEDIUM_MSG("Stopping process..."); + p.StopAll(); + Util::sleep(200); + } + + MEDIUM_MSG("Closing process clean"); + } +} + diff --git a/src/process/process_exec.h b/src/process/process_exec.h new file mode 100644 index 00000000..d9772111 --- /dev/null +++ b/src/process/process_exec.h @@ -0,0 +1,66 @@ +#include +#include +#include "../input/input_ebml.h" +#include "../output/output_ebml.h" + +namespace Mist{ + bool getFirst = false; + bool sendFirst = false; + + uint64_t packetTimeDiff; + uint64_t sendPacketTime; + JSON::Value opt;///Options + + class ProcMKVExec { + public: + ProcMKVExec(){}; + bool CheckConfig(); + void Run(); + }; + + class ProcessSink : public InputEBML { + public: + ProcessSink(Util::Config *cfg) :InputEBML(cfg){}; + void getNext(bool smart = true){ + static bool recurse = false; + if (recurse){return InputEBML::getNext(smart);} + recurse = true; + InputEBML::getNext(smart); + recurse = false; + if(!getFirst){ + packetTimeDiff = sendPacketTime - thisPacket.getTime(); + getFirst = true; + } + uint64_t tmpLong; + uint64_t packTime = thisPacket.getTime() + packetTimeDiff; + //change packettime + char * data = thisPacket.getData(); + tmpLong = htonl((int)(packTime >> 32)); + memcpy(data+12, (char *)&tmpLong, 4); + tmpLong = htonl((int)(packTime & 0xFFFFFFFF)); + memcpy(data+16, (char *)&tmpLong, 4); + } + void setInFile(int stdin_val){ + inFile = fdopen(stdin_val, "r"); + streamName = opt["sink"].asString(); + if (!streamName.size()){streamName = opt["source"].asString();} + nProxy.streamName = streamName; + } + bool needsLock(){return false;} + bool isSingular(){return false;} + }; + + class ProcessSource : public OutEBML { + public: + ProcessSource(Socket::Connection & c): OutEBML(c){}; + void sendNext(){ + if(!sendFirst){ + sendPacketTime = thisPacket.getTime(); + sendFirst = true; + } + OutEBML::sendNext(); + } + }; +} + + diff --git a/src/process/process_ffmpeg.cpp b/src/process/process_ffmpeg.cpp index ab3b7978..1122f33f 100644 --- a/src/process/process_ffmpeg.cpp +++ b/src/process/process_ffmpeg.cpp @@ -24,11 +24,6 @@ uint64_t packetTimeDiff; uint64_t sendPacketTime; bool getFirst = false; bool sendFirst = false; -bool dump_ffmpeg = false; - -std::string supported_video_codec[] ={"H264","H265", "VP9"}; -std::string supported_audio_codec[] ={"AAC","OPUS", "MP3"}; -std::string supported_process[] ={"ffmpeg"}; uint32_t res_x = 0; uint32_t res_y = 0; @@ -53,9 +48,9 @@ void sourceThread(void *){ conf.getOption("streamname", true).append(opt["source"].c_str()); if(Enc.isAudio){ - conf.getOption("target",true).append("-?audio=" + opt["input_track"].asString() + "&video=0"); + conf.getOption("target",true).append("-?audio=" + opt["source_track"].asString() + "&video=0"); }else if(Enc.isVideo){ - conf.getOption("target",true).append("-?video=" + opt["input_track"].asString() + "&audio=0"); + conf.getOption("target",true).append("-?video=" + opt["source_track"].asString() + "&audio=0"); }else{ FAIL_MSG("Cannot set target option parameters"); return; @@ -80,21 +75,15 @@ int main(int argc, char * argv[]){ JSON::Value opt; opt["arg"] = "string"; opt["default"] = "-"; - opt["arg_num"] = 1ll; - opt["help"] = "where the configuration is read from, or - from stdin"; + opt["arg_num"] = 1; + opt["help"] = "JSON configuration, or - (default) to read from stdin"; config.addOption("configuration", opt); - JSON::Value ffmpeg_dump; - ffmpeg_dump["short"] = "f"; - ffmpeg_dump["value"].append(0ll); - ffmpeg_dump["help"] = "Show ffmpeg output"; - config.addOption("ffmpeg_output", ffmpeg_dump); - JSON::Value option; option["long"] = "json"; option["short"] = "j"; option["help"] = "Output connector info in JSON format, then exit."; - option["value"].append(0ll); + option["value"].append(0); config.addOption("json", option); } @@ -153,7 +142,7 @@ int main(int argc, char * argv[]){ capa["optional"]["sink"]["help"] = "What stream the encoded track should be added to. Defaults to source stream."; capa["optional"]["sink"]["placeholder"] = "source stream"; capa["optional"]["sink"]["type"] = "str"; - capa["optional"]["sink"]["validate"][0u] = "streamname_with_wildcard"; + capa["optional"]["sink"]["validate"][0u] = "streamname_with_wildcard_and_variables"; capa["optional"]["sink"]["n"] = 3; capa["optional"]["resolution"]["name"] = "resolution"; @@ -261,36 +250,17 @@ int main(int argc, char * argv[]){ } Util::redirectLogsIfNeeded(); - dump_ffmpeg = config.getBool("ffmpeg_output"); - std::string json; - std::string line; - - //read configuration file in json format + //read configuration if(config.getString("configuration") != "-"){ - std::ifstream ifile(config.getString("configuration")); - INFO_MSG("reading from file: %s", config.getString("configuration").c_str()); - if((bool)ifile){ - //file exists, read config file - while(ifile){ - std::getline(ifile, line); - json.append(line); - } - ifile.close(); - }else{ - FAIL_MSG("Incorrect config file"); - return 0; - } + opt = JSON::fromString(config.getString("configuration")); }else{ - //read from stdin - INFO_MSG("read from stdin"); - while (std::getline(std::cin, line)) - { - json.append(line); - } + std::string json, line; + INFO_MSG("Reading configuration from standard input"); + while (std::getline(std::cin, line)){json.append(line);} + opt = JSON::fromString(json.c_str()); } - opt = JSON::fromString(json.c_str()); Enc.SetConfig(opt); //check config for generic options @@ -367,7 +337,8 @@ namespace Mist{ void EncodeInputEBML::setInFile(int stdin_val){ inFile = fdopen(stdin_val, "r"); - streamName = opt["sink"].asString().c_str(); + streamName = opt["sink"].asString(); + if (!streamName.size()){streamName = opt["source"].asString();} nProxy.streamName = streamName; } @@ -385,6 +356,7 @@ namespace Mist{ } void EncodeOutputEBML::sendHeader(){ + realTime = 0; res_x = myMeta.tracks[getMainSelectedTrack()].width; res_y = myMeta.tracks[getMainSelectedTrack()].height; Enc.setResolution(res_x, res_y); @@ -415,15 +387,26 @@ namespace Mist{ } OutENC::OutENC(){ - ffcmd[10240] = 0; + ffcmd[10239] = 0; isAudio = false; isVideo = false; crf = -1; - sample_rate = 44100; + sample_rate = 0; + supportedVideoCodecs.insert("H264"); + supportedVideoCodecs.insert("H265"); + supportedVideoCodecs.insert("VP9"); + supportedAudioCodecs.insert("AAC"); + supportedAudioCodecs.insert("opus"); + supportedAudioCodecs.insert("MP3"); + } bool OutENC::buildAudioCommand(){ - snprintf(ffcmd,10240, "%s-i - -acodec %s -ar %s %s -strict -2 -ac 2 %s -f matroska - ", opt["process"].c_str(), codec.c_str(), std::to_string(sample_rate), getBitrateSetting().c_str(), flags.c_str()); + std::string samplerate; + if (sample_rate){ + samplerate = "-ar " + JSON::Value(sample_rate).asString(); + } + snprintf(ffcmd,10240, "ffmpeg -hide_banner -loglevel warning -i - -acodec %s %s -strict -2 -ac 2 %s -f matroska -live 1 -cluster_time_limit 100 - ", codec.c_str(), samplerate.c_str(), getBitrateSetting().c_str(), flags.c_str()); return true; } @@ -454,18 +437,14 @@ namespace Mist{ for(JSON::Iter it(opt["sources"]); it; ++it){ - if((*it).isMember("src") && (*it)["src"].isString()){ + if((*it).isMember("src") && (*it)["src"].isString()&& (*it)["src"].asString().size() > 3){ std::string src = (*it)["src"].asString(); -/* std::string ext = src.substr( src.length() - 3); - std::transform(ext.begin(), ext.end(), ext.begin(), ::tolower); if(ext == "gif"){ //for animated gif files, prepend extra parameter sprintf(in, " -ignore_loop 0 -i %s", src.c_str()); }else{ sprintf(in, " -i %s", src.c_str()); } -*/ - sprintf(in, " -i %s", src.c_str()); MEDIUM_MSG("Loading Input: %s", src.c_str()); }else{ @@ -482,19 +461,13 @@ namespace Mist{ int32_t i_y = 0; std::string i_anchor = "topleft"; - if((*it).isMember("width") && (*it)["width"].isInt()){ - i_width = (*it)["width"].asInt(); - } - if((*it).isMember("height") && (*it)["height"].isInt()){ + if ((*it).isMember("width") && (*it)["width"].asInt()){i_width = (*it)["width"].asInt();} + if ((*it).isMember("height") && (*it)["height"].asInt()){ i_height = (*it)["height"].asInt(); } - if((*it).isMember("x") && (*it)["x"].isInt()){ - i_x = (*it)["x"].asInt(); - } - if((*it).isMember("y") && (*it)["y"].isInt()){ - i_y = (*it)["y"].asInt(); - } + if ((*it).isMember("x")){i_x = (*it)["x"].asInt();} + if ((*it).isMember("y")){i_y = (*it)["y"].asInt();} if((*it).isMember("anchor") && (*it)["anchor"].isString()){ i_anchor = (*it)["anchor"].asString(); @@ -563,14 +536,7 @@ namespace Mist{ options.append(" -preset " + preset); } - snprintf(ffcmd,10240, "ffmpeg -loglevel quiet -s %dx%d -f rawvideo -pix_fmt rgb24 -r 25 -i /dev/zero %s %s -c:v %s %s %s -an -f matroska - ", res_x, res_y, s_input.c_str(), s_overlay.c_str(), codec.c_str(), getBitrateSetting().c_str(), flags.c_str()); - - if(dump_ffmpeg){ - snprintf(ffcmd,10240, "%s -f lavfi -i color=c=black:s=%dx%d %s %s -c:v %s %s %s %s -an -f matroska - ",opt["process"].c_str(), res_x, res_y, s_input.c_str(), s_overlay.c_str(), codec.c_str(), options.c_str(), getBitrateSetting().c_str(), flags.c_str()); - }else{ - - snprintf(ffcmd,10240, "%s -loglevel quiet -f lavfi -i color=c=black:s=%dx%d %s %s -c:v %s %s %s %s -an -f matroska - ", opt["process"].c_str(), res_x, res_y, s_input.c_str(), s_overlay.c_str(), codec.c_str(), options.c_str(), getBitrateSetting().c_str(), flags.c_str()); - } + snprintf(ffcmd,10240, "ffmpeg -hide_banner -loglevel warning -f lavfi -i color=c=black:s=%dx%d %s %s -c:v %s %s %s %s -an -f matroska - ", res_x, res_y, s_input.c_str(), s_overlay.c_str(), codec.c_str(), options.c_str(), getBitrateSetting().c_str(), flags.c_str()); return true; } @@ -615,7 +581,7 @@ namespace Mist{ if(isVideo){ if(crf > -1){ //use crf value instead of bitrate - setting = "-crf " + std::to_string(crf); + setting = "-crf " + JSON::Value(crf).asString(); }else{ //use bitrate value set above } @@ -699,7 +665,7 @@ namespace Mist{ } - ///check source, sink, input_track, codec, bitrate, flags and process options. + ///check source, sink, source_track, codec, bitrate, flags and process options. bool OutENC::CheckConfig(){ // Check generic configuration variables if (!opt.isMember("source") || !opt["source"] || !opt["source"].isString()){ @@ -708,32 +674,16 @@ namespace Mist{ } if (!opt.isMember("sink") || !opt["sink"] || !opt["sink"].isString()){ - FAIL_MSG("invalid sink in config!"); - return false; + INFO_MSG("No sink explicitly set, using source as sink"); } - if (!opt.isMember("input_track") || !opt["input_track"] || !opt["input_track"].isString()){ - FAIL_MSG("invalid input_track in config!"); + if (supportedVideoCodecs.count(opt["codec"].asString())){isVideo = true;} + if (supportedAudioCodecs.count(opt["codec"].asString())){isAudio = true;} + if (!isVideo && !isAudio){ + FAIL_MSG("Codec: '%s' not supported!",opt["codec"].c_str()); return false; } - /** check json parameters **/ - if(std::find(std::begin(supported_process), std::end(supported_process), opt["process"].c_str()) == std::end(supported_process)){ - FAIL_MSG("Process: '%s' not supported!",opt["name"].c_str()); - return false; - } - - if(std::find(std::begin(supported_video_codec), std::end(supported_video_codec), opt["codec"].c_str()) == std::end(supported_video_codec)){ - if(std::find(std::begin(supported_audio_codec), std::end(supported_audio_codec), opt["codec"].c_str()) == std::end(supported_audio_codec)){ - FAIL_MSG("Codec: '%s' not supported!",opt["codec"].c_str()); - return false; - }else{ - isAudio = true; - } - }else{ - isVideo = true; - } - setCodec(opt["codec"]); std::string b_rate; diff --git a/src/process/process_ffmpeg.h b/src/process/process_ffmpeg.h index e2ce21bd..c57f203a 100644 --- a/src/process/process_ffmpeg.h +++ b/src/process/process_ffmpeg.h @@ -35,6 +35,8 @@ namespace Mist{ bool buildVideoCommand(); bool buildAudioCommand(); void prepareCommand(); + std::set supportedVideoCodecs; + std::set supportedAudioCodecs; }; class EncodeInputEBML : public InputEBML {