diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index 30bb636f..7c959d50 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -24,6 +24,8 @@ namespace DTSC{ /// The mask that the current process will use to check if a track is valid uint8_t trackValidMask = TRACK_VALID_ALL; + /// The mask that will be set by the current process for new tracks + uint8_t trackValidDefault = TRACK_VALID_ALL; /// Default constructor for packets - sets a null pointer and invalid packet. Packet::Packet(){ @@ -1701,7 +1703,7 @@ namespace DTSC{ trackList.setInt(trackPidField, getpid(), tNumber); trackList.setInt(trackSourceTidField, INVALID_TRACK_ID, tNumber); trackList.addRecords(1); - if (setValid){validateTrack(tNumber);} + if (setValid){validateTrack(tNumber, trackValidDefault);} if (!isMemBuf){trackLock.post();} return tNumber; } diff --git a/lib/dtsc.h b/lib/dtsc.h index 127d00e9..796e932e 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -38,6 +38,7 @@ namespace DTSC{ extern uint64_t veryUglyJitterOverride; extern uint8_t trackValidMask; + extern uint8_t trackValidDefault; ///\brief This enum holds all possible datatypes for DTSC packets. enum datatype{ diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index ebfcebed..c376fd82 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -520,6 +520,12 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){ setPushStatus(statUp["id"].asInt(), statUp["status"]); } } + if (Request.isMember("proc_status_update")){ + JSON::Value &statUp = Request["proc_status_update"]; + if (statUp.isMember("id") && statUp.isMember("status") && statUp.isMember("source") && statUp.isMember("proc") && statUp.isMember("sink")){ + setProcStatus(statUp["id"].asInt(), statUp["proc"].asStringRef(), statUp["source"].asStringRef(), statUp["sink"].asStringRef(), statUp["status"]); + } + } /*LTS-END*/ if (Request.isMember("config_backup")){ @@ -1076,6 +1082,10 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){ } } + if (Request.isMember("proc_list")){ + getProcsForStream(Request["proc_list"].asStringRef(), Response["proc_list"]); + } + if (Request.isMember("push_list")){Controller::listPush(Response["push_list"]);} if (Request.isMember("push_stop")){ diff --git a/src/controller/controller_storage.cpp b/src/controller/controller_storage.cpp index 1f6f24df..b78d29ec 100644 --- a/src/controller/controller_storage.cpp +++ b/src/controller/controller_storage.cpp @@ -55,6 +55,7 @@ namespace Controller{ Storage["log"].append(m); Storage["log"].shrink(100); // limit to 100 log messages if (isPushActive(progPid)){pushLogMessage(progPid, m);} //LTS + if (isProcActive(progPid)){procLogMessage(progPid, m);} //LTS logCounter++; if (rlxLogs && rlxLogs->isReady()){ if (!firstLog){firstLog = logCounter;} diff --git a/src/controller/controller_streams.cpp b/src/controller/controller_streams.cpp index 19e50a80..ef7f705e 100644 --- a/src/controller/controller_streams.cpp +++ b/src/controller/controller_streams.cpp @@ -3,6 +3,7 @@ #include "controller_statistics.h" #include "controller_storage.h" #include "controller_streams.h" +#include #include #include #include @@ -18,6 +19,62 @@ namespace Controller{ std::map inputProcesses; + /// Internal list of currently active processes + class procInfo{ + public: + JSON::Value stats; + std::string source; + std::string proc; + std::string sink; + uint64_t lastupdate; + JSON::Value logs; + }; + std::map activeProcs; + + void procLogMessage(uint64_t id, const JSON::Value & msg){ + JSON::Value &log = activeProcs[id].logs; + log.append(msg); + log.shrink(25); + } + + bool isProcActive(uint64_t id){ + return activeProcs.count(id); + } + + + void getProcsForStream(const std::string & stream, JSON::Value & returnedProcList){ + std::set wipeList; + for (std::map::iterator it = activeProcs.begin(); it != activeProcs.end(); ++it){ + if (!stream.size() || stream == it->second.sink || stream == it->second.source){ + JSON::Value & thisProc = returnedProcList[JSON::Value(it->first).asString()]; + thisProc = it->second.stats; + thisProc["source"] = it->second.source; + thisProc["sink"] = it->second.sink; + thisProc["process"] = it->second.proc; + thisProc["logs"] = it->second.logs; + if (!Util::Procs::isRunning(it->first)){ + thisProc["terminated"] = true; + wipeList.insert(it->first); + } + } + } + while (wipeList.size()){ + activeProcs.erase(*wipeList.begin()); + wipeList.erase(wipeList.begin()); + } + } + + void setProcStatus(uint64_t id, const std::string & proc, const std::string & source, const std::string & sink, const JSON::Value & status){ + procInfo & prc = activeProcs[id]; + prc.lastupdate = Util::bootSecs(); + prc.stats.extend(status); + if (!prc.proc.size() && sink.size() && source.size() && proc.size()){ + prc.sink = sink; + prc.source = source; + prc.proc = proc; + } + } + ///\brief Checks whether two streams are equal. ///\param one The first stream for the comparison. ///\param two The second stream for the comparison. diff --git a/src/controller/controller_streams.h b/src/controller/controller_streams.h index d93e64a9..8af09c88 100644 --- a/src/controller/controller_streams.h +++ b/src/controller/controller_streams.h @@ -1,6 +1,10 @@ #include namespace Controller{ + void setProcStatus(uint64_t id, const std::string & proc, const std::string & source, const std::string & sink, const JSON::Value & status); + void getProcsForStream(const std::string & stream, JSON::Value & returnedProcList); + void procLogMessage(uint64_t id, const JSON::Value & msg); + bool isProcActive(uint64_t id); bool streamsEqual(JSON::Value &one, JSON::Value &two); void checkStream(std::string name, JSON::Value &data); bool CheckAllStreams(JSON::Value &data); diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index f5882c98..3f62e7b5 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -27,6 +27,9 @@ namespace Mist{ inputBuffer::inputBuffer(Util::Config *cfg) : Input(cfg){ + firstProcTime = 0; + lastProcTime = 0; + allProcsRunning = false; capa["optional"].removeMember("realtime"); @@ -506,6 +509,29 @@ namespace Mist{ /*LTS-START*/ // Reload the configuration to make sure we stay up to date with changes through the api if (Util::epoch() - lastReTime > 4){preRun();} + size_t procInterval = 5000; + if (!firstProcTime || Util::bootMS() - firstProcTime < 30000){ + if (!firstProcTime){firstProcTime = Util::bootMS();} + if (Util::bootMS() - firstProcTime < 10000){ + procInterval = 200; + }else{ + procInterval = 1000; + } + } + if (Util::bootMS() - lastProcTime > procInterval){ + lastProcTime = Util::bootMS(); + std::string strName = config->getString("streamname"); + Util::sanitizeName(strName); + strName = strName.substr(0, (strName.find_first_of("+ "))); + char tmpBuf[NAME_BUFFER_SIZE]; + snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_STREAM_CONF, strName.c_str()); + Util::DTSCShmReader rStrmConf(tmpBuf); + DTSC::Scan streamCfg = rStrmConf.getScan(); + if (streamCfg){ + JSON::Value configuredProcesses = streamCfg.getMember("processes").asJSON(); + checkProcesses(configuredProcesses); + } + } /*LTS-END*/ connectedUsers = 0; @@ -537,7 +563,9 @@ namespace Mist{ } } void inputBuffer::userLeadOut(){ - if (config->is_active && streamStatus){streamStatus.mapped[0] = hasPush ? STRMSTAT_READY : STRMSTAT_WAIT;} + if (config->is_active && streamStatus){ + streamStatus.mapped[0] = (hasPush && allProcsRunning) ? STRMSTAT_READY : STRMSTAT_WAIT; + } if (hasPush){everHadPush = true;} if (!hasPush && everHadPush && !resumeMode && config->is_active){ Util::logExitReason("source disconnected for non-resumable stream"); @@ -583,10 +611,6 @@ namespace Mist{ snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_STREAM_CONF, strName.c_str()); Util::DTSCShmReader rStrmConf(tmpBuf); DTSC::Scan streamCfg = rStrmConf.getScan(); - if (streamCfg){ - JSON::Value configuredProcesses = streamCfg.getMember("processes").asJSON(); - checkProcesses(configuredProcesses); - } //Check if bufferTime setting is correct uint64_t tmpNum = retrieveSetting(streamCfg, "DVR", "bufferTime"); @@ -682,6 +706,7 @@ namespace Mist{ /*LTS-START*/ /// Checks if all processes are running, starts them if needed, stops them if needed void inputBuffer::checkProcesses(const JSON::Value &procs){ + allProcsRunning = true; if (!M.getValidTracks().size()){return;} std::set newProcs; @@ -762,6 +787,7 @@ namespace Mist{ argarr[3] = (char*)debugLvl.c_str();; argarr[4] = 0; } + allProcsRunning = false; INFO_MSG("Starting process: %s %s", argarr[0], argarr[1]); runningProcs[*newProcs.begin()] = Util::Procs::StartPiped(argarr, 0, 0, &err); } diff --git a/src/input/input_buffer.h b/src/input/input_buffer.h index 84c78345..975dd24f 100644 --- a/src/input/input_buffer.h +++ b/src/input/input_buffer.h @@ -16,9 +16,12 @@ namespace Mist{ uint64_t cutTime; size_t segmentSize; /*LTS*/ uint64_t lastReTime; /*LTS*/ + uint64_t lastProcTime; /*LTS*/ + uint64_t firstProcTime; /*LTS*/ uint64_t finalMillis; bool hasPush;//Is a push currently being received? bool everHadPush;//Was there ever a push received? + bool allProcsRunning; bool resumeMode; uint64_t maxKeepAway; IPC::semaphore *liveMeta; diff --git a/src/output/output.cpp b/src/output/output.cpp index da47fcfd..2d28c6ef 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -112,6 +112,7 @@ namespace Mist{ } if (isRecording() && DTSC::trackValidMask == TRACK_VALID_EXT_HUMAN){ DTSC::trackValidMask = TRACK_VALID_EXT_PUSH; + if (targetParams.count("unmask")){DTSC::trackValidMask = TRACK_VALID_ALL;} } /*LTS-END*/ } @@ -1721,7 +1722,7 @@ namespace Mist{ std::string APIcall = "{\"tag_sessid\":{\"" + statComm.getSessId() + "\":" + JSON::string_escape("UA:" + UA) + "}}"; Socket::UDPConnection uSock; - uSock.SetDestination("localhost", 4242); + uSock.SetDestination(UDP_API_HOST, UDP_API_PORT); uSock.SendNow(APIcall); newUA = false; } diff --git a/src/process/process_exec.cpp b/src/process/process_exec.cpp index 15cdd9e7..59ae19eb 100644 --- a/src/process/process_exec.cpp +++ b/src/process/process_exec.cpp @@ -14,6 +14,221 @@ int pipein[2], pipeout[2]; Util::Config co; Util::Config conf; +//Stat related stuff +JSON::Value pStat; +JSON::Value & pData = pStat["proc_status_update"]["status"]; +tthread::mutex statsMutex; +uint64_t statSinkMs = 0; +uint64_t statSourceMs = 0; + +namespace Mist{ + + class ProcessSink : public InputEBML{ + public: + ProcessSink(Util::Config *cfg) : InputEBML(cfg){ + capa["name"] = "MKVExec"; + }; + void getNext(size_t idx = INVALID_TRACK_ID){ + { + tthread::lock_guard guard(statsMutex); + if (pData["sink_tracks"].size() != userSelect.size()){ + pData["sink_tracks"].null(); + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ + pData["sink_tracks"].append(it->first); + } + } + } + static bool recurse = false; + if (recurse){return InputEBML::getNext(idx);} + recurse = true; + InputEBML::getNext(idx); + recurse = false; + uint64_t pTime = thisPacket.getTime(); + if (thisPacket){ + if (!getFirst){ + packetTimeDiff = sendPacketTime - pTime; + getFirst = true; + } + pTime += packetTimeDiff; + // change packettime + char *data = thisPacket.getData(); + Bit::htobll(data + 12, pTime); + if (pTime >= statSinkMs){statSinkMs = pTime;} + } + } + void setInFile(int stdin_val){ + inFile = fdopen(stdin_val, "r"); + streamName = opt["sink"].asString(); + if (!streamName.size()){streamName = opt["source"].asString();} + Util::streamVariables(streamName, opt["source"].asString()); + Util::setStreamName(opt["source"].asString() + "→" + streamName); + { + tthread::lock_guard guard(statsMutex); + pStat["proc_status_update"]["sink"] = streamName; + pStat["proc_status_update"]["source"] = opt["source"]; + } + } + bool needsLock(){return false;} + bool isSingular(){return false;} + }; + + class ProcessSource : public OutEBML{ + public: + bool isRecording(){return false;} + ProcessSource(Socket::Connection &c) : OutEBML(c){ + capa["name"] = "MKVExec"; + targetParams["keeptimes"] = true; + realTime = 0; + }; + virtual bool onFinish(){ + if (opt.isMember("exit_unmask") && opt["exit_unmask"].asBool()){ + if (userSelect.size()){ + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ + INFO_MSG("Unmasking source track %zu" PRIu64, it->first); + meta.validateTrack(it->first, TRACK_VALID_ALL); + } + } + } + return OutEBML::onFinish(); + } + virtual void dropTrack(size_t trackId, const std::string &reason, bool probablyBad = true){ + if (opt.isMember("exit_unmask") && opt["exit_unmask"].asBool()){ + INFO_MSG("Unmasking source track %zu" PRIu64, trackId); + meta.validateTrack(trackId, TRACK_VALID_ALL); + } + OutEBML::dropTrack(trackId, reason, probablyBad); + } + void sendHeader(){ + if (opt["masksource"].asBool()){ + for (std::map::iterator ti = userSelect.begin(); ti != userSelect.end(); ++ti){ + if (ti->first == INVALID_TRACK_ID){continue;} + INFO_MSG("Masking source track %zu", ti->first); + meta.validateTrack(ti->first, meta.trackValid(ti->first) & ~(TRACK_VALID_EXT_HUMAN | TRACK_VALID_EXT_PUSH)); + } + } + realTime = 0; + OutEBML::sendHeader(); + }; + void sendNext(){ + { + tthread::lock_guard guard(statsMutex); + if (pData["source_tracks"].size() != userSelect.size()){ + pData["source_tracks"].null(); + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ + pData["source_tracks"].append(it->first); + } + } + } + if (thisTime > statSourceMs){statSourceMs = thisTime;} + extraKeepAway = 0; + needsLookAhead = 0; + maxSkipAhead = 0; + if (!sendFirst){ + sendPacketTime = thisPacket.getTime(); + sendFirst = true; + /* + uint64_t maxJitter = 1; + for (std::map::iterator ti = userSelect.begin(); ti != + userSelect.end(); ++ti){if (!M.trackValid(ti->first)){continue; + }// ignore missing tracks + if (M.getMinKeepAway(ti->first) > maxJitter){ + maxJitter = M.getMinKeepAway(ti->first); + } + } + DTSC::veryUglyJitterOverride = maxJitter; + */ + } + OutEBML::sendNext(); + } + }; + + /// check source, sink, source_track, codec, bitrate, flags and process options. + bool ProcMKVExec::CheckConfig(){ + // Check generic configuration variables + if (!opt.isMember("source") || !opt["source"] || !opt["source"].isString()){ + FAIL_MSG("invalid source in config!"); + return false; + } + + if (!opt.isMember("sink") || !opt["sink"] || !opt["sink"].isString()){ + INFO_MSG("No sink explicitly set, using source as sink"); + } + + return true; + } + + void ProcMKVExec::Run(){ + int ffer = 2; + pid_t execd_proc = -1; + + + std::string streamName = opt["sink"].asString(); + if (!streamName.size()){streamName = opt["source"].asStringRef();} + Util::streamVariables(streamName, opt["source"].asStringRef()); + + //Do variable substitution on command + std::string tmpCmd = opt["exec"].asStringRef(); + Util::streamVariables(tmpCmd, streamName, opt["source"].asStringRef()); + + // exec command + char exec_cmd[10240]; + strncpy(exec_cmd, tmpCmd.c_str(), 10240); + INFO_MSG("Executing command: %s", exec_cmd); + uint8_t argCnt = 0; + char *startCh = 0; + char *args[1280]; + for (char *i = exec_cmd; i - exec_cmd < 10240; ++i){ + if (!*i){ + if (startCh){args[argCnt++] = startCh;} + break; + } + if (*i == ' '){ + if (startCh){ + args[argCnt++] = startCh; + startCh = 0; + *i = 0; + } + }else{ + if (!startCh){startCh = i;} + } + } + args[argCnt] = 0; + + execd_proc = Util::Procs::StartPiped(args, &pipein[0], &pipeout[1], &ffer); + + uint64_t lastProcUpdate = Util::bootSecs(); + { + tthread::lock_guard guard(statsMutex); + pStat["proc_status_update"]["id"] = getpid(); + pStat["proc_status_update"]["proc"] = "MKVExec"; + pData["ainfo"]["child_pid"] = execd_proc; + pData["ainfo"]["cmd"] = opt["exec"]; + } + uint64_t startTime = Util::bootSecs(); + while (conf.is_active && Util::Procs::isRunning(execd_proc)){ + Util::sleep(200); + if (lastProcUpdate + 5 <= Util::bootSecs()){ + tthread::lock_guard guard(statsMutex); + pData["active_seconds"] = (Util::bootSecs() - startTime); + pData["ainfo"]["sourceTime"] = statSourceMs; + pData["ainfo"]["sinkTime"] = statSinkMs; + Socket::UDPConnection uSock; + uSock.SetDestination(UDP_API_HOST, UDP_API_PORT); + uSock.SendNow(pStat.toString()); + lastProcUpdate = Util::bootSecs(); + } + } + + while (Util::Procs::isRunning(execd_proc)){ + INFO_MSG("Stopping process..."); + Util::Procs::StopAll(); + Util::sleep(200); + } + + INFO_MSG("Closing process clean"); + } +}// namespace Mist + void sinkThread(void *){ Mist::ProcessSink in(&co); co.getOption("output", true).append("-"); @@ -80,17 +295,58 @@ int main(int argc, char *argv[]){ capa["codecs"][0u][1u].append("DTS"); capa["codecs"][0u][2u].append("+JSON"); + capa["ainfo"]["sinkTime"]["name"] = "Sink timestamp"; + capa["ainfo"]["sourceTime"]["name"] = "Source timestamp"; + capa["ainfo"]["child_pid"]["name"] = "Child process PID"; + capa["ainfo"]["cmd"]["name"] = "Child process command"; + if (!(config.parseArgs(argc, argv))){return 1;} if (config.getBool("json")){ capa["name"] = "MKVExec"; capa["desc"] = "Pipe MKV in, expect MKV out. You choose the executable in between yourself."; - capa["optional"]["masksource"]["name"] = "Make source track(s) unavailable for users"; - capa["optional"]["masksource"]["help"] = "If enabled, makes the source track(s) internal-only, so that external users and pushes cannot access them."; - capa["optional"]["masksource"]["type"] = "boolean"; - capa["optional"]["masksource"]["default"] = false; + capa["optional"]["source_mask"]["name"] = "Source track mask"; + capa["optional"]["source_mask"]["help"] = "What internal processes should have access to the source track(s)"; + capa["optional"]["source_mask"]["type"] = "select"; + capa["optional"]["source_mask"]["select"][0u][0u] = ""; + capa["optional"]["source_mask"]["select"][0u][1u] = "Keep original value"; + capa["optional"]["source_mask"]["select"][1u][0u] = 255; + capa["optional"]["source_mask"]["select"][1u][1u] = "Everything"; + capa["optional"]["source_mask"]["select"][2u][0u] = 4; + capa["optional"]["source_mask"]["select"][2u][1u] = "Processing tasks (not viewers, not pushes)"; + capa["optional"]["source_mask"]["select"][3u][0u] = 6; + capa["optional"]["source_mask"]["select"][3u][1u] = "Processing and pushing tasks (not viewers)"; + capa["optional"]["source_mask"]["select"][4u][0u] = 5; + capa["optional"]["source_mask"]["select"][4u][1u] = "Processing and viewer tasks (not pushes)"; + capa["optional"]["source_mask"]["default"] = ""; + capa["optional"]["target_mask"]["name"] = "Output track mask"; + capa["optional"]["target_mask"]["help"] = "What internal processes should have access to the ouput track(s)"; + capa["optional"]["target_mask"]["type"] = "select"; + capa["optional"]["target_mask"]["select"][0u][0u] = ""; + capa["optional"]["target_mask"]["select"][0u][1u] = "Keep original value"; + capa["optional"]["target_mask"]["select"][1u][0u] = 255; + capa["optional"]["target_mask"]["select"][1u][1u] = "Everything"; + capa["optional"]["target_mask"]["select"][2u][0u] = 1; + capa["optional"]["target_mask"]["select"][2u][1u] = "Viewer tasks (not processing, not pushes)"; + capa["optional"]["target_mask"]["select"][3u][0u] = 2; + capa["optional"]["target_mask"]["select"][3u][1u] = "Pushing tasks (not processing, not viewers)"; + capa["optional"]["target_mask"]["select"][4u][0u] = 4; + capa["optional"]["target_mask"]["select"][4u][1u] = "Processing tasks (not pushes, not viewers)"; + capa["optional"]["target_mask"]["select"][5u][0u] = 3; + capa["optional"]["target_mask"]["select"][5u][1u] = "Viewer and pushing tasks (not processing)"; + capa["optional"]["target_mask"]["select"][6u][0u] = 5; + capa["optional"]["target_mask"]["select"][6u][1u] = "Viewer and processing tasks (not pushes)"; + capa["optional"]["target_mask"]["select"][7u][0u] = 6; + capa["optional"]["target_mask"]["select"][7u][1u] = "Pushing and processing tasks (not viewers)"; + capa["optional"]["target_mask"]["select"][8u][0u] = 0; + capa["optional"]["target_mask"]["select"][8u][1u] = "Nothing"; + capa["optional"]["target_mask"]["default"] = ""; + + capa["optional"]["exit_unmask"]["name"] = "Undo masks on process exit/fail"; + capa["optional"]["exit_unmask"]["help"] = "If/when the process exits or fails, the masks for input tracks will be reset to defaults. (NOT to previous value, but to defaults!)"; + capa["optional"]["exit_unmask"]["default"] = false; capa["required"]["exec"]["name"] = "Executable"; capa["required"]["exec"]["help"] = "What to executable to run on the stream data"; @@ -179,71 +435,3 @@ int main(int argc, char *argv[]){ return 0; } - -namespace Mist{ - /// check source, sink, source_track, codec, bitrate, flags and process options. - bool ProcMKVExec::CheckConfig(){ - // Check generic configuration variables - if (!opt.isMember("source") || !opt["source"] || !opt["source"].isString()){ - FAIL_MSG("invalid source in config!"); - return false; - } - - if (!opt.isMember("sink") || !opt["sink"] || !opt["sink"].isString()){ - INFO_MSG("No sink explicitly set, using source as sink"); - } - - return true; - } - - void ProcMKVExec::Run(){ - Util::Procs p; - int ffer = 2; - pid_t execd_proc = -1; - - - std::string streamName = opt["sink"].asString(); - if (!streamName.size()){streamName = opt["source"].asStringRef();} - Util::streamVariables(streamName, opt["source"].asStringRef()); - - //Do variable substitution on command - std::string tmpCmd = opt["exec"].asStringRef(); - Util::streamVariables(tmpCmd, streamName, opt["source"].asStringRef()); - - // exec command - char exec_cmd[10240]; - strncpy(exec_cmd, tmpCmd.c_str(), 10240); - INFO_MSG("Executing command: %s", exec_cmd); - uint8_t argCnt = 0; - char *startCh = 0; - char *args[1280]; - for (char *i = exec_cmd; i - exec_cmd < 10240; ++i){ - if (!*i){ - if (startCh){args[argCnt++] = startCh;} - break; - } - if (*i == ' '){ - if (startCh){ - args[argCnt++] = startCh; - startCh = 0; - *i = 0; - } - }else{ - if (!startCh){startCh = i;} - } - } - args[argCnt] = 0; - - execd_proc = p.StartPiped(args, &pipein[0], &pipeout[1], &ffer); - - while (conf.is_active && p.isRunning(execd_proc)){Util::sleep(200);} - - while (p.isRunning(execd_proc)){ - INFO_MSG("Stopping process..."); - p.StopAll(); - Util::sleep(200); - } - - INFO_MSG("Closing process clean"); - } -}// namespace Mist diff --git a/src/process/process_exec.h b/src/process/process_exec.h index 2d4dc3dd..eb839a84 100644 --- a/src/process/process_exec.h +++ b/src/process/process_exec.h @@ -19,77 +19,4 @@ namespace Mist{ void Run(); }; - class ProcessSink : public InputEBML{ - public: - ProcessSink(Util::Config *cfg) : InputEBML(cfg){ - capa["name"] = "MKVExec"; - }; - void getNext(size_t idx = INVALID_TRACK_ID){ - static bool recurse = false; - if (recurse){return InputEBML::getNext(idx);} - recurse = true; - InputEBML::getNext(idx); - recurse = false; - if (thisPacket){ - if (!getFirst){ - packetTimeDiff = sendPacketTime - thisPacket.getTime(); - getFirst = true; - } - uint64_t packTime = thisPacket.getTime() + packetTimeDiff; - // change packettime - char *data = thisPacket.getData(); - Bit::htobll(data + 12, packTime); - } - } - void setInFile(int stdin_val){ - inFile = fdopen(stdin_val, "r"); - streamName = opt["sink"].asString(); - if (!streamName.size()){streamName = opt["source"].asString();} - Util::streamVariables(streamName, opt["source"].asString()); - Util::setStreamName(opt["source"].asString() + "→" + streamName); - } - bool needsLock(){return false;} - bool isSingular(){return false;} - }; - - class ProcessSource : public OutEBML{ - public: - bool isRecording(){return false;} - ProcessSource(Socket::Connection &c) : OutEBML(c){ - capa["name"] = "MKVExec"; - realTime = 0; - }; - void sendHeader(){ - if (opt["masksource"].asBool()){ - for (std::map::iterator ti = userSelect.begin(); ti != userSelect.end(); ++ti){ - if (ti->first == INVALID_TRACK_ID){continue;} - INFO_MSG("Masking source track %zu", ti->first); - meta.validateTrack(ti->first, meta.trackValid(ti->first) & ~(TRACK_VALID_EXT_HUMAN | TRACK_VALID_EXT_PUSH)); - } - } - realTime = 0; - OutEBML::sendHeader(); - }; - void sendNext(){ - extraKeepAway = 0; - needsLookAhead = 0; - maxSkipAhead = 0; - if (!sendFirst){ - sendPacketTime = thisPacket.getTime(); - sendFirst = true; - /* - uint64_t maxJitter = 1; - for (std::map::iterator ti = userSelect.begin(); ti != - userSelect.end(); ++ti){if (!M.trackValid(ti->first)){continue; - }// ignore missing tracks - if (M.getMinKeepAway(ti->first) > maxJitter){ - maxJitter = M.getMinKeepAway(ti->first); - } - } - DTSC::veryUglyJitterOverride = maxJitter; - */ - } - OutEBML::sendNext(); - } - }; }// namespace Mist diff --git a/src/process/process_ffmpeg.cpp b/src/process/process_ffmpeg.cpp index 1b6abb96..c17ad364 100644 --- a/src/process/process_ffmpeg.cpp +++ b/src/process/process_ffmpeg.cpp @@ -97,11 +97,47 @@ int main(int argc, char *argv[]){ capa["desc"] = "Use a local FFMPEG installed binary to do encoding"; // description capa["sort"] = "n"; // sort the parameters by this key - capa["optional"]["masksource"]["name"] = "Make source track unavailable for users"; - capa["optional"]["masksource"]["help"] = "If enabled, makes the source track internal-only, so that external users and pushes cannot access it."; - capa["optional"]["masksource"]["type"] = "boolean"; - capa["optional"]["masksource"]["default"] = false; + capa["optional"]["source_mask"]["name"] = "Source track mask"; + capa["optional"]["source_mask"]["help"] = "What internal processes should have access to the source track(s)"; + capa["optional"]["source_mask"]["type"] = "select"; + capa["optional"]["source_mask"]["select"][0u][0u] = ""; + capa["optional"]["source_mask"]["select"][0u][1u] = "Keep original value"; + capa["optional"]["source_mask"]["select"][1u][0u] = 255; + capa["optional"]["source_mask"]["select"][1u][1u] = "Everything"; + capa["optional"]["source_mask"]["select"][2u][0u] = 4; + capa["optional"]["source_mask"]["select"][2u][1u] = "Processing tasks (not viewers, not pushes)"; + capa["optional"]["source_mask"]["select"][3u][0u] = 6; + capa["optional"]["source_mask"]["select"][3u][1u] = "Processing and pushing tasks (not viewers)"; + capa["optional"]["source_mask"]["select"][4u][0u] = 5; + capa["optional"]["source_mask"]["select"][4u][1u] = "Processing and viewer tasks (not pushes)"; + capa["optional"]["source_mask"]["default"] = ""; + capa["optional"]["target_mask"]["name"] = "Output track mask"; + capa["optional"]["target_mask"]["help"] = "What internal processes should have access to the ouput track(s)"; + capa["optional"]["target_mask"]["type"] = "select"; + capa["optional"]["target_mask"]["select"][0u][0u] = ""; + capa["optional"]["target_mask"]["select"][0u][1u] = "Keep original value"; + capa["optional"]["target_mask"]["select"][1u][0u] = 255; + capa["optional"]["target_mask"]["select"][1u][1u] = "Everything"; + capa["optional"]["target_mask"]["select"][2u][0u] = 1; + capa["optional"]["target_mask"]["select"][2u][1u] = "Viewer tasks (not processing, not pushes)"; + capa["optional"]["target_mask"]["select"][3u][0u] = 2; + capa["optional"]["target_mask"]["select"][3u][1u] = "Pushing tasks (not processing, not viewers)"; + capa["optional"]["target_mask"]["select"][4u][0u] = 4; + capa["optional"]["target_mask"]["select"][4u][1u] = "Processing tasks (not pushes, not viewers)"; + capa["optional"]["target_mask"]["select"][5u][0u] = 3; + capa["optional"]["target_mask"]["select"][5u][1u] = "Viewer and pushing tasks (not processing)"; + capa["optional"]["target_mask"]["select"][6u][0u] = 5; + capa["optional"]["target_mask"]["select"][6u][1u] = "Viewer and processing tasks (not pushes)"; + capa["optional"]["target_mask"]["select"][7u][0u] = 6; + capa["optional"]["target_mask"]["select"][7u][1u] = "Pushing and processing tasks (not viewers)"; + capa["optional"]["target_mask"]["select"][8u][0u] = 0; + capa["optional"]["target_mask"]["select"][8u][1u] = "Nothing"; + capa["optional"]["target_mask"]["default"] = ""; + + capa["optional"]["exit_unmask"]["name"] = "Undo masks on process exit/fail"; + capa["optional"]["exit_unmask"]["help"] = "If/when the process exits or fails, the masks for input tracks will be reset to defaults. (NOT to previous value, but to defaults!)"; + capa["optional"]["exit_unmask"]["default"] = false; capa["required"]["x-LSP-kind"]["name"] = "Input type"; // human readable name of option capa["required"]["x-LSP-kind"]["help"] = "The type of input to use"; // extra information @@ -354,6 +390,27 @@ int main(int argc, char *argv[]){ namespace Mist{ + + bool EncodeOutputEBML::onFinish(){ + if (opt.isMember("exit_unmask") && opt["exit_unmask"].asBool()){ + if (userSelect.size()){ + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ + INFO_MSG("Unmasking source track %zu" PRIu64, it->first); + meta.validateTrack(it->first, TRACK_VALID_ALL); + } + } + } + return OutEBML::onFinish(); + } + void EncodeOutputEBML::dropTrack(size_t trackId, const std::string &reason, bool probablyBad){ + if (opt.isMember("exit_unmask") && opt["exit_unmask"].asBool()){ + INFO_MSG("Unmasking source track %zu" PRIu64, trackId); + meta.validateTrack(trackId, TRACK_VALID_ALL); + } + OutEBML::dropTrack(trackId, reason, probablyBad); + } + + void EncodeInputEBML::getNext(size_t idx){ static bool recurse = false; @@ -390,6 +447,9 @@ namespace Mist{ if (!streamName.size()){streamName = opt["source"].asString();} Util::streamVariables(streamName, opt["source"].asString()); Util::setStreamName(opt["source"].asString() + "→" + streamName); + if (opt.isMember("target_mask") && !opt["target_mask"].isNull() && opt["target_mask"].asString() != ""){ + DTSC::trackValidDefault = opt["target_mask"].asInt(); + } } std::string EncodeOutputEBML::getTrackType(int tid){return M.getType(tid);} @@ -411,9 +471,10 @@ namespace Mist{ void EncodeOutputEBML::sendHeader(){ realTime = 0; size_t idx = getMainSelectedTrack(); - if (opt["masksource"].asBool()){ - INFO_MSG("Masking source track %zu", idx); - meta.validateTrack(idx, meta.trackValid(idx) & ~(TRACK_VALID_EXT_HUMAN | TRACK_VALID_EXT_PUSH)); + if (opt.isMember("source_mask") && !opt["source_mask"].isNull() && opt["source_mask"].asString() != ""){ + uint64_t sourceMask = opt["source_mask"].asInt(); + INFO_MSG("Masking source track %zu to %" PRIu64, idx, sourceMask); + meta.validateTrack(idx, sourceMask); } res_x = M.getWidth(idx); res_y = M.getHeight(idx); diff --git a/src/process/process_ffmpeg.h b/src/process/process_ffmpeg.h index eac7c4c8..bf1e893e 100644 --- a/src/process/process_ffmpeg.h +++ b/src/process/process_ffmpeg.h @@ -51,6 +51,8 @@ namespace Mist{ class EncodeOutputEBML : public OutEBML{ public: + virtual bool onFinish(); + virtual void dropTrack(size_t trackId, const std::string &reason, bool probablyBad = true); EncodeOutputEBML(Socket::Connection &c) : OutEBML(c){}; // realTime = 0;}; bool isRecording(){return false;} void setVideoTrack(std::string tid); diff --git a/src/process/process_livepeer.cpp b/src/process/process_livepeer.cpp index 3220a884..ffde737b 100644 --- a/src/process/process_livepeer.cpp +++ b/src/process/process_livepeer.cpp @@ -1,5 +1,6 @@ #include //for std::find #include +#include #include "process_livepeer.h" #include #include @@ -11,10 +12,26 @@ #include //for stat tthread::mutex segMutex; +tthread::mutex broadcasterMutex; + +//Stat related stuff +JSON::Value pStat; +JSON::Value & pData = pStat["proc_status_update"]["status"]; +tthread::mutex statsMutex; +uint64_t statSwitches = 0; +uint64_t statFailN200 = 0; +uint64_t statFailTimeout = 0; +uint64_t statFailParse = 0; +uint64_t statFailOther = 0; +uint64_t statSinkMs = 0; +uint64_t statSourceMs = 0; Util::Config co; Util::Config conf; +size_t insertTurn = 0; +bool isStuck = false; + namespace Mist{ void pickRandomBroadcaster(){ @@ -39,29 +56,62 @@ namespace Mist{ //Source process, takes data from input stream and sends to livepeer class ProcessSource : public TSOutput{ public: - HTTP::Downloader upper; - uint64_t segTime; bool isRecording(){return false;} + bool isReadyForPlay(){ + if (!TSOutput::isReadyForPlay()){return false;} + size_t mTrk = getMainSelectedTrack(); + if (mTrk == INVALID_TRACK_ID || M.getType(mTrk) != "video"){ + HIGH_MSG("NOT READY (non-video main track)"); + return false; + } + return true; + } ProcessSource(Socket::Connection &c) : TSOutput(c){ capa["name"] = "Livepeer"; capa["codecs"][0u][0u].append("+H264"); capa["codecs"][0u][0u].append("+HEVC"); capa["codecs"][0u][0u].append("+MPEG2"); + capa["codecs"][0u][1u].append("+AAC"); realTime = 0; wantRequest = false; parseData = true; - upper.setHeader("Authorization", "Bearer "+opt["access_token"].asStringRef()); + currPreSeg = 0; }; - Util::ResizeablePointer tsPck; + virtual bool onFinish(){ + if (opt.isMember("exit_unmask") && opt["exit_unmask"].asBool()){ + if (userSelect.size()){ + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ + INFO_MSG("Unmasking source track %zu" PRIu64, it->first); + meta.validateTrack(it->first, TRACK_VALID_ALL); + } + } + } + return TSOutput::onFinish(); + } + virtual void dropTrack(size_t trackId, const std::string &reason, bool probablyBad = true){ + if (opt.isMember("exit_unmask") && opt["exit_unmask"].asBool()){ + INFO_MSG("Unmasking source track %zu" PRIu64, trackId); + meta.validateTrack(trackId, TRACK_VALID_ALL); + } + TSOutput::dropTrack(trackId, reason, probablyBad); + } + size_t currPreSeg; void sendTS(const char *tsData, size_t len = 188){ - tsPck.append(tsData, len); + if (!presegs[currPreSeg].data.size()){ + presegs[currPreSeg].time = thisPacket.getTime(); + } + presegs[currPreSeg].data.append(tsData, len); }; virtual void initialSeek(){ if (!meta){return;} - if (opt["masksource"].asBool()){ - size_t mainTrack = getMainSelectedTrack(); - INFO_MSG("Masking source track %zu", mainTrack); - meta.validateTrack(mainTrack, meta.trackValid(mainTrack) & ~(TRACK_VALID_EXT_HUMAN | TRACK_VALID_EXT_PUSH)); + if (opt.isMember("source_mask") && !opt["source_mask"].isNull() && opt["source_mask"].asString() != ""){ + uint64_t sourceMask = opt["source_mask"].asInt(); + if (userSelect.size()){ + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ + INFO_MSG("Masking source track %zu to %" PRIu64, it->first, sourceMask); + meta.validateTrack(it->first, sourceMask); + } + } } if (!meta.getLive() || opt["leastlive"].asBool()){ INFO_MSG("Seeking to earliest point in stream"); @@ -70,148 +120,33 @@ namespace Mist{ } Output::initialSeek(); } - ///Inserts a part into the queue of parts to parse - void insertPart(const std::string & rendition, void * ptr, size_t len){ - while (conf.is_active){ - { - tthread::lock_guard guard(segMutex); - if (segs[rendition].fullyRead){ - HIGH_MSG("Inserting %zi bytes of %s", len, rendition.c_str()); - segs[rendition].set(segTime, ptr, len); - return; - } - } - INFO_MSG("Waiting for %s to finish parsing current part...", rendition.c_str()); - Util::sleep(500); - } - } - ///Parses a multipart response - void parseMultipart(){ - std::string cType = upper.getHeader("Content-Type"); - std::string bound; - if (cType.find("boundary=") != std::string::npos){ - bound = "--"+cType.substr(cType.find("boundary=")+9); - } - if (!bound.size()){ - FAIL_MSG("Could not parse boundary string from Content-Type header!"); - return; - } - const std::string & d = upper.const_data(); - size_t startPos = 0; - size_t nextPos = d.find(bound, startPos); - //While there is at least one boundary to be found - while (nextPos != std::string::npos){ - startPos = nextPos+bound.size()+2; - nextPos = d.find(bound, startPos); - if (nextPos != std::string::npos){ - //We have a start and end position, looking good so far... - size_t headEnd = d.find("\r\n\r\n", startPos); - if (headEnd == std::string::npos || headEnd > nextPos){ - FAIL_MSG("Could not find end of headers for multi-part part; skipping to next part"); - continue; - } - //Alright, we know where our headers and data are. Parse the headers - std::map partHeaders; - size_t headPtr = startPos; - size_t nextNL = d.find("\r\n", headPtr); - while (nextNL != std::string::npos && nextNL <= headEnd){ - size_t col = d.find(":", headPtr); - if (col != std::string::npos && col < nextNL){ - partHeaders[d.substr(headPtr, col-headPtr)] = d.substr(col+2, nextNL-col-2); - } - headPtr = nextNL+2; - nextNL = d.find("\r\n", headPtr); - } - for (std::map::iterator it = partHeaders.begin(); it != partHeaders.end(); ++it){ - VERYHIGH_MSG("Header %s = %s", it->first.c_str(), it->second.c_str()); - } - VERYHIGH_MSG("Body has length %zi", nextPos-headEnd-6); - std::string preType = partHeaders["Content-Type"].substr(0, 10); - Util::stringToLower(preType); - if (preType == "video/mp2t"){ - insertPart(partHeaders["Rendition-Name"], (void*)(d.data()+headEnd+4), nextPos-headEnd-6); - } - } - } - } void sendNext(){ - if (thisPacket.getFlag("keyframe") && (thisPacket.getTime() - segTime) >= 1000){ - if (Mist::queueClear){ - //Request to clear the queue! Do so, and wait for a new broadcaster to be picked. - { - tthread::lock_guard guard(segMutex); - segs.clear(); + { + tthread::lock_guard guard(statsMutex); + if (pData["source_tracks"].size() != userSelect.size()){ + pData["source_tracks"].null(); + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ + pData["source_tracks"].append(it->first); } - doingSetup = false; - //Sleep while we're still being asked to clear - while (queueClear && conf.is_active){ - Util::sleep(100); - } - if (!conf.is_active){return;} } - if (tsPck.size() > 187){ - size_t attempts = 0; - bool retry = false; - do{ - retry = false; - HTTP::URL target(currBroadAddr+"/live/"+lpID+"/"+JSON::Value(keyCount).asString()+".ts"); - upper.setHeader("Accept", "multipart/mixed"); - uint64_t segDuration = thisPacket.getTime() - segTime; - upper.setHeader("Content-Duration", JSON::Value(segDuration).asString()); - if (upper.post(target, tsPck, tsPck.size())){ - if (upper.getStatusCode() == 200){ - HIGH_MSG("Uploaded %zu bytes to %s", tsPck.size(), target.getUrl().c_str()); - if (upper.getHeader("Content-Type").substr(0, 10) == "multipart/"){ - parseMultipart(); - }else{ - FAIL_MSG("Non-multipart response received - this version only works with multipart!"); - } - }else{ - attempts++; - WARN_MSG("Failed to upload %zu bytes to %s: %" PRIu32 " %s", tsPck.size(), target.getUrl().c_str(), upper.getStatusCode(), upper.getStatusText().c_str()); - if ((attempts % 3) == 3){ - Util::sleep(250); - retry = true; - }else{ - if (attempts > 12){ - Util::logExitReason("too many upload failures"); - conf.is_active = false; - return; - } - if (!conf.is_active){return;} - FAIL_MSG("Failed to upload segment %s several times, picking new broadcaster", target.getUrl().c_str()); - pickRandomBroadcaster(); - if (!currBroadAddr.size()){ - Util::logExitReason("no Livepeer broadcasters available"); - conf.is_active = false; - return; - }else{ - WARN_MSG("Switched to broadcaster: %s", currBroadAddr.c_str()); - retry = true; - } - } - } - }else{ - if (!conf.is_active){return;} - FAIL_MSG("Failed to upload segment %s, picking new broadcaster", target.getUrl().c_str()); - pickRandomBroadcaster(); - if (!currBroadAddr.size()){ - Util::logExitReason("no Livepeer broadcasters available"); - conf.is_active = false; - return; - }else{ - WARN_MSG("Switched to broadcaster: %s", currBroadAddr.c_str()); - retry = true; - } - } - }while(retry); + } + if (thisTime > statSourceMs){statSourceMs = thisTime;} + if (thisPacket.getFlag("keyframe") && M.trackLoaded(thisIdx) && M.getType(thisIdx) == "video" && (thisTime - presegs[currPreSeg].time) >= 1000){ + if (presegs[currPreSeg].data.size() > 187){ + presegs[currPreSeg].keyNo = keyCount; + presegs[currPreSeg].width = M.getWidth(thisIdx); + presegs[currPreSeg].height = M.getHeight(thisIdx); + presegs[currPreSeg].segDuration = thisTime - presegs[currPreSeg].time; + presegs[currPreSeg].fullyRead = false; + presegs[currPreSeg].fullyWritten = true; + currPreSeg = (currPreSeg+1) % PRESEG_COUNT; } - tsPck.assign(0, 0); + while (!presegs[currPreSeg].fullyRead && conf.is_active){Util::sleep(100);} + presegs[currPreSeg].data.assign(0, 0); extraKeepAway = 0; needsLookAhead = 0; maxSkipAhead = 0; packCounter = 0; - segTime = thisPacket.getTime(); ++keyCount; sendFirst = true; } @@ -227,7 +162,15 @@ namespace Mist{ streamName = opt["sink"].asString(); if (!streamName.size()){streamName = opt["source"].asString();} Util::streamVariables(streamName, opt["source"].asString()); + { + tthread::lock_guard guard(statsMutex); + pStat["proc_status_update"]["sink"] = streamName; + pStat["proc_status_update"]["source"] = opt["source"]; + } Util::setStreamName(opt["source"].asString() + "→" + streamName); + if (opt.isMember("target_mask") && !opt["target_mask"].isNull() && opt["target_mask"].asString() != ""){ + DTSC::trackValidDefault = opt["target_mask"].asInt(); + } preRun(); }; virtual bool needsLock(){return false;} @@ -239,35 +182,53 @@ namespace Mist{ thisPacket.null(); int64_t timeOffset = 0; uint64_t trackId = 0; + { + tthread::lock_guard guard(statsMutex); + if (pData["sink_tracks"].size() != userSelect.size()){ + pData["sink_tracks"].null(); + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ + pData["sink_tracks"].append(it->first); + } + } + } while (!thisPacket && conf.is_active){ { tthread::lock_guard guard(segMutex); std::string oRend; - uint64_t lastPacket = segs.begin()->second.lastPacket; + uint64_t lastPacket = 0xFFFFFFFFFFFFFFFFull; for (segIt = segs.begin(); segIt != segs.end(); ++segIt){ - if (segIt->second.lastPacket > lastPacket){continue;} + if (isStuck){ + WARN_MSG("Considering %s: T%" PRIu64 ", fullyWritten: %s, fullyRead: %s", segIt->first.c_str(), segIt->second.lastPacket, segIt->second.fullyWritten?"Y":"N", segIt->second.fullyRead?"Y":"N"); + } if (!segIt->second.fullyWritten){continue;} - if (segIt->second.byteOffset >= segIt->second.data.size()){continue;} + if (segIt->second.lastPacket > lastPacket){continue;} oRend = segIt->first; lastPacket = segIt->second.lastPacket; } if (oRend.size()){ + if (isStuck){WARN_MSG("Picked %s!", oRend.c_str());} readySegment & S = segs[oRend]; while (!S.S.hasPacket() && S.byteOffset <= S.data.size() - 188){ S.S.parse(S.data + S.byteOffset, 0); S.byteOffset += 188; + if (S.byteOffset > S.data.size() - 188){S.S.finish();} } if (S.S.hasPacket()){ S.S.getEarliestPacket(thisPacket); if (!S.offsetCalcd){ S.timeOffset = S.time - thisPacket.getTime(); + HIGH_MSG("First timestamp of %s at time %" PRIu64 " is %" PRIu64 ", adjusting by %" PRId64, oRend.c_str(), S.time, thisPacket.getTime(), S.timeOffset); S.offsetCalcd = true; } timeOffset = S.timeOffset; + if (thisPacket){ + S.lastPacket = thisPacket.getTime() + timeOffset; + if (S.lastPacket >= statSinkMs){statSinkMs = S.lastPacket;} + } trackId = (S.ID << 16) + thisPacket.getTrackId(); size_t idx = M.trackIDToIndex(trackId, getpid()); if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){ - INFO_MSG("Initializing track %zi (index %zi) as %" PRIu64 " for playlist %" PRIu64, thisPacket.getTrackId(), idx, trackId, S.ID); + INFO_MSG("Initializing track %zi as %" PRIu64 " for playlist %" PRIu64, thisPacket.getTrackId(), trackId, S.ID); S.S.initializeMetadata(meta, thisPacket.getTrackId(), trackId); } } @@ -277,7 +238,13 @@ namespace Mist{ } } } - if (!thisPacket){Util::sleep(25);} + if (!thisPacket){ + Util::sleep(25); + if (userSelect.size() && userSelect.begin()->second.getStatus() == COMM_STATUS_REQDISCONNECT){ + Util::logExitReason("buffer requested shutdown"); + return; + } + } } if (thisPacket){ @@ -332,16 +299,19 @@ namespace Mist{ Util::logExitReason("No Livepeer broadcasters available"); return; } - pickRandomBroadcaster(); - if (!currBroadAddr.size()){ - Util::logExitReason("No Livepeer broadcasters available"); - return; + { + tthread::lock_guard guard(broadcasterMutex); + pickRandomBroadcaster(); + if (!currBroadAddr.size()){ + Util::logExitReason("No Livepeer broadcasters available"); + return; + } + INFO_MSG("Using broadcaster: %s", currBroadAddr.c_str()); } - INFO_MSG("Using broadcaster: %s", currBroadAddr.c_str()); //make transcode request JSON::Value pl; - pl["name"] = "Mist Transcode"; + pl["name"] = opt["source"]; pl["profiles"] = opt["target_profiles"]; dl.setHeader("Content-Type", "application/json"); dl.setHeader("Authorization", "Bearer "+opt["access_token"].asStringRef()); @@ -362,7 +332,36 @@ namespace Mist{ INFO_MSG("Livepeer transcode ID: %s", lpID.c_str()); doingSetup = false; - while (conf.is_active && co.is_active){Util::sleep(200);} + uint64_t lastProcUpdate = Util::bootSecs(); + { + tthread::lock_guard guard(statsMutex); + pStat["proc_status_update"]["id"] = getpid(); + pStat["proc_status_update"]["proc"] = "Livepeer"; + pData["ainfo"]["lp_id"] = lpID; + } + uint64_t startTime = Util::bootSecs(); + while (conf.is_active && co.is_active){ + Util::sleep(200); + if (lastProcUpdate + 5 <= Util::bootSecs()){ + tthread::lock_guard guard(statsMutex); + pData["active_seconds"] = (Util::bootSecs() - startTime); + pData["ainfo"]["switches"] = statSwitches; + pData["ainfo"]["fail_non200"] = statFailN200; + pData["ainfo"]["fail_timeout"] = statFailTimeout; + pData["ainfo"]["fail_parse"] = statFailParse; + pData["ainfo"]["fail_other"] = statFailOther; + pData["ainfo"]["sourceTime"] = statSourceMs; + pData["ainfo"]["sinkTime"] = statSinkMs; + { + tthread::lock_guard guard(broadcasterMutex); + pData["ainfo"]["bc"] = Mist::currBroadAddr; + } + Socket::UDPConnection uSock; + uSock.SetDestination(UDP_API_HOST, UDP_API_PORT); + uSock.SendNow(pStat.toString()); + lastProcUpdate = Util::bootSecs(); + } + } INFO_MSG("Closing process clean"); } }// namespace Mist @@ -389,12 +388,19 @@ void sourceThread(void *){ opt["default"] = ""; opt["arg_num"] = 1; opt["help"] = "Target filename to store EBML file as, or - for stdout."; + //Check for audio selection, default to none + std::string audio_select = "none"; + if (Mist::opt.isMember("audio_select") && Mist::opt["audio_select"].isString() && Mist::opt["audio_select"]){ + audio_select = Mist::opt["audio_select"].asStringRef(); + } + //Check for source track selection, default to maxbps + std::string video_select = "maxbps"; + if (Mist::opt.isMember("source_track") && Mist::opt["source_track"].isString() && Mist::opt["source_track"]){ + video_select = Mist::opt["source_track"].asStringRef(); + } conf.addOption("target", opt); conf.getOption("streamname", true).append(Mist::opt["source"].c_str()); - conf.getOption("target", true).append("-?audio=none&video=maxbps"); - if (Mist::opt.isMember("source_track")){ - conf.getOption("target", true).append("-?audio=none&video=" + Mist::opt["source_track"].asString()); - } + conf.getOption("target", true).append("-?audio="+audio_select+"&video="+video_select); Mist::ProcessSource::init(&conf); conf.is_active = true; int devnull = open("/dev/null", O_RDWR); @@ -413,6 +419,156 @@ void sourceThread(void *){ close(devnull); } +///Inserts a part into the queue of parts to parse +void insertPart(const Mist::preparedSegment & mySeg, const std::string & rendition, void * ptr, size_t len){ + uint64_t waitTime = Util::bootMS(); + uint64_t lastAlert = waitTime; + while (conf.is_active){ + { + tthread::lock_guard guard(segMutex); + if (Mist::segs[rendition].fullyRead){ + HIGH_MSG("Inserting %zi bytes of %s, originally for time %" PRIu64, len, rendition.c_str(), mySeg.time); + Mist::segs[rendition].set(mySeg.time, ptr, len); + return; + } + } + uint64_t currMs = Util::bootMS(); + isStuck = false; + if (currMs-waitTime > 5000 && currMs-lastAlert > 1000){ + lastAlert = currMs; + INFO_MSG("Waiting for %s to finish parsing current part (%" PRIu64 "ms)...", rendition.c_str(), currMs-waitTime); + isStuck = true; + } + Util::sleep(100); + } +} + +///Parses a multipart response +void parseMultipart(const Mist::preparedSegment & mySeg, const std::string & cType, const std::string & d){ + std::string bound; + if (cType.find("boundary=") != std::string::npos){ + bound = "--"+cType.substr(cType.find("boundary=")+9); + } + if (!bound.size()){ + FAIL_MSG("Could not parse boundary string from Content-Type header!"); + return; + } + size_t startPos = 0; + size_t nextPos = d.find(bound, startPos); + //While there is at least one boundary to be found + while (nextPos != std::string::npos){ + startPos = nextPos+bound.size()+2; + nextPos = d.find(bound, startPos); + if (nextPos != std::string::npos){ + //We have a start and end position, looking good so far... + size_t headEnd = d.find("\r\n\r\n", startPos); + if (headEnd == std::string::npos || headEnd > nextPos){ + FAIL_MSG("Could not find end of headers for multi-part part; skipping to next part"); + continue; + } + //Alright, we know where our headers and data are. Parse the headers + std::map partHeaders; + size_t headPtr = startPos; + size_t nextNL = d.find("\r\n", headPtr); + while (nextNL != std::string::npos && nextNL <= headEnd){ + size_t col = d.find(":", headPtr); + if (col != std::string::npos && col < nextNL){ + partHeaders[d.substr(headPtr, col-headPtr)] = d.substr(col+2, nextNL-col-2); + } + headPtr = nextNL+2; + nextNL = d.find("\r\n", headPtr); + } + for (std::map::iterator it = partHeaders.begin(); it != partHeaders.end(); ++it){ + VERYHIGH_MSG("Header %s = %s", it->first.c_str(), it->second.c_str()); + } + VERYHIGH_MSG("Body has length %zi", nextPos-headEnd-6); + std::string preType = partHeaders["Content-Type"].substr(0, 10); + Util::stringToLower(preType); + if (preType == "video/mp2t"){ + insertPart(mySeg, partHeaders["Rendition-Name"], (void*)(d.data()+headEnd+4), nextPos-headEnd-6); + } + } + } +} + +void uploadThread(void * num){ + size_t myNum = (size_t)num; + Mist::preparedSegment & mySeg = Mist::presegs[myNum]; + HTTP::Downloader upper; + while (conf.is_active){ + while (conf.is_active && !mySeg.fullyWritten){Util::sleep(100);} + if (!conf.is_active){return;}//Exit early on shutdown + size_t attempts = 0; + do{ + HTTP::URL target; + { + tthread::lock_guard guard(broadcasterMutex); + target = HTTP::URL(Mist::currBroadAddr+"/live/"+Mist::lpID+"/"+JSON::Value(mySeg.keyNo).asString()+".ts"); + } + upper.dataTimeout = mySeg.segDuration/1000 + 2; + upper.retryCount = 2; + upper.setHeader("Accept", "multipart/mixed"); + upper.setHeader("Content-Duration", JSON::Value(mySeg.segDuration).asString()); + upper.setHeader("Content-Resolution", JSON::Value(mySeg.width).asString()+"x"+JSON::Value(mySeg.height).asString()); + uint64_t uplTime = Util::getMicros(); + if (upper.post(target, mySeg.data, mySeg.data.size())){ + uplTime = Util::getMicros(uplTime); + if (upper.getStatusCode() == 200){ + MEDIUM_MSG("Uploaded %zu bytes (time %" PRIu64 "-%" PRIu64 " = %" PRIu64 " ms) to %s in %.2f ms", mySeg.data.size(), mySeg.time, mySeg.time+mySeg.segDuration, mySeg.segDuration, target.getUrl().c_str(), uplTime/1000.0); + mySeg.fullyWritten = false; + mySeg.fullyRead = true; + //Wait your turn + while (myNum != insertTurn && conf.is_active){Util::sleep(100);} + if (!conf.is_active){return;}//Exit early on shutdown + if (upper.getHeader("Content-Type").substr(0, 10) == "multipart/"){ + parseMultipart(mySeg, upper.getHeader("Content-Type"), upper.const_data()); + }else{ + ++statFailParse; + FAIL_MSG("Non-multipart response received - this version only works with multipart!"); + } + insertTurn = (insertTurn + 1) % PRESEG_COUNT; + break;//Success: no need to retry + }else{ + //Failure due to non-200 status code + ++statFailN200; + WARN_MSG("Failed to upload %zu bytes to %s in %.2f ms: %" PRIu32 " %s", mySeg.data.size(), target.getUrl().c_str(), uplTime/1000.0, upper.getStatusCode(), upper.getStatusText().c_str()); + } + }else{ + //other failures and aborted uploads + if (!conf.is_active){return;}//Exit early on shutdown + uplTime = Util::getMicros(uplTime); + ++statFailTimeout; + WARN_MSG("Failed to upload %zu bytes to %s in %.2f ms", mySeg.data.size(), target.getUrl().c_str(), uplTime/1000.0); + } + //Error handling + attempts++; + Util::sleep(100);//Rate-limit retries + if (attempts > 4){ + Util::logExitReason("too many upload failures"); + conf.is_active = false; + return; + } + { + tthread::lock_guard guard(broadcasterMutex); + std::string prevBroadAddr = Mist::currBroadAddr; + Mist::pickRandomBroadcaster(); + if (!Mist::currBroadAddr.size()){ + FAIL_MSG("Cannot switch to new broadcaster: none available"); + Util::logExitReason("no Livepeer broadcasters available"); + conf.is_active = false; + return; + } + if (Mist::currBroadAddr != prevBroadAddr){ + ++statSwitches; + WARN_MSG("Switched to new broadcaster: %s", Mist::currBroadAddr.c_str()); + }else{ + WARN_MSG("Cannot switch broadcaster; only a single option is available"); + } + } + }while(conf.is_active); + } +} + int main(int argc, char *argv[]){ DTSC::trackValidMask = TRACK_VALID_INT_PROCESS; Util::Config config(argv[0]); @@ -441,10 +597,47 @@ int main(int argc, char *argv[]){ capa["name"] = "Livepeer"; capa["desc"] = "Use livepeer to transcode video."; - capa["optional"]["masksource"]["name"] = "Make source track unavailable for users"; - capa["optional"]["masksource"]["help"] = "If enabled, makes the source track internal-only, so that external users and pushes cannot access it."; - capa["optional"]["masksource"]["type"] = "boolean"; - capa["optional"]["masksource"]["default"] = false; + capa["optional"]["source_mask"]["name"] = "Source track mask"; + capa["optional"]["source_mask"]["help"] = "What internal processes should have access to the source track(s)"; + capa["optional"]["source_mask"]["type"] = "select"; + capa["optional"]["source_mask"]["select"][0u][0u] = ""; + capa["optional"]["source_mask"]["select"][0u][1u] = "Keep original value"; + capa["optional"]["source_mask"]["select"][1u][0u] = 255; + capa["optional"]["source_mask"]["select"][1u][1u] = "Everything"; + capa["optional"]["source_mask"]["select"][2u][0u] = 4; + capa["optional"]["source_mask"]["select"][2u][1u] = "Processing tasks (not viewers, not pushes)"; + capa["optional"]["source_mask"]["select"][3u][0u] = 6; + capa["optional"]["source_mask"]["select"][3u][1u] = "Processing and pushing tasks (not viewers)"; + capa["optional"]["source_mask"]["select"][4u][0u] = 5; + capa["optional"]["source_mask"]["select"][4u][1u] = "Processing and viewer tasks (not pushes)"; + capa["optional"]["source_mask"]["default"] = ""; + + capa["optional"]["target_mask"]["name"] = "Output track mask"; + capa["optional"]["target_mask"]["help"] = "What internal processes should have access to the ouput track(s)"; + capa["optional"]["target_mask"]["type"] = "select"; + capa["optional"]["target_mask"]["select"][0u][0u] = ""; + capa["optional"]["target_mask"]["select"][0u][1u] = "Keep original value"; + capa["optional"]["target_mask"]["select"][1u][0u] = 255; + capa["optional"]["target_mask"]["select"][1u][1u] = "Everything"; + capa["optional"]["target_mask"]["select"][2u][0u] = 1; + capa["optional"]["target_mask"]["select"][2u][1u] = "Viewer tasks (not processing, not pushes)"; + capa["optional"]["target_mask"]["select"][3u][0u] = 2; + capa["optional"]["target_mask"]["select"][3u][1u] = "Pushing tasks (not processing, not viewers)"; + capa["optional"]["target_mask"]["select"][4u][0u] = 4; + capa["optional"]["target_mask"]["select"][4u][1u] = "Processing tasks (not pushes, not viewers)"; + capa["optional"]["target_mask"]["select"][5u][0u] = 3; + capa["optional"]["target_mask"]["select"][5u][1u] = "Viewer and pushing tasks (not processing)"; + capa["optional"]["target_mask"]["select"][6u][0u] = 5; + capa["optional"]["target_mask"]["select"][6u][1u] = "Viewer and processing tasks (not pushes)"; + capa["optional"]["target_mask"]["select"][7u][0u] = 6; + capa["optional"]["target_mask"]["select"][7u][1u] = "Pushing and processing tasks (not viewers)"; + capa["optional"]["target_mask"]["select"][8u][0u] = 0; + capa["optional"]["target_mask"]["select"][8u][1u] = "Nothing"; + capa["optional"]["target_mask"]["default"] = ""; + + capa["optional"]["exit_unmask"]["name"] = "Undo masks on process exit/fail"; + capa["optional"]["exit_unmask"]["help"] = "If/when the process exits or fails, the masks for input tracks will be reset to defaults. (NOT to previous value, but to defaults!)"; + capa["optional"]["exit_unmask"]["default"] = false; capa["optional"]["sink"]["name"] = "Target stream"; capa["optional"]["sink"]["help"] = "What stream the encoded track should be added to. Defaults " @@ -454,10 +647,16 @@ int main(int argc, char *argv[]){ capa["optional"]["source_track"]["name"] = "Input selection"; capa["optional"]["source_track"]["help"] = - "Track ID, codec or language of the source stream to encode."; + "Track selector(s) of the video portion of the source stream. Defaults to highest bit rate video track."; capa["optional"]["source_track"]["type"] = "track_selector_parameter"; capa["optional"]["source_track"]["n"] = 1; - capa["optional"]["source_track"]["default"] = "automatic"; + capa["optional"]["source_track"]["default"] = "maxbps"; + + capa["optional"]["audio_select"]["name"] = "Audio streams"; + capa["optional"]["audio_select"]["help"] = + "Track selector(s) for the audio portion of the source stream. Defaults to 'none' so no audio is passed at all."; + capa["optional"]["audio_select"]["type"] = "track_selector_parameter"; + capa["optional"]["audio_select"]["default"] = "none"; capa["required"]["access_token"]["name"] = "Access token"; capa["required"]["access_token"]["help"] = "Your livepeer access token"; @@ -479,30 +678,40 @@ int main(int argc, char *argv[]){ capa["required"]["target_profiles"]["type"] = "sublist"; capa["required"]["target_profiles"]["itemLabel"] = "profile"; capa["required"]["target_profiles"]["help"] = "Tracks to transcode the source into"; - JSON::Value &grp = capa["required"]["target_profiles"]["required"]; - grp["name"]["name"] = "Name"; - grp["name"]["help"] = "Name for the profle. Must be unique within this transcode."; - grp["name"]["type"] = "str"; - grp["fps"]["name"] = "Framerate"; - grp["fps"]["help"] = "Framerate of the output"; - grp["fps"]["unit"] = "frames per second"; - grp["fps"]["type"] = "int"; - grp["gop"]["name"] = "Keyframe interval / GOP size"; - grp["gop"]["help"] = "Interval of keyframes / duration of GOPs for the transcode. Empty string means to match input (= the default), 'intra' means to send only key frames. Otherwise, fractional seconds between keyframes."; - grp["gop"]["unit"] = "seconds"; - grp["gop"]["type"] = "str"; - grp["width"]["name"] = "Width"; - grp["width"]["help"] = "Width in pixels of the output"; - grp["width"]["unit"] = "px"; - grp["width"]["type"] = "int"; - grp["height"]["name"] = "Height"; - grp["height"]["help"] = "Height in pixels of the output"; - grp["height"]["unit"] = "px"; - grp["height"]["type"] = "int"; - grp["bitrate"]["name"] = "Bitrate"; - grp["bitrate"]["help"] = "Target bit rate of the output"; - grp["bitrate"]["unit"] = "bits per second"; - grp["bitrate"]["type"] = "int"; + { + JSON::Value &grp = capa["required"]["target_profiles"]["required"]; + grp["name"]["name"] = "Name"; + grp["name"]["help"] = "Name for the profile. Must be unique within this transcode."; + grp["name"]["type"] = "str"; + grp["name"]["n"] = 0; + grp["bitrate"]["name"] = "Bitrate"; + grp["bitrate"]["help"] = "Target bit rate of the output"; + grp["bitrate"]["unit"] = "bits per second"; + grp["bitrate"]["type"] = "int"; + grp["bitrate"]["n"] = 1; + grp["width"]["name"] = "Width"; + grp["width"]["help"] = "Width in pixels of the output"; + grp["width"]["unit"] = "px"; + grp["width"]["type"] = "int"; + grp["width"]["n"] = 2; + grp["height"]["name"] = "Height"; + grp["height"]["help"] = "Height in pixels of the output"; + grp["height"]["unit"] = "px"; + grp["height"]["type"] = "int"; + grp["height"]["n"] = 3; + }{ + JSON::Value &grp = capa["required"]["target_profiles"]["optional"]; + grp["fps"]["name"] = "Framerate"; + grp["fps"]["help"] = "Framerate of the output"; + grp["fps"]["unit"] = "frames per second"; + grp["fps"]["type"] = "int"; + grp["fps"]["n"] = 4; + grp["gop"]["name"] = "Keyframe interval / GOP size"; + grp["gop"]["help"] = "Interval of keyframes / duration of GOPs for the transcode. Empty string means to match input (= the default), 'intra' means to send only key frames. Otherwise, fractional seconds between keyframes."; + grp["gop"]["unit"] = "seconds"; + grp["gop"]["type"] = "str"; + grp["gop"]["n"] = 5; + } capa["optional"]["track_inhibit"]["name"] = "Track inhibitor(s)"; capa["optional"]["track_inhibit"]["help"] = @@ -512,6 +721,21 @@ int main(int argc, char *argv[]){ capa["optional"]["track_inhibit"]["validate"][0u] = "track_selector"; capa["optional"]["track_inhibit"]["default"] = "audio=none&video=none&subtitle=none"; + capa["optional"]["debug"]["name"] = "Debug level"; + capa["optional"]["debug"]["help"] = "The debug level at which messages need to be printed."; + capa["optional"]["debug"]["type"] = "debug"; + + capa["ainfo"]["lp_id"]["name"] = "Livepeer transcode ID"; + capa["ainfo"]["switches"]["name"] = "Broadcaster switches since start"; + capa["ainfo"]["fail_non200"]["name"] = "Failures due to non-200 response codes"; + capa["ainfo"]["fail_timeout"]["name"] = "Failures due to timeout"; + capa["ainfo"]["fail_parse"]["name"] = "Failures due to parse errors in TS response data"; + capa["ainfo"]["fail_other"]["name"] = "Failures due to other reasons"; + capa["ainfo"]["bc"]["name"] = "Currently used broadcaster"; + capa["ainfo"]["sinkTime"]["name"] = "Sink timestamp"; + capa["ainfo"]["sourceTime"]["name"] = "Source timestamp"; + + std::cout << capa.toString() << std::endl; return -1; } @@ -535,12 +759,24 @@ int main(int argc, char *argv[]){ return 1; } + { + //Ensure stream name is set in all threads + std::string streamName = Mist::opt["sink"].asString(); + if (!streamName.size()){streamName = Mist::opt["source"].asString();} + Util::streamVariables(streamName, Mist::opt["source"].asString()); + Util::setStreamName(Mist::opt["source"].asString() + "→" + streamName); + } + // stream which connects to input tthread::thread source(sourceThread, 0); Util::sleep(500); // needs to pass through encoder to outputEBML tthread::thread sink(sinkThread, 0); + // uploads prepared segments + tthread::thread uploader0(uploadThread, (void*)0); + tthread::thread uploader1(uploadThread, (void*)1); + co.is_active = true; @@ -552,6 +788,8 @@ int main(int argc, char *argv[]){ sink.join(); source.join(); + uploader0.join(); + uploader1.join(); INFO_MSG("Livepeer transcode shutting down: %s", Util::exitReason); return 0; diff --git a/src/process/process_livepeer.h b/src/process/process_livepeer.h index 4dbe4af8..d12d1021 100644 --- a/src/process/process_livepeer.h +++ b/src/process/process_livepeer.h @@ -8,7 +8,6 @@ namespace Mist{ bool getFirst = false; bool sendFirst = false; bool doingSetup = true; - bool queueClear = false; uint64_t packetTimeDiff; uint64_t sendPacketTime; @@ -43,15 +42,34 @@ namespace Mist{ time = t; data.assign(ptr, len); fullyRead = false; - fullyWritten = true; offsetCalcd = false; byteOffset = 0; + fullyWritten = true; } }; - - std::map segs; +#define PRESEG_COUNT 2 + class preparedSegment{ + public: + uint64_t time; + uint64_t segDuration; + uint64_t keyNo; + uint64_t width; + uint64_t height; + bool fullyRead; + bool fullyWritten; + Util::ResizeablePointer data; + preparedSegment(){ + time = 0; + keyNo = 0; + segDuration = 0; + fullyRead = true; + fullyWritten = false; + }; + }; + preparedSegment presegs[PRESEG_COUNT]; + JSON::Value lpEnc; JSON::Value lpBroad; std::string currBroadAddr;