Process system updates:

All processes:
- Added process status system and relevant API calls
- Added ability to set track masks for input/output in processes
- Added support for unmasking tracks when there is a push target, by the "unmask" parameter.
- Added track unmasking support for processes on exit/error
- Make processes start faster, if possible, in the first few seconds of a stream
- Delay stream ready state if there are processes attempting to start

Livepeer process updates:
- Added Content-Resolution header to MistProcLivepeer as per Livepeer's request
- Renamed transcode from "Mist Transcode" to source stream name
- Added ability to send audio to livepeer
- Robustified livepeer timing code, shutdown code, and improved GUI
- Prevent "audio keyframes" from starting segments in MistProcLivepeer
- Multithreaded (2 upload threads) livepeer process
- Stricter downloader/uploader timeout behaviour
- Robustness improvements
- Fix small segment size 😒
- Streamname correction
- Prevent getting stuck when transcoding multiple qualities and they are not equal length
- Corrected log message print error
- Race condition fix
- Now always waits for at least 1 video track
This commit is contained in:
Thulinma 2020-07-29 16:50:09 +02:00
parent f88a8fc51c
commit 209cd4c0fc
15 changed files with 891 additions and 352 deletions

View file

@ -24,6 +24,8 @@ namespace DTSC{
/// The mask that the current process will use to check if a track is valid /// The mask that the current process will use to check if a track is valid
uint8_t trackValidMask = TRACK_VALID_ALL; uint8_t trackValidMask = TRACK_VALID_ALL;
/// The mask that will be set by the current process for new tracks
uint8_t trackValidDefault = TRACK_VALID_ALL;
/// Default constructor for packets - sets a null pointer and invalid packet. /// Default constructor for packets - sets a null pointer and invalid packet.
Packet::Packet(){ Packet::Packet(){
@ -1701,7 +1703,7 @@ namespace DTSC{
trackList.setInt(trackPidField, getpid(), tNumber); trackList.setInt(trackPidField, getpid(), tNumber);
trackList.setInt(trackSourceTidField, INVALID_TRACK_ID, tNumber); trackList.setInt(trackSourceTidField, INVALID_TRACK_ID, tNumber);
trackList.addRecords(1); trackList.addRecords(1);
if (setValid){validateTrack(tNumber);} if (setValid){validateTrack(tNumber, trackValidDefault);}
if (!isMemBuf){trackLock.post();} if (!isMemBuf){trackLock.post();}
return tNumber; return tNumber;
} }

View file

@ -38,6 +38,7 @@ namespace DTSC{
extern uint64_t veryUglyJitterOverride; extern uint64_t veryUglyJitterOverride;
extern uint8_t trackValidMask; extern uint8_t trackValidMask;
extern uint8_t trackValidDefault;
///\brief This enum holds all possible datatypes for DTSC packets. ///\brief This enum holds all possible datatypes for DTSC packets.
enum datatype{ enum datatype{

View file

@ -520,6 +520,12 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){
setPushStatus(statUp["id"].asInt(), statUp["status"]); setPushStatus(statUp["id"].asInt(), statUp["status"]);
} }
} }
if (Request.isMember("proc_status_update")){
JSON::Value &statUp = Request["proc_status_update"];
if (statUp.isMember("id") && statUp.isMember("status") && statUp.isMember("source") && statUp.isMember("proc") && statUp.isMember("sink")){
setProcStatus(statUp["id"].asInt(), statUp["proc"].asStringRef(), statUp["source"].asStringRef(), statUp["sink"].asStringRef(), statUp["status"]);
}
}
/*LTS-END*/ /*LTS-END*/
if (Request.isMember("config_backup")){ if (Request.isMember("config_backup")){
@ -1076,6 +1082,10 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){
} }
} }
if (Request.isMember("proc_list")){
getProcsForStream(Request["proc_list"].asStringRef(), Response["proc_list"]);
}
if (Request.isMember("push_list")){Controller::listPush(Response["push_list"]);} if (Request.isMember("push_list")){Controller::listPush(Response["push_list"]);}
if (Request.isMember("push_stop")){ if (Request.isMember("push_stop")){

View file

@ -55,6 +55,7 @@ namespace Controller{
Storage["log"].append(m); Storage["log"].append(m);
Storage["log"].shrink(100); // limit to 100 log messages Storage["log"].shrink(100); // limit to 100 log messages
if (isPushActive(progPid)){pushLogMessage(progPid, m);} //LTS if (isPushActive(progPid)){pushLogMessage(progPid, m);} //LTS
if (isProcActive(progPid)){procLogMessage(progPid, m);} //LTS
logCounter++; logCounter++;
if (rlxLogs && rlxLogs->isReady()){ if (rlxLogs && rlxLogs->isReady()){
if (!firstLog){firstLog = logCounter;} if (!firstLog){firstLog = logCounter;}

View file

@ -3,6 +3,7 @@
#include "controller_statistics.h" #include "controller_statistics.h"
#include "controller_storage.h" #include "controller_storage.h"
#include "controller_streams.h" #include "controller_streams.h"
#include <mist/timing.h>
#include <map> #include <map>
#include <mist/config.h> #include <mist/config.h>
#include <mist/defines.h> #include <mist/defines.h>
@ -18,6 +19,62 @@
namespace Controller{ namespace Controller{
std::map<std::string, pid_t> inputProcesses; std::map<std::string, pid_t> inputProcesses;
/// Internal list of currently active processes
class procInfo{
public:
JSON::Value stats;
std::string source;
std::string proc;
std::string sink;
uint64_t lastupdate;
JSON::Value logs;
};
std::map<pid_t, procInfo> activeProcs;
void procLogMessage(uint64_t id, const JSON::Value & msg){
JSON::Value &log = activeProcs[id].logs;
log.append(msg);
log.shrink(25);
}
bool isProcActive(uint64_t id){
return activeProcs.count(id);
}
void getProcsForStream(const std::string & stream, JSON::Value & returnedProcList){
std::set<pid_t> wipeList;
for (std::map<pid_t, procInfo>::iterator it = activeProcs.begin(); it != activeProcs.end(); ++it){
if (!stream.size() || stream == it->second.sink || stream == it->second.source){
JSON::Value & thisProc = returnedProcList[JSON::Value(it->first).asString()];
thisProc = it->second.stats;
thisProc["source"] = it->second.source;
thisProc["sink"] = it->second.sink;
thisProc["process"] = it->second.proc;
thisProc["logs"] = it->second.logs;
if (!Util::Procs::isRunning(it->first)){
thisProc["terminated"] = true;
wipeList.insert(it->first);
}
}
}
while (wipeList.size()){
activeProcs.erase(*wipeList.begin());
wipeList.erase(wipeList.begin());
}
}
void setProcStatus(uint64_t id, const std::string & proc, const std::string & source, const std::string & sink, const JSON::Value & status){
procInfo & prc = activeProcs[id];
prc.lastupdate = Util::bootSecs();
prc.stats.extend(status);
if (!prc.proc.size() && sink.size() && source.size() && proc.size()){
prc.sink = sink;
prc.source = source;
prc.proc = proc;
}
}
///\brief Checks whether two streams are equal. ///\brief Checks whether two streams are equal.
///\param one The first stream for the comparison. ///\param one The first stream for the comparison.
///\param two The second stream for the comparison. ///\param two The second stream for the comparison.

View file

@ -1,6 +1,10 @@
#include <mist/json.h> #include <mist/json.h>
namespace Controller{ namespace Controller{
void setProcStatus(uint64_t id, const std::string & proc, const std::string & source, const std::string & sink, const JSON::Value & status);
void getProcsForStream(const std::string & stream, JSON::Value & returnedProcList);
void procLogMessage(uint64_t id, const JSON::Value & msg);
bool isProcActive(uint64_t id);
bool streamsEqual(JSON::Value &one, JSON::Value &two); bool streamsEqual(JSON::Value &one, JSON::Value &two);
void checkStream(std::string name, JSON::Value &data); void checkStream(std::string name, JSON::Value &data);
bool CheckAllStreams(JSON::Value &data); bool CheckAllStreams(JSON::Value &data);

View file

@ -27,6 +27,9 @@
namespace Mist{ namespace Mist{
inputBuffer::inputBuffer(Util::Config *cfg) : Input(cfg){ inputBuffer::inputBuffer(Util::Config *cfg) : Input(cfg){
firstProcTime = 0;
lastProcTime = 0;
allProcsRunning = false;
capa["optional"].removeMember("realtime"); capa["optional"].removeMember("realtime");
@ -506,6 +509,29 @@ namespace Mist{
/*LTS-START*/ /*LTS-START*/
// Reload the configuration to make sure we stay up to date with changes through the api // Reload the configuration to make sure we stay up to date with changes through the api
if (Util::epoch() - lastReTime > 4){preRun();} if (Util::epoch() - lastReTime > 4){preRun();}
size_t procInterval = 5000;
if (!firstProcTime || Util::bootMS() - firstProcTime < 30000){
if (!firstProcTime){firstProcTime = Util::bootMS();}
if (Util::bootMS() - firstProcTime < 10000){
procInterval = 200;
}else{
procInterval = 1000;
}
}
if (Util::bootMS() - lastProcTime > procInterval){
lastProcTime = Util::bootMS();
std::string strName = config->getString("streamname");
Util::sanitizeName(strName);
strName = strName.substr(0, (strName.find_first_of("+ ")));
char tmpBuf[NAME_BUFFER_SIZE];
snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_STREAM_CONF, strName.c_str());
Util::DTSCShmReader rStrmConf(tmpBuf);
DTSC::Scan streamCfg = rStrmConf.getScan();
if (streamCfg){
JSON::Value configuredProcesses = streamCfg.getMember("processes").asJSON();
checkProcesses(configuredProcesses);
}
}
/*LTS-END*/ /*LTS-END*/
connectedUsers = 0; connectedUsers = 0;
@ -537,7 +563,9 @@ namespace Mist{
} }
} }
void inputBuffer::userLeadOut(){ void inputBuffer::userLeadOut(){
if (config->is_active && streamStatus){streamStatus.mapped[0] = hasPush ? STRMSTAT_READY : STRMSTAT_WAIT;} if (config->is_active && streamStatus){
streamStatus.mapped[0] = (hasPush && allProcsRunning) ? STRMSTAT_READY : STRMSTAT_WAIT;
}
if (hasPush){everHadPush = true;} if (hasPush){everHadPush = true;}
if (!hasPush && everHadPush && !resumeMode && config->is_active){ if (!hasPush && everHadPush && !resumeMode && config->is_active){
Util::logExitReason("source disconnected for non-resumable stream"); Util::logExitReason("source disconnected for non-resumable stream");
@ -583,10 +611,6 @@ namespace Mist{
snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_STREAM_CONF, strName.c_str()); snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_STREAM_CONF, strName.c_str());
Util::DTSCShmReader rStrmConf(tmpBuf); Util::DTSCShmReader rStrmConf(tmpBuf);
DTSC::Scan streamCfg = rStrmConf.getScan(); DTSC::Scan streamCfg = rStrmConf.getScan();
if (streamCfg){
JSON::Value configuredProcesses = streamCfg.getMember("processes").asJSON();
checkProcesses(configuredProcesses);
}
//Check if bufferTime setting is correct //Check if bufferTime setting is correct
uint64_t tmpNum = retrieveSetting(streamCfg, "DVR", "bufferTime"); uint64_t tmpNum = retrieveSetting(streamCfg, "DVR", "bufferTime");
@ -682,6 +706,7 @@ namespace Mist{
/*LTS-START*/ /*LTS-START*/
/// Checks if all processes are running, starts them if needed, stops them if needed /// Checks if all processes are running, starts them if needed, stops them if needed
void inputBuffer::checkProcesses(const JSON::Value &procs){ void inputBuffer::checkProcesses(const JSON::Value &procs){
allProcsRunning = true;
if (!M.getValidTracks().size()){return;} if (!M.getValidTracks().size()){return;}
std::set<std::string> newProcs; std::set<std::string> newProcs;
@ -762,6 +787,7 @@ namespace Mist{
argarr[3] = (char*)debugLvl.c_str();; argarr[3] = (char*)debugLvl.c_str();;
argarr[4] = 0; argarr[4] = 0;
} }
allProcsRunning = false;
INFO_MSG("Starting process: %s %s", argarr[0], argarr[1]); INFO_MSG("Starting process: %s %s", argarr[0], argarr[1]);
runningProcs[*newProcs.begin()] = Util::Procs::StartPiped(argarr, 0, 0, &err); runningProcs[*newProcs.begin()] = Util::Procs::StartPiped(argarr, 0, 0, &err);
} }

View file

@ -16,9 +16,12 @@ namespace Mist{
uint64_t cutTime; uint64_t cutTime;
size_t segmentSize; /*LTS*/ size_t segmentSize; /*LTS*/
uint64_t lastReTime; /*LTS*/ uint64_t lastReTime; /*LTS*/
uint64_t lastProcTime; /*LTS*/
uint64_t firstProcTime; /*LTS*/
uint64_t finalMillis; uint64_t finalMillis;
bool hasPush;//Is a push currently being received? bool hasPush;//Is a push currently being received?
bool everHadPush;//Was there ever a push received? bool everHadPush;//Was there ever a push received?
bool allProcsRunning;
bool resumeMode; bool resumeMode;
uint64_t maxKeepAway; uint64_t maxKeepAway;
IPC::semaphore *liveMeta; IPC::semaphore *liveMeta;

View file

@ -112,6 +112,7 @@ namespace Mist{
} }
if (isRecording() && DTSC::trackValidMask == TRACK_VALID_EXT_HUMAN){ if (isRecording() && DTSC::trackValidMask == TRACK_VALID_EXT_HUMAN){
DTSC::trackValidMask = TRACK_VALID_EXT_PUSH; DTSC::trackValidMask = TRACK_VALID_EXT_PUSH;
if (targetParams.count("unmask")){DTSC::trackValidMask = TRACK_VALID_ALL;}
} }
/*LTS-END*/ /*LTS-END*/
} }
@ -1721,7 +1722,7 @@ namespace Mist{
std::string APIcall = std::string APIcall =
"{\"tag_sessid\":{\"" + statComm.getSessId() + "\":" + JSON::string_escape("UA:" + UA) + "}}"; "{\"tag_sessid\":{\"" + statComm.getSessId() + "\":" + JSON::string_escape("UA:" + UA) + "}}";
Socket::UDPConnection uSock; Socket::UDPConnection uSock;
uSock.SetDestination("localhost", 4242); uSock.SetDestination(UDP_API_HOST, UDP_API_PORT);
uSock.SendNow(APIcall); uSock.SendNow(APIcall);
newUA = false; newUA = false;
} }

View file

@ -14,6 +14,221 @@ int pipein[2], pipeout[2];
Util::Config co; Util::Config co;
Util::Config conf; Util::Config conf;
//Stat related stuff
JSON::Value pStat;
JSON::Value & pData = pStat["proc_status_update"]["status"];
tthread::mutex statsMutex;
uint64_t statSinkMs = 0;
uint64_t statSourceMs = 0;
namespace Mist{
class ProcessSink : public InputEBML{
public:
ProcessSink(Util::Config *cfg) : InputEBML(cfg){
capa["name"] = "MKVExec";
};
void getNext(size_t idx = INVALID_TRACK_ID){
{
tthread::lock_guard<tthread::mutex> guard(statsMutex);
if (pData["sink_tracks"].size() != userSelect.size()){
pData["sink_tracks"].null();
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
pData["sink_tracks"].append(it->first);
}
}
}
static bool recurse = false;
if (recurse){return InputEBML::getNext(idx);}
recurse = true;
InputEBML::getNext(idx);
recurse = false;
uint64_t pTime = thisPacket.getTime();
if (thisPacket){
if (!getFirst){
packetTimeDiff = sendPacketTime - pTime;
getFirst = true;
}
pTime += packetTimeDiff;
// change packettime
char *data = thisPacket.getData();
Bit::htobll(data + 12, pTime);
if (pTime >= statSinkMs){statSinkMs = pTime;}
}
}
void setInFile(int stdin_val){
inFile = fdopen(stdin_val, "r");
streamName = opt["sink"].asString();
if (!streamName.size()){streamName = opt["source"].asString();}
Util::streamVariables(streamName, opt["source"].asString());
Util::setStreamName(opt["source"].asString() + "" + streamName);
{
tthread::lock_guard<tthread::mutex> guard(statsMutex);
pStat["proc_status_update"]["sink"] = streamName;
pStat["proc_status_update"]["source"] = opt["source"];
}
}
bool needsLock(){return false;}
bool isSingular(){return false;}
};
class ProcessSource : public OutEBML{
public:
bool isRecording(){return false;}
ProcessSource(Socket::Connection &c) : OutEBML(c){
capa["name"] = "MKVExec";
targetParams["keeptimes"] = true;
realTime = 0;
};
virtual bool onFinish(){
if (opt.isMember("exit_unmask") && opt["exit_unmask"].asBool()){
if (userSelect.size()){
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
INFO_MSG("Unmasking source track %zu" PRIu64, it->first);
meta.validateTrack(it->first, TRACK_VALID_ALL);
}
}
}
return OutEBML::onFinish();
}
virtual void dropTrack(size_t trackId, const std::string &reason, bool probablyBad = true){
if (opt.isMember("exit_unmask") && opt["exit_unmask"].asBool()){
INFO_MSG("Unmasking source track %zu" PRIu64, trackId);
meta.validateTrack(trackId, TRACK_VALID_ALL);
}
OutEBML::dropTrack(trackId, reason, probablyBad);
}
void sendHeader(){
if (opt["masksource"].asBool()){
for (std::map<size_t, Comms::Users>::iterator ti = userSelect.begin(); ti != userSelect.end(); ++ti){
if (ti->first == INVALID_TRACK_ID){continue;}
INFO_MSG("Masking source track %zu", ti->first);
meta.validateTrack(ti->first, meta.trackValid(ti->first) & ~(TRACK_VALID_EXT_HUMAN | TRACK_VALID_EXT_PUSH));
}
}
realTime = 0;
OutEBML::sendHeader();
};
void sendNext(){
{
tthread::lock_guard<tthread::mutex> guard(statsMutex);
if (pData["source_tracks"].size() != userSelect.size()){
pData["source_tracks"].null();
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
pData["source_tracks"].append(it->first);
}
}
}
if (thisTime > statSourceMs){statSourceMs = thisTime;}
extraKeepAway = 0;
needsLookAhead = 0;
maxSkipAhead = 0;
if (!sendFirst){
sendPacketTime = thisPacket.getTime();
sendFirst = true;
/*
uint64_t maxJitter = 1;
for (std::map<size_t, Comms::Users>::iterator ti = userSelect.begin(); ti !=
userSelect.end(); ++ti){if (!M.trackValid(ti->first)){continue;
}// ignore missing tracks
if (M.getMinKeepAway(ti->first) > maxJitter){
maxJitter = M.getMinKeepAway(ti->first);
}
}
DTSC::veryUglyJitterOverride = maxJitter;
*/
}
OutEBML::sendNext();
}
};
/// check source, sink, source_track, codec, bitrate, flags and process options.
bool ProcMKVExec::CheckConfig(){
// Check generic configuration variables
if (!opt.isMember("source") || !opt["source"] || !opt["source"].isString()){
FAIL_MSG("invalid source in config!");
return false;
}
if (!opt.isMember("sink") || !opt["sink"] || !opt["sink"].isString()){
INFO_MSG("No sink explicitly set, using source as sink");
}
return true;
}
void ProcMKVExec::Run(){
int ffer = 2;
pid_t execd_proc = -1;
std::string streamName = opt["sink"].asString();
if (!streamName.size()){streamName = opt["source"].asStringRef();}
Util::streamVariables(streamName, opt["source"].asStringRef());
//Do variable substitution on command
std::string tmpCmd = opt["exec"].asStringRef();
Util::streamVariables(tmpCmd, streamName, opt["source"].asStringRef());
// exec command
char exec_cmd[10240];
strncpy(exec_cmd, tmpCmd.c_str(), 10240);
INFO_MSG("Executing command: %s", exec_cmd);
uint8_t argCnt = 0;
char *startCh = 0;
char *args[1280];
for (char *i = exec_cmd; i - exec_cmd < 10240; ++i){
if (!*i){
if (startCh){args[argCnt++] = startCh;}
break;
}
if (*i == ' '){
if (startCh){
args[argCnt++] = startCh;
startCh = 0;
*i = 0;
}
}else{
if (!startCh){startCh = i;}
}
}
args[argCnt] = 0;
execd_proc = Util::Procs::StartPiped(args, &pipein[0], &pipeout[1], &ffer);
uint64_t lastProcUpdate = Util::bootSecs();
{
tthread::lock_guard<tthread::mutex> guard(statsMutex);
pStat["proc_status_update"]["id"] = getpid();
pStat["proc_status_update"]["proc"] = "MKVExec";
pData["ainfo"]["child_pid"] = execd_proc;
pData["ainfo"]["cmd"] = opt["exec"];
}
uint64_t startTime = Util::bootSecs();
while (conf.is_active && Util::Procs::isRunning(execd_proc)){
Util::sleep(200);
if (lastProcUpdate + 5 <= Util::bootSecs()){
tthread::lock_guard<tthread::mutex> guard(statsMutex);
pData["active_seconds"] = (Util::bootSecs() - startTime);
pData["ainfo"]["sourceTime"] = statSourceMs;
pData["ainfo"]["sinkTime"] = statSinkMs;
Socket::UDPConnection uSock;
uSock.SetDestination(UDP_API_HOST, UDP_API_PORT);
uSock.SendNow(pStat.toString());
lastProcUpdate = Util::bootSecs();
}
}
while (Util::Procs::isRunning(execd_proc)){
INFO_MSG("Stopping process...");
Util::Procs::StopAll();
Util::sleep(200);
}
INFO_MSG("Closing process clean");
}
}// namespace Mist
void sinkThread(void *){ void sinkThread(void *){
Mist::ProcessSink in(&co); Mist::ProcessSink in(&co);
co.getOption("output", true).append("-"); co.getOption("output", true).append("-");
@ -80,17 +295,58 @@ int main(int argc, char *argv[]){
capa["codecs"][0u][1u].append("DTS"); capa["codecs"][0u][1u].append("DTS");
capa["codecs"][0u][2u].append("+JSON"); capa["codecs"][0u][2u].append("+JSON");
capa["ainfo"]["sinkTime"]["name"] = "Sink timestamp";
capa["ainfo"]["sourceTime"]["name"] = "Source timestamp";
capa["ainfo"]["child_pid"]["name"] = "Child process PID";
capa["ainfo"]["cmd"]["name"] = "Child process command";
if (!(config.parseArgs(argc, argv))){return 1;} if (!(config.parseArgs(argc, argv))){return 1;}
if (config.getBool("json")){ if (config.getBool("json")){
capa["name"] = "MKVExec"; capa["name"] = "MKVExec";
capa["desc"] = "Pipe MKV in, expect MKV out. You choose the executable in between yourself."; capa["desc"] = "Pipe MKV in, expect MKV out. You choose the executable in between yourself.";
capa["optional"]["masksource"]["name"] = "Make source track(s) unavailable for users"; capa["optional"]["source_mask"]["name"] = "Source track mask";
capa["optional"]["masksource"]["help"] = "If enabled, makes the source track(s) internal-only, so that external users and pushes cannot access them."; capa["optional"]["source_mask"]["help"] = "What internal processes should have access to the source track(s)";
capa["optional"]["masksource"]["type"] = "boolean"; capa["optional"]["source_mask"]["type"] = "select";
capa["optional"]["masksource"]["default"] = false; capa["optional"]["source_mask"]["select"][0u][0u] = "";
capa["optional"]["source_mask"]["select"][0u][1u] = "Keep original value";
capa["optional"]["source_mask"]["select"][1u][0u] = 255;
capa["optional"]["source_mask"]["select"][1u][1u] = "Everything";
capa["optional"]["source_mask"]["select"][2u][0u] = 4;
capa["optional"]["source_mask"]["select"][2u][1u] = "Processing tasks (not viewers, not pushes)";
capa["optional"]["source_mask"]["select"][3u][0u] = 6;
capa["optional"]["source_mask"]["select"][3u][1u] = "Processing and pushing tasks (not viewers)";
capa["optional"]["source_mask"]["select"][4u][0u] = 5;
capa["optional"]["source_mask"]["select"][4u][1u] = "Processing and viewer tasks (not pushes)";
capa["optional"]["source_mask"]["default"] = "";
capa["optional"]["target_mask"]["name"] = "Output track mask";
capa["optional"]["target_mask"]["help"] = "What internal processes should have access to the ouput track(s)";
capa["optional"]["target_mask"]["type"] = "select";
capa["optional"]["target_mask"]["select"][0u][0u] = "";
capa["optional"]["target_mask"]["select"][0u][1u] = "Keep original value";
capa["optional"]["target_mask"]["select"][1u][0u] = 255;
capa["optional"]["target_mask"]["select"][1u][1u] = "Everything";
capa["optional"]["target_mask"]["select"][2u][0u] = 1;
capa["optional"]["target_mask"]["select"][2u][1u] = "Viewer tasks (not processing, not pushes)";
capa["optional"]["target_mask"]["select"][3u][0u] = 2;
capa["optional"]["target_mask"]["select"][3u][1u] = "Pushing tasks (not processing, not viewers)";
capa["optional"]["target_mask"]["select"][4u][0u] = 4;
capa["optional"]["target_mask"]["select"][4u][1u] = "Processing tasks (not pushes, not viewers)";
capa["optional"]["target_mask"]["select"][5u][0u] = 3;
capa["optional"]["target_mask"]["select"][5u][1u] = "Viewer and pushing tasks (not processing)";
capa["optional"]["target_mask"]["select"][6u][0u] = 5;
capa["optional"]["target_mask"]["select"][6u][1u] = "Viewer and processing tasks (not pushes)";
capa["optional"]["target_mask"]["select"][7u][0u] = 6;
capa["optional"]["target_mask"]["select"][7u][1u] = "Pushing and processing tasks (not viewers)";
capa["optional"]["target_mask"]["select"][8u][0u] = 0;
capa["optional"]["target_mask"]["select"][8u][1u] = "Nothing";
capa["optional"]["target_mask"]["default"] = "";
capa["optional"]["exit_unmask"]["name"] = "Undo masks on process exit/fail";
capa["optional"]["exit_unmask"]["help"] = "If/when the process exits or fails, the masks for input tracks will be reset to defaults. (NOT to previous value, but to defaults!)";
capa["optional"]["exit_unmask"]["default"] = false;
capa["required"]["exec"]["name"] = "Executable"; capa["required"]["exec"]["name"] = "Executable";
capa["required"]["exec"]["help"] = "What to executable to run on the stream data"; capa["required"]["exec"]["help"] = "What to executable to run on the stream data";
@ -179,71 +435,3 @@ int main(int argc, char *argv[]){
return 0; return 0;
} }
namespace Mist{
/// check source, sink, source_track, codec, bitrate, flags and process options.
bool ProcMKVExec::CheckConfig(){
// Check generic configuration variables
if (!opt.isMember("source") || !opt["source"] || !opt["source"].isString()){
FAIL_MSG("invalid source in config!");
return false;
}
if (!opt.isMember("sink") || !opt["sink"] || !opt["sink"].isString()){
INFO_MSG("No sink explicitly set, using source as sink");
}
return true;
}
void ProcMKVExec::Run(){
Util::Procs p;
int ffer = 2;
pid_t execd_proc = -1;
std::string streamName = opt["sink"].asString();
if (!streamName.size()){streamName = opt["source"].asStringRef();}
Util::streamVariables(streamName, opt["source"].asStringRef());
//Do variable substitution on command
std::string tmpCmd = opt["exec"].asStringRef();
Util::streamVariables(tmpCmd, streamName, opt["source"].asStringRef());
// exec command
char exec_cmd[10240];
strncpy(exec_cmd, tmpCmd.c_str(), 10240);
INFO_MSG("Executing command: %s", exec_cmd);
uint8_t argCnt = 0;
char *startCh = 0;
char *args[1280];
for (char *i = exec_cmd; i - exec_cmd < 10240; ++i){
if (!*i){
if (startCh){args[argCnt++] = startCh;}
break;
}
if (*i == ' '){
if (startCh){
args[argCnt++] = startCh;
startCh = 0;
*i = 0;
}
}else{
if (!startCh){startCh = i;}
}
}
args[argCnt] = 0;
execd_proc = p.StartPiped(args, &pipein[0], &pipeout[1], &ffer);
while (conf.is_active && p.isRunning(execd_proc)){Util::sleep(200);}
while (p.isRunning(execd_proc)){
INFO_MSG("Stopping process...");
p.StopAll();
Util::sleep(200);
}
INFO_MSG("Closing process clean");
}
}// namespace Mist

View file

@ -19,77 +19,4 @@ namespace Mist{
void Run(); void Run();
}; };
class ProcessSink : public InputEBML{
public:
ProcessSink(Util::Config *cfg) : InputEBML(cfg){
capa["name"] = "MKVExec";
};
void getNext(size_t idx = INVALID_TRACK_ID){
static bool recurse = false;
if (recurse){return InputEBML::getNext(idx);}
recurse = true;
InputEBML::getNext(idx);
recurse = false;
if (thisPacket){
if (!getFirst){
packetTimeDiff = sendPacketTime - thisPacket.getTime();
getFirst = true;
}
uint64_t packTime = thisPacket.getTime() + packetTimeDiff;
// change packettime
char *data = thisPacket.getData();
Bit::htobll(data + 12, packTime);
}
}
void setInFile(int stdin_val){
inFile = fdopen(stdin_val, "r");
streamName = opt["sink"].asString();
if (!streamName.size()){streamName = opt["source"].asString();}
Util::streamVariables(streamName, opt["source"].asString());
Util::setStreamName(opt["source"].asString() + "" + streamName);
}
bool needsLock(){return false;}
bool isSingular(){return false;}
};
class ProcessSource : public OutEBML{
public:
bool isRecording(){return false;}
ProcessSource(Socket::Connection &c) : OutEBML(c){
capa["name"] = "MKVExec";
realTime = 0;
};
void sendHeader(){
if (opt["masksource"].asBool()){
for (std::map<size_t, Comms::Users>::iterator ti = userSelect.begin(); ti != userSelect.end(); ++ti){
if (ti->first == INVALID_TRACK_ID){continue;}
INFO_MSG("Masking source track %zu", ti->first);
meta.validateTrack(ti->first, meta.trackValid(ti->first) & ~(TRACK_VALID_EXT_HUMAN | TRACK_VALID_EXT_PUSH));
}
}
realTime = 0;
OutEBML::sendHeader();
};
void sendNext(){
extraKeepAway = 0;
needsLookAhead = 0;
maxSkipAhead = 0;
if (!sendFirst){
sendPacketTime = thisPacket.getTime();
sendFirst = true;
/*
uint64_t maxJitter = 1;
for (std::map<size_t, Comms::Users>::iterator ti = userSelect.begin(); ti !=
userSelect.end(); ++ti){if (!M.trackValid(ti->first)){continue;
}// ignore missing tracks
if (M.getMinKeepAway(ti->first) > maxJitter){
maxJitter = M.getMinKeepAway(ti->first);
}
}
DTSC::veryUglyJitterOverride = maxJitter;
*/
}
OutEBML::sendNext();
}
};
}// namespace Mist }// namespace Mist

View file

@ -97,11 +97,47 @@ int main(int argc, char *argv[]){
capa["desc"] = "Use a local FFMPEG installed binary to do encoding"; // description capa["desc"] = "Use a local FFMPEG installed binary to do encoding"; // description
capa["sort"] = "n"; // sort the parameters by this key capa["sort"] = "n"; // sort the parameters by this key
capa["optional"]["masksource"]["name"] = "Make source track unavailable for users"; capa["optional"]["source_mask"]["name"] = "Source track mask";
capa["optional"]["masksource"]["help"] = "If enabled, makes the source track internal-only, so that external users and pushes cannot access it."; capa["optional"]["source_mask"]["help"] = "What internal processes should have access to the source track(s)";
capa["optional"]["masksource"]["type"] = "boolean"; capa["optional"]["source_mask"]["type"] = "select";
capa["optional"]["masksource"]["default"] = false; capa["optional"]["source_mask"]["select"][0u][0u] = "";
capa["optional"]["source_mask"]["select"][0u][1u] = "Keep original value";
capa["optional"]["source_mask"]["select"][1u][0u] = 255;
capa["optional"]["source_mask"]["select"][1u][1u] = "Everything";
capa["optional"]["source_mask"]["select"][2u][0u] = 4;
capa["optional"]["source_mask"]["select"][2u][1u] = "Processing tasks (not viewers, not pushes)";
capa["optional"]["source_mask"]["select"][3u][0u] = 6;
capa["optional"]["source_mask"]["select"][3u][1u] = "Processing and pushing tasks (not viewers)";
capa["optional"]["source_mask"]["select"][4u][0u] = 5;
capa["optional"]["source_mask"]["select"][4u][1u] = "Processing and viewer tasks (not pushes)";
capa["optional"]["source_mask"]["default"] = "";
capa["optional"]["target_mask"]["name"] = "Output track mask";
capa["optional"]["target_mask"]["help"] = "What internal processes should have access to the ouput track(s)";
capa["optional"]["target_mask"]["type"] = "select";
capa["optional"]["target_mask"]["select"][0u][0u] = "";
capa["optional"]["target_mask"]["select"][0u][1u] = "Keep original value";
capa["optional"]["target_mask"]["select"][1u][0u] = 255;
capa["optional"]["target_mask"]["select"][1u][1u] = "Everything";
capa["optional"]["target_mask"]["select"][2u][0u] = 1;
capa["optional"]["target_mask"]["select"][2u][1u] = "Viewer tasks (not processing, not pushes)";
capa["optional"]["target_mask"]["select"][3u][0u] = 2;
capa["optional"]["target_mask"]["select"][3u][1u] = "Pushing tasks (not processing, not viewers)";
capa["optional"]["target_mask"]["select"][4u][0u] = 4;
capa["optional"]["target_mask"]["select"][4u][1u] = "Processing tasks (not pushes, not viewers)";
capa["optional"]["target_mask"]["select"][5u][0u] = 3;
capa["optional"]["target_mask"]["select"][5u][1u] = "Viewer and pushing tasks (not processing)";
capa["optional"]["target_mask"]["select"][6u][0u] = 5;
capa["optional"]["target_mask"]["select"][6u][1u] = "Viewer and processing tasks (not pushes)";
capa["optional"]["target_mask"]["select"][7u][0u] = 6;
capa["optional"]["target_mask"]["select"][7u][1u] = "Pushing and processing tasks (not viewers)";
capa["optional"]["target_mask"]["select"][8u][0u] = 0;
capa["optional"]["target_mask"]["select"][8u][1u] = "Nothing";
capa["optional"]["target_mask"]["default"] = "";
capa["optional"]["exit_unmask"]["name"] = "Undo masks on process exit/fail";
capa["optional"]["exit_unmask"]["help"] = "If/when the process exits or fails, the masks for input tracks will be reset to defaults. (NOT to previous value, but to defaults!)";
capa["optional"]["exit_unmask"]["default"] = false;
capa["required"]["x-LSP-kind"]["name"] = "Input type"; // human readable name of option capa["required"]["x-LSP-kind"]["name"] = "Input type"; // human readable name of option
capa["required"]["x-LSP-kind"]["help"] = "The type of input to use"; // extra information capa["required"]["x-LSP-kind"]["help"] = "The type of input to use"; // extra information
@ -354,6 +390,27 @@ int main(int argc, char *argv[]){
namespace Mist{ namespace Mist{
bool EncodeOutputEBML::onFinish(){
if (opt.isMember("exit_unmask") && opt["exit_unmask"].asBool()){
if (userSelect.size()){
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
INFO_MSG("Unmasking source track %zu" PRIu64, it->first);
meta.validateTrack(it->first, TRACK_VALID_ALL);
}
}
}
return OutEBML::onFinish();
}
void EncodeOutputEBML::dropTrack(size_t trackId, const std::string &reason, bool probablyBad){
if (opt.isMember("exit_unmask") && opt["exit_unmask"].asBool()){
INFO_MSG("Unmasking source track %zu" PRIu64, trackId);
meta.validateTrack(trackId, TRACK_VALID_ALL);
}
OutEBML::dropTrack(trackId, reason, probablyBad);
}
void EncodeInputEBML::getNext(size_t idx){ void EncodeInputEBML::getNext(size_t idx){
static bool recurse = false; static bool recurse = false;
@ -390,6 +447,9 @@ namespace Mist{
if (!streamName.size()){streamName = opt["source"].asString();} if (!streamName.size()){streamName = opt["source"].asString();}
Util::streamVariables(streamName, opt["source"].asString()); Util::streamVariables(streamName, opt["source"].asString());
Util::setStreamName(opt["source"].asString() + "" + streamName); Util::setStreamName(opt["source"].asString() + "" + streamName);
if (opt.isMember("target_mask") && !opt["target_mask"].isNull() && opt["target_mask"].asString() != ""){
DTSC::trackValidDefault = opt["target_mask"].asInt();
}
} }
std::string EncodeOutputEBML::getTrackType(int tid){return M.getType(tid);} std::string EncodeOutputEBML::getTrackType(int tid){return M.getType(tid);}
@ -411,9 +471,10 @@ namespace Mist{
void EncodeOutputEBML::sendHeader(){ void EncodeOutputEBML::sendHeader(){
realTime = 0; realTime = 0;
size_t idx = getMainSelectedTrack(); size_t idx = getMainSelectedTrack();
if (opt["masksource"].asBool()){ if (opt.isMember("source_mask") && !opt["source_mask"].isNull() && opt["source_mask"].asString() != ""){
INFO_MSG("Masking source track %zu", idx); uint64_t sourceMask = opt["source_mask"].asInt();
meta.validateTrack(idx, meta.trackValid(idx) & ~(TRACK_VALID_EXT_HUMAN | TRACK_VALID_EXT_PUSH)); INFO_MSG("Masking source track %zu to %" PRIu64, idx, sourceMask);
meta.validateTrack(idx, sourceMask);
} }
res_x = M.getWidth(idx); res_x = M.getWidth(idx);
res_y = M.getHeight(idx); res_y = M.getHeight(idx);

View file

@ -51,6 +51,8 @@ namespace Mist{
class EncodeOutputEBML : public OutEBML{ class EncodeOutputEBML : public OutEBML{
public: public:
virtual bool onFinish();
virtual void dropTrack(size_t trackId, const std::string &reason, bool probablyBad = true);
EncodeOutputEBML(Socket::Connection &c) : OutEBML(c){}; // realTime = 0;}; EncodeOutputEBML(Socket::Connection &c) : OutEBML(c){}; // realTime = 0;};
bool isRecording(){return false;} bool isRecording(){return false;}
void setVideoTrack(std::string tid); void setVideoTrack(std::string tid);

View file

@ -1,5 +1,6 @@
#include <algorithm> //for std::find #include <algorithm> //for std::find
#include <fstream> #include <fstream>
#include <mist/timing.h>
#include "process_livepeer.h" #include "process_livepeer.h"
#include <mist/procs.h> #include <mist/procs.h>
#include <mist/util.h> #include <mist/util.h>
@ -11,10 +12,26 @@
#include <unistd.h> //for stat #include <unistd.h> //for stat
tthread::mutex segMutex; tthread::mutex segMutex;
tthread::mutex broadcasterMutex;
//Stat related stuff
JSON::Value pStat;
JSON::Value & pData = pStat["proc_status_update"]["status"];
tthread::mutex statsMutex;
uint64_t statSwitches = 0;
uint64_t statFailN200 = 0;
uint64_t statFailTimeout = 0;
uint64_t statFailParse = 0;
uint64_t statFailOther = 0;
uint64_t statSinkMs = 0;
uint64_t statSourceMs = 0;
Util::Config co; Util::Config co;
Util::Config conf; Util::Config conf;
size_t insertTurn = 0;
bool isStuck = false;
namespace Mist{ namespace Mist{
void pickRandomBroadcaster(){ void pickRandomBroadcaster(){
@ -39,29 +56,62 @@ namespace Mist{
//Source process, takes data from input stream and sends to livepeer //Source process, takes data from input stream and sends to livepeer
class ProcessSource : public TSOutput{ class ProcessSource : public TSOutput{
public: public:
HTTP::Downloader upper;
uint64_t segTime;
bool isRecording(){return false;} bool isRecording(){return false;}
bool isReadyForPlay(){
if (!TSOutput::isReadyForPlay()){return false;}
size_t mTrk = getMainSelectedTrack();
if (mTrk == INVALID_TRACK_ID || M.getType(mTrk) != "video"){
HIGH_MSG("NOT READY (non-video main track)");
return false;
}
return true;
}
ProcessSource(Socket::Connection &c) : TSOutput(c){ ProcessSource(Socket::Connection &c) : TSOutput(c){
capa["name"] = "Livepeer"; capa["name"] = "Livepeer";
capa["codecs"][0u][0u].append("+H264"); capa["codecs"][0u][0u].append("+H264");
capa["codecs"][0u][0u].append("+HEVC"); capa["codecs"][0u][0u].append("+HEVC");
capa["codecs"][0u][0u].append("+MPEG2"); capa["codecs"][0u][0u].append("+MPEG2");
capa["codecs"][0u][1u].append("+AAC");
realTime = 0; realTime = 0;
wantRequest = false; wantRequest = false;
parseData = true; parseData = true;
upper.setHeader("Authorization", "Bearer "+opt["access_token"].asStringRef()); currPreSeg = 0;
}; };
Util::ResizeablePointer tsPck; virtual bool onFinish(){
if (opt.isMember("exit_unmask") && opt["exit_unmask"].asBool()){
if (userSelect.size()){
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
INFO_MSG("Unmasking source track %zu" PRIu64, it->first);
meta.validateTrack(it->first, TRACK_VALID_ALL);
}
}
}
return TSOutput::onFinish();
}
virtual void dropTrack(size_t trackId, const std::string &reason, bool probablyBad = true){
if (opt.isMember("exit_unmask") && opt["exit_unmask"].asBool()){
INFO_MSG("Unmasking source track %zu" PRIu64, trackId);
meta.validateTrack(trackId, TRACK_VALID_ALL);
}
TSOutput::dropTrack(trackId, reason, probablyBad);
}
size_t currPreSeg;
void sendTS(const char *tsData, size_t len = 188){ void sendTS(const char *tsData, size_t len = 188){
tsPck.append(tsData, len); if (!presegs[currPreSeg].data.size()){
presegs[currPreSeg].time = thisPacket.getTime();
}
presegs[currPreSeg].data.append(tsData, len);
}; };
virtual void initialSeek(){ virtual void initialSeek(){
if (!meta){return;} if (!meta){return;}
if (opt["masksource"].asBool()){ if (opt.isMember("source_mask") && !opt["source_mask"].isNull() && opt["source_mask"].asString() != ""){
size_t mainTrack = getMainSelectedTrack(); uint64_t sourceMask = opt["source_mask"].asInt();
INFO_MSG("Masking source track %zu", mainTrack); if (userSelect.size()){
meta.validateTrack(mainTrack, meta.trackValid(mainTrack) & ~(TRACK_VALID_EXT_HUMAN | TRACK_VALID_EXT_PUSH)); for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
INFO_MSG("Masking source track %zu to %" PRIu64, it->first, sourceMask);
meta.validateTrack(it->first, sourceMask);
}
}
} }
if (!meta.getLive() || opt["leastlive"].asBool()){ if (!meta.getLive() || opt["leastlive"].asBool()){
INFO_MSG("Seeking to earliest point in stream"); INFO_MSG("Seeking to earliest point in stream");
@ -70,148 +120,33 @@ namespace Mist{
} }
Output::initialSeek(); Output::initialSeek();
} }
///Inserts a part into the queue of parts to parse
void insertPart(const std::string & rendition, void * ptr, size_t len){
while (conf.is_active){
{
tthread::lock_guard<tthread::mutex> guard(segMutex);
if (segs[rendition].fullyRead){
HIGH_MSG("Inserting %zi bytes of %s", len, rendition.c_str());
segs[rendition].set(segTime, ptr, len);
return;
}
}
INFO_MSG("Waiting for %s to finish parsing current part...", rendition.c_str());
Util::sleep(500);
}
}
///Parses a multipart response
void parseMultipart(){
std::string cType = upper.getHeader("Content-Type");
std::string bound;
if (cType.find("boundary=") != std::string::npos){
bound = "--"+cType.substr(cType.find("boundary=")+9);
}
if (!bound.size()){
FAIL_MSG("Could not parse boundary string from Content-Type header!");
return;
}
const std::string & d = upper.const_data();
size_t startPos = 0;
size_t nextPos = d.find(bound, startPos);
//While there is at least one boundary to be found
while (nextPos != std::string::npos){
startPos = nextPos+bound.size()+2;
nextPos = d.find(bound, startPos);
if (nextPos != std::string::npos){
//We have a start and end position, looking good so far...
size_t headEnd = d.find("\r\n\r\n", startPos);
if (headEnd == std::string::npos || headEnd > nextPos){
FAIL_MSG("Could not find end of headers for multi-part part; skipping to next part");
continue;
}
//Alright, we know where our headers and data are. Parse the headers
std::map<std::string, std::string> partHeaders;
size_t headPtr = startPos;
size_t nextNL = d.find("\r\n", headPtr);
while (nextNL != std::string::npos && nextNL <= headEnd){
size_t col = d.find(":", headPtr);
if (col != std::string::npos && col < nextNL){
partHeaders[d.substr(headPtr, col-headPtr)] = d.substr(col+2, nextNL-col-2);
}
headPtr = nextNL+2;
nextNL = d.find("\r\n", headPtr);
}
for (std::map<std::string, std::string>::iterator it = partHeaders.begin(); it != partHeaders.end(); ++it){
VERYHIGH_MSG("Header %s = %s", it->first.c_str(), it->second.c_str());
}
VERYHIGH_MSG("Body has length %zi", nextPos-headEnd-6);
std::string preType = partHeaders["Content-Type"].substr(0, 10);
Util::stringToLower(preType);
if (preType == "video/mp2t"){
insertPart(partHeaders["Rendition-Name"], (void*)(d.data()+headEnd+4), nextPos-headEnd-6);
}
}
}
}
void sendNext(){ void sendNext(){
if (thisPacket.getFlag("keyframe") && (thisPacket.getTime() - segTime) >= 1000){
if (Mist::queueClear){
//Request to clear the queue! Do so, and wait for a new broadcaster to be picked.
{ {
tthread::lock_guard<tthread::mutex> guard(segMutex); tthread::lock_guard<tthread::mutex> guard(statsMutex);
segs.clear(); if (pData["source_tracks"].size() != userSelect.size()){
} pData["source_tracks"].null();
doingSetup = false; for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
//Sleep while we're still being asked to clear pData["source_tracks"].append(it->first);
while (queueClear && conf.is_active){
Util::sleep(100);
}
if (!conf.is_active){return;}
}
if (tsPck.size() > 187){
size_t attempts = 0;
bool retry = false;
do{
retry = false;
HTTP::URL target(currBroadAddr+"/live/"+lpID+"/"+JSON::Value(keyCount).asString()+".ts");
upper.setHeader("Accept", "multipart/mixed");
uint64_t segDuration = thisPacket.getTime() - segTime;
upper.setHeader("Content-Duration", JSON::Value(segDuration).asString());
if (upper.post(target, tsPck, tsPck.size())){
if (upper.getStatusCode() == 200){
HIGH_MSG("Uploaded %zu bytes to %s", tsPck.size(), target.getUrl().c_str());
if (upper.getHeader("Content-Type").substr(0, 10) == "multipart/"){
parseMultipart();
}else{
FAIL_MSG("Non-multipart response received - this version only works with multipart!");
}
}else{
attempts++;
WARN_MSG("Failed to upload %zu bytes to %s: %" PRIu32 " %s", tsPck.size(), target.getUrl().c_str(), upper.getStatusCode(), upper.getStatusText().c_str());
if ((attempts % 3) == 3){
Util::sleep(250);
retry = true;
}else{
if (attempts > 12){
Util::logExitReason("too many upload failures");
conf.is_active = false;
return;
}
if (!conf.is_active){return;}
FAIL_MSG("Failed to upload segment %s several times, picking new broadcaster", target.getUrl().c_str());
pickRandomBroadcaster();
if (!currBroadAddr.size()){
Util::logExitReason("no Livepeer broadcasters available");
conf.is_active = false;
return;
}else{
WARN_MSG("Switched to broadcaster: %s", currBroadAddr.c_str());
retry = true;
} }
} }
} }
}else{ if (thisTime > statSourceMs){statSourceMs = thisTime;}
if (!conf.is_active){return;} if (thisPacket.getFlag("keyframe") && M.trackLoaded(thisIdx) && M.getType(thisIdx) == "video" && (thisTime - presegs[currPreSeg].time) >= 1000){
FAIL_MSG("Failed to upload segment %s, picking new broadcaster", target.getUrl().c_str()); if (presegs[currPreSeg].data.size() > 187){
pickRandomBroadcaster(); presegs[currPreSeg].keyNo = keyCount;
if (!currBroadAddr.size()){ presegs[currPreSeg].width = M.getWidth(thisIdx);
Util::logExitReason("no Livepeer broadcasters available"); presegs[currPreSeg].height = M.getHeight(thisIdx);
conf.is_active = false; presegs[currPreSeg].segDuration = thisTime - presegs[currPreSeg].time;
return; presegs[currPreSeg].fullyRead = false;
}else{ presegs[currPreSeg].fullyWritten = true;
WARN_MSG("Switched to broadcaster: %s", currBroadAddr.c_str()); currPreSeg = (currPreSeg+1) % PRESEG_COUNT;
retry = true;
} }
} while (!presegs[currPreSeg].fullyRead && conf.is_active){Util::sleep(100);}
}while(retry); presegs[currPreSeg].data.assign(0, 0);
}
tsPck.assign(0, 0);
extraKeepAway = 0; extraKeepAway = 0;
needsLookAhead = 0; needsLookAhead = 0;
maxSkipAhead = 0; maxSkipAhead = 0;
packCounter = 0; packCounter = 0;
segTime = thisPacket.getTime();
++keyCount; ++keyCount;
sendFirst = true; sendFirst = true;
} }
@ -227,7 +162,15 @@ namespace Mist{
streamName = opt["sink"].asString(); streamName = opt["sink"].asString();
if (!streamName.size()){streamName = opt["source"].asString();} if (!streamName.size()){streamName = opt["source"].asString();}
Util::streamVariables(streamName, opt["source"].asString()); Util::streamVariables(streamName, opt["source"].asString());
{
tthread::lock_guard<tthread::mutex> guard(statsMutex);
pStat["proc_status_update"]["sink"] = streamName;
pStat["proc_status_update"]["source"] = opt["source"];
}
Util::setStreamName(opt["source"].asString() + "" + streamName); Util::setStreamName(opt["source"].asString() + "" + streamName);
if (opt.isMember("target_mask") && !opt["target_mask"].isNull() && opt["target_mask"].asString() != ""){
DTSC::trackValidDefault = opt["target_mask"].asInt();
}
preRun(); preRun();
}; };
virtual bool needsLock(){return false;} virtual bool needsLock(){return false;}
@ -239,35 +182,53 @@ namespace Mist{
thisPacket.null(); thisPacket.null();
int64_t timeOffset = 0; int64_t timeOffset = 0;
uint64_t trackId = 0; uint64_t trackId = 0;
{
tthread::lock_guard<tthread::mutex> guard(statsMutex);
if (pData["sink_tracks"].size() != userSelect.size()){
pData["sink_tracks"].null();
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
pData["sink_tracks"].append(it->first);
}
}
}
while (!thisPacket && conf.is_active){ while (!thisPacket && conf.is_active){
{ {
tthread::lock_guard<tthread::mutex> guard(segMutex); tthread::lock_guard<tthread::mutex> guard(segMutex);
std::string oRend; std::string oRend;
uint64_t lastPacket = segs.begin()->second.lastPacket; uint64_t lastPacket = 0xFFFFFFFFFFFFFFFFull;
for (segIt = segs.begin(); segIt != segs.end(); ++segIt){ for (segIt = segs.begin(); segIt != segs.end(); ++segIt){
if (segIt->second.lastPacket > lastPacket){continue;} if (isStuck){
WARN_MSG("Considering %s: T%" PRIu64 ", fullyWritten: %s, fullyRead: %s", segIt->first.c_str(), segIt->second.lastPacket, segIt->second.fullyWritten?"Y":"N", segIt->second.fullyRead?"Y":"N");
}
if (!segIt->second.fullyWritten){continue;} if (!segIt->second.fullyWritten){continue;}
if (segIt->second.byteOffset >= segIt->second.data.size()){continue;} if (segIt->second.lastPacket > lastPacket){continue;}
oRend = segIt->first; oRend = segIt->first;
lastPacket = segIt->second.lastPacket; lastPacket = segIt->second.lastPacket;
} }
if (oRend.size()){ if (oRend.size()){
if (isStuck){WARN_MSG("Picked %s!", oRend.c_str());}
readySegment & S = segs[oRend]; readySegment & S = segs[oRend];
while (!S.S.hasPacket() && S.byteOffset <= S.data.size() - 188){ while (!S.S.hasPacket() && S.byteOffset <= S.data.size() - 188){
S.S.parse(S.data + S.byteOffset, 0); S.S.parse(S.data + S.byteOffset, 0);
S.byteOffset += 188; S.byteOffset += 188;
if (S.byteOffset > S.data.size() - 188){S.S.finish();}
} }
if (S.S.hasPacket()){ if (S.S.hasPacket()){
S.S.getEarliestPacket(thisPacket); S.S.getEarliestPacket(thisPacket);
if (!S.offsetCalcd){ if (!S.offsetCalcd){
S.timeOffset = S.time - thisPacket.getTime(); S.timeOffset = S.time - thisPacket.getTime();
HIGH_MSG("First timestamp of %s at time %" PRIu64 " is %" PRIu64 ", adjusting by %" PRId64, oRend.c_str(), S.time, thisPacket.getTime(), S.timeOffset);
S.offsetCalcd = true; S.offsetCalcd = true;
} }
timeOffset = S.timeOffset; timeOffset = S.timeOffset;
if (thisPacket){
S.lastPacket = thisPacket.getTime() + timeOffset;
if (S.lastPacket >= statSinkMs){statSinkMs = S.lastPacket;}
}
trackId = (S.ID << 16) + thisPacket.getTrackId(); trackId = (S.ID << 16) + thisPacket.getTrackId();
size_t idx = M.trackIDToIndex(trackId, getpid()); size_t idx = M.trackIDToIndex(trackId, getpid());
if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){ if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){
INFO_MSG("Initializing track %zi (index %zi) as %" PRIu64 " for playlist %" PRIu64, thisPacket.getTrackId(), idx, trackId, S.ID); INFO_MSG("Initializing track %zi as %" PRIu64 " for playlist %" PRIu64, thisPacket.getTrackId(), trackId, S.ID);
S.S.initializeMetadata(meta, thisPacket.getTrackId(), trackId); S.S.initializeMetadata(meta, thisPacket.getTrackId(), trackId);
} }
} }
@ -277,7 +238,13 @@ namespace Mist{
} }
} }
} }
if (!thisPacket){Util::sleep(25);} if (!thisPacket){
Util::sleep(25);
if (userSelect.size() && userSelect.begin()->second.getStatus() == COMM_STATUS_REQDISCONNECT){
Util::logExitReason("buffer requested shutdown");
return;
}
}
} }
if (thisPacket){ if (thisPacket){
@ -332,16 +299,19 @@ namespace Mist{
Util::logExitReason("No Livepeer broadcasters available"); Util::logExitReason("No Livepeer broadcasters available");
return; return;
} }
{
tthread::lock_guard<tthread::mutex> guard(broadcasterMutex);
pickRandomBroadcaster(); pickRandomBroadcaster();
if (!currBroadAddr.size()){ if (!currBroadAddr.size()){
Util::logExitReason("No Livepeer broadcasters available"); Util::logExitReason("No Livepeer broadcasters available");
return; return;
} }
INFO_MSG("Using broadcaster: %s", currBroadAddr.c_str()); INFO_MSG("Using broadcaster: %s", currBroadAddr.c_str());
}
//make transcode request //make transcode request
JSON::Value pl; JSON::Value pl;
pl["name"] = "Mist Transcode"; pl["name"] = opt["source"];
pl["profiles"] = opt["target_profiles"]; pl["profiles"] = opt["target_profiles"];
dl.setHeader("Content-Type", "application/json"); dl.setHeader("Content-Type", "application/json");
dl.setHeader("Authorization", "Bearer "+opt["access_token"].asStringRef()); dl.setHeader("Authorization", "Bearer "+opt["access_token"].asStringRef());
@ -362,7 +332,36 @@ namespace Mist{
INFO_MSG("Livepeer transcode ID: %s", lpID.c_str()); INFO_MSG("Livepeer transcode ID: %s", lpID.c_str());
doingSetup = false; doingSetup = false;
while (conf.is_active && co.is_active){Util::sleep(200);} uint64_t lastProcUpdate = Util::bootSecs();
{
tthread::lock_guard<tthread::mutex> guard(statsMutex);
pStat["proc_status_update"]["id"] = getpid();
pStat["proc_status_update"]["proc"] = "Livepeer";
pData["ainfo"]["lp_id"] = lpID;
}
uint64_t startTime = Util::bootSecs();
while (conf.is_active && co.is_active){
Util::sleep(200);
if (lastProcUpdate + 5 <= Util::bootSecs()){
tthread::lock_guard<tthread::mutex> guard(statsMutex);
pData["active_seconds"] = (Util::bootSecs() - startTime);
pData["ainfo"]["switches"] = statSwitches;
pData["ainfo"]["fail_non200"] = statFailN200;
pData["ainfo"]["fail_timeout"] = statFailTimeout;
pData["ainfo"]["fail_parse"] = statFailParse;
pData["ainfo"]["fail_other"] = statFailOther;
pData["ainfo"]["sourceTime"] = statSourceMs;
pData["ainfo"]["sinkTime"] = statSinkMs;
{
tthread::lock_guard<tthread::mutex> guard(broadcasterMutex);
pData["ainfo"]["bc"] = Mist::currBroadAddr;
}
Socket::UDPConnection uSock;
uSock.SetDestination(UDP_API_HOST, UDP_API_PORT);
uSock.SendNow(pStat.toString());
lastProcUpdate = Util::bootSecs();
}
}
INFO_MSG("Closing process clean"); INFO_MSG("Closing process clean");
} }
}// namespace Mist }// namespace Mist
@ -389,12 +388,19 @@ void sourceThread(void *){
opt["default"] = ""; opt["default"] = "";
opt["arg_num"] = 1; opt["arg_num"] = 1;
opt["help"] = "Target filename to store EBML file as, or - for stdout."; opt["help"] = "Target filename to store EBML file as, or - for stdout.";
//Check for audio selection, default to none
std::string audio_select = "none";
if (Mist::opt.isMember("audio_select") && Mist::opt["audio_select"].isString() && Mist::opt["audio_select"]){
audio_select = Mist::opt["audio_select"].asStringRef();
}
//Check for source track selection, default to maxbps
std::string video_select = "maxbps";
if (Mist::opt.isMember("source_track") && Mist::opt["source_track"].isString() && Mist::opt["source_track"]){
video_select = Mist::opt["source_track"].asStringRef();
}
conf.addOption("target", opt); conf.addOption("target", opt);
conf.getOption("streamname", true).append(Mist::opt["source"].c_str()); conf.getOption("streamname", true).append(Mist::opt["source"].c_str());
conf.getOption("target", true).append("-?audio=none&video=maxbps"); conf.getOption("target", true).append("-?audio="+audio_select+"&video="+video_select);
if (Mist::opt.isMember("source_track")){
conf.getOption("target", true).append("-?audio=none&video=" + Mist::opt["source_track"].asString());
}
Mist::ProcessSource::init(&conf); Mist::ProcessSource::init(&conf);
conf.is_active = true; conf.is_active = true;
int devnull = open("/dev/null", O_RDWR); int devnull = open("/dev/null", O_RDWR);
@ -413,6 +419,156 @@ void sourceThread(void *){
close(devnull); close(devnull);
} }
///Inserts a part into the queue of parts to parse
void insertPart(const Mist::preparedSegment & mySeg, const std::string & rendition, void * ptr, size_t len){
uint64_t waitTime = Util::bootMS();
uint64_t lastAlert = waitTime;
while (conf.is_active){
{
tthread::lock_guard<tthread::mutex> guard(segMutex);
if (Mist::segs[rendition].fullyRead){
HIGH_MSG("Inserting %zi bytes of %s, originally for time %" PRIu64, len, rendition.c_str(), mySeg.time);
Mist::segs[rendition].set(mySeg.time, ptr, len);
return;
}
}
uint64_t currMs = Util::bootMS();
isStuck = false;
if (currMs-waitTime > 5000 && currMs-lastAlert > 1000){
lastAlert = currMs;
INFO_MSG("Waiting for %s to finish parsing current part (%" PRIu64 "ms)...", rendition.c_str(), currMs-waitTime);
isStuck = true;
}
Util::sleep(100);
}
}
///Parses a multipart response
void parseMultipart(const Mist::preparedSegment & mySeg, const std::string & cType, const std::string & d){
std::string bound;
if (cType.find("boundary=") != std::string::npos){
bound = "--"+cType.substr(cType.find("boundary=")+9);
}
if (!bound.size()){
FAIL_MSG("Could not parse boundary string from Content-Type header!");
return;
}
size_t startPos = 0;
size_t nextPos = d.find(bound, startPos);
//While there is at least one boundary to be found
while (nextPos != std::string::npos){
startPos = nextPos+bound.size()+2;
nextPos = d.find(bound, startPos);
if (nextPos != std::string::npos){
//We have a start and end position, looking good so far...
size_t headEnd = d.find("\r\n\r\n", startPos);
if (headEnd == std::string::npos || headEnd > nextPos){
FAIL_MSG("Could not find end of headers for multi-part part; skipping to next part");
continue;
}
//Alright, we know where our headers and data are. Parse the headers
std::map<std::string, std::string> partHeaders;
size_t headPtr = startPos;
size_t nextNL = d.find("\r\n", headPtr);
while (nextNL != std::string::npos && nextNL <= headEnd){
size_t col = d.find(":", headPtr);
if (col != std::string::npos && col < nextNL){
partHeaders[d.substr(headPtr, col-headPtr)] = d.substr(col+2, nextNL-col-2);
}
headPtr = nextNL+2;
nextNL = d.find("\r\n", headPtr);
}
for (std::map<std::string, std::string>::iterator it = partHeaders.begin(); it != partHeaders.end(); ++it){
VERYHIGH_MSG("Header %s = %s", it->first.c_str(), it->second.c_str());
}
VERYHIGH_MSG("Body has length %zi", nextPos-headEnd-6);
std::string preType = partHeaders["Content-Type"].substr(0, 10);
Util::stringToLower(preType);
if (preType == "video/mp2t"){
insertPart(mySeg, partHeaders["Rendition-Name"], (void*)(d.data()+headEnd+4), nextPos-headEnd-6);
}
}
}
}
void uploadThread(void * num){
size_t myNum = (size_t)num;
Mist::preparedSegment & mySeg = Mist::presegs[myNum];
HTTP::Downloader upper;
while (conf.is_active){
while (conf.is_active && !mySeg.fullyWritten){Util::sleep(100);}
if (!conf.is_active){return;}//Exit early on shutdown
size_t attempts = 0;
do{
HTTP::URL target;
{
tthread::lock_guard<tthread::mutex> guard(broadcasterMutex);
target = HTTP::URL(Mist::currBroadAddr+"/live/"+Mist::lpID+"/"+JSON::Value(mySeg.keyNo).asString()+".ts");
}
upper.dataTimeout = mySeg.segDuration/1000 + 2;
upper.retryCount = 2;
upper.setHeader("Accept", "multipart/mixed");
upper.setHeader("Content-Duration", JSON::Value(mySeg.segDuration).asString());
upper.setHeader("Content-Resolution", JSON::Value(mySeg.width).asString()+"x"+JSON::Value(mySeg.height).asString());
uint64_t uplTime = Util::getMicros();
if (upper.post(target, mySeg.data, mySeg.data.size())){
uplTime = Util::getMicros(uplTime);
if (upper.getStatusCode() == 200){
MEDIUM_MSG("Uploaded %zu bytes (time %" PRIu64 "-%" PRIu64 " = %" PRIu64 " ms) to %s in %.2f ms", mySeg.data.size(), mySeg.time, mySeg.time+mySeg.segDuration, mySeg.segDuration, target.getUrl().c_str(), uplTime/1000.0);
mySeg.fullyWritten = false;
mySeg.fullyRead = true;
//Wait your turn
while (myNum != insertTurn && conf.is_active){Util::sleep(100);}
if (!conf.is_active){return;}//Exit early on shutdown
if (upper.getHeader("Content-Type").substr(0, 10) == "multipart/"){
parseMultipart(mySeg, upper.getHeader("Content-Type"), upper.const_data());
}else{
++statFailParse;
FAIL_MSG("Non-multipart response received - this version only works with multipart!");
}
insertTurn = (insertTurn + 1) % PRESEG_COUNT;
break;//Success: no need to retry
}else{
//Failure due to non-200 status code
++statFailN200;
WARN_MSG("Failed to upload %zu bytes to %s in %.2f ms: %" PRIu32 " %s", mySeg.data.size(), target.getUrl().c_str(), uplTime/1000.0, upper.getStatusCode(), upper.getStatusText().c_str());
}
}else{
//other failures and aborted uploads
if (!conf.is_active){return;}//Exit early on shutdown
uplTime = Util::getMicros(uplTime);
++statFailTimeout;
WARN_MSG("Failed to upload %zu bytes to %s in %.2f ms", mySeg.data.size(), target.getUrl().c_str(), uplTime/1000.0);
}
//Error handling
attempts++;
Util::sleep(100);//Rate-limit retries
if (attempts > 4){
Util::logExitReason("too many upload failures");
conf.is_active = false;
return;
}
{
tthread::lock_guard<tthread::mutex> guard(broadcasterMutex);
std::string prevBroadAddr = Mist::currBroadAddr;
Mist::pickRandomBroadcaster();
if (!Mist::currBroadAddr.size()){
FAIL_MSG("Cannot switch to new broadcaster: none available");
Util::logExitReason("no Livepeer broadcasters available");
conf.is_active = false;
return;
}
if (Mist::currBroadAddr != prevBroadAddr){
++statSwitches;
WARN_MSG("Switched to new broadcaster: %s", Mist::currBroadAddr.c_str());
}else{
WARN_MSG("Cannot switch broadcaster; only a single option is available");
}
}
}while(conf.is_active);
}
}
int main(int argc, char *argv[]){ int main(int argc, char *argv[]){
DTSC::trackValidMask = TRACK_VALID_INT_PROCESS; DTSC::trackValidMask = TRACK_VALID_INT_PROCESS;
Util::Config config(argv[0]); Util::Config config(argv[0]);
@ -441,10 +597,47 @@ int main(int argc, char *argv[]){
capa["name"] = "Livepeer"; capa["name"] = "Livepeer";
capa["desc"] = "Use livepeer to transcode video."; capa["desc"] = "Use livepeer to transcode video.";
capa["optional"]["masksource"]["name"] = "Make source track unavailable for users"; capa["optional"]["source_mask"]["name"] = "Source track mask";
capa["optional"]["masksource"]["help"] = "If enabled, makes the source track internal-only, so that external users and pushes cannot access it."; capa["optional"]["source_mask"]["help"] = "What internal processes should have access to the source track(s)";
capa["optional"]["masksource"]["type"] = "boolean"; capa["optional"]["source_mask"]["type"] = "select";
capa["optional"]["masksource"]["default"] = false; capa["optional"]["source_mask"]["select"][0u][0u] = "";
capa["optional"]["source_mask"]["select"][0u][1u] = "Keep original value";
capa["optional"]["source_mask"]["select"][1u][0u] = 255;
capa["optional"]["source_mask"]["select"][1u][1u] = "Everything";
capa["optional"]["source_mask"]["select"][2u][0u] = 4;
capa["optional"]["source_mask"]["select"][2u][1u] = "Processing tasks (not viewers, not pushes)";
capa["optional"]["source_mask"]["select"][3u][0u] = 6;
capa["optional"]["source_mask"]["select"][3u][1u] = "Processing and pushing tasks (not viewers)";
capa["optional"]["source_mask"]["select"][4u][0u] = 5;
capa["optional"]["source_mask"]["select"][4u][1u] = "Processing and viewer tasks (not pushes)";
capa["optional"]["source_mask"]["default"] = "";
capa["optional"]["target_mask"]["name"] = "Output track mask";
capa["optional"]["target_mask"]["help"] = "What internal processes should have access to the ouput track(s)";
capa["optional"]["target_mask"]["type"] = "select";
capa["optional"]["target_mask"]["select"][0u][0u] = "";
capa["optional"]["target_mask"]["select"][0u][1u] = "Keep original value";
capa["optional"]["target_mask"]["select"][1u][0u] = 255;
capa["optional"]["target_mask"]["select"][1u][1u] = "Everything";
capa["optional"]["target_mask"]["select"][2u][0u] = 1;
capa["optional"]["target_mask"]["select"][2u][1u] = "Viewer tasks (not processing, not pushes)";
capa["optional"]["target_mask"]["select"][3u][0u] = 2;
capa["optional"]["target_mask"]["select"][3u][1u] = "Pushing tasks (not processing, not viewers)";
capa["optional"]["target_mask"]["select"][4u][0u] = 4;
capa["optional"]["target_mask"]["select"][4u][1u] = "Processing tasks (not pushes, not viewers)";
capa["optional"]["target_mask"]["select"][5u][0u] = 3;
capa["optional"]["target_mask"]["select"][5u][1u] = "Viewer and pushing tasks (not processing)";
capa["optional"]["target_mask"]["select"][6u][0u] = 5;
capa["optional"]["target_mask"]["select"][6u][1u] = "Viewer and processing tasks (not pushes)";
capa["optional"]["target_mask"]["select"][7u][0u] = 6;
capa["optional"]["target_mask"]["select"][7u][1u] = "Pushing and processing tasks (not viewers)";
capa["optional"]["target_mask"]["select"][8u][0u] = 0;
capa["optional"]["target_mask"]["select"][8u][1u] = "Nothing";
capa["optional"]["target_mask"]["default"] = "";
capa["optional"]["exit_unmask"]["name"] = "Undo masks on process exit/fail";
capa["optional"]["exit_unmask"]["help"] = "If/when the process exits or fails, the masks for input tracks will be reset to defaults. (NOT to previous value, but to defaults!)";
capa["optional"]["exit_unmask"]["default"] = false;
capa["optional"]["sink"]["name"] = "Target stream"; capa["optional"]["sink"]["name"] = "Target stream";
capa["optional"]["sink"]["help"] = "What stream the encoded track should be added to. Defaults " capa["optional"]["sink"]["help"] = "What stream the encoded track should be added to. Defaults "
@ -454,10 +647,16 @@ int main(int argc, char *argv[]){
capa["optional"]["source_track"]["name"] = "Input selection"; capa["optional"]["source_track"]["name"] = "Input selection";
capa["optional"]["source_track"]["help"] = capa["optional"]["source_track"]["help"] =
"Track ID, codec or language of the source stream to encode."; "Track selector(s) of the video portion of the source stream. Defaults to highest bit rate video track.";
capa["optional"]["source_track"]["type"] = "track_selector_parameter"; capa["optional"]["source_track"]["type"] = "track_selector_parameter";
capa["optional"]["source_track"]["n"] = 1; capa["optional"]["source_track"]["n"] = 1;
capa["optional"]["source_track"]["default"] = "automatic"; capa["optional"]["source_track"]["default"] = "maxbps";
capa["optional"]["audio_select"]["name"] = "Audio streams";
capa["optional"]["audio_select"]["help"] =
"Track selector(s) for the audio portion of the source stream. Defaults to 'none' so no audio is passed at all.";
capa["optional"]["audio_select"]["type"] = "track_selector_parameter";
capa["optional"]["audio_select"]["default"] = "none";
capa["required"]["access_token"]["name"] = "Access token"; capa["required"]["access_token"]["name"] = "Access token";
capa["required"]["access_token"]["help"] = "Your livepeer access token"; capa["required"]["access_token"]["help"] = "Your livepeer access token";
@ -479,30 +678,40 @@ int main(int argc, char *argv[]){
capa["required"]["target_profiles"]["type"] = "sublist"; capa["required"]["target_profiles"]["type"] = "sublist";
capa["required"]["target_profiles"]["itemLabel"] = "profile"; capa["required"]["target_profiles"]["itemLabel"] = "profile";
capa["required"]["target_profiles"]["help"] = "Tracks to transcode the source into"; capa["required"]["target_profiles"]["help"] = "Tracks to transcode the source into";
{
JSON::Value &grp = capa["required"]["target_profiles"]["required"]; JSON::Value &grp = capa["required"]["target_profiles"]["required"];
grp["name"]["name"] = "Name"; grp["name"]["name"] = "Name";
grp["name"]["help"] = "Name for the profle. Must be unique within this transcode."; grp["name"]["help"] = "Name for the profile. Must be unique within this transcode.";
grp["name"]["type"] = "str"; grp["name"]["type"] = "str";
grp["fps"]["name"] = "Framerate"; grp["name"]["n"] = 0;
grp["fps"]["help"] = "Framerate of the output";
grp["fps"]["unit"] = "frames per second";
grp["fps"]["type"] = "int";
grp["gop"]["name"] = "Keyframe interval / GOP size";
grp["gop"]["help"] = "Interval of keyframes / duration of GOPs for the transcode. Empty string means to match input (= the default), 'intra' means to send only key frames. Otherwise, fractional seconds between keyframes.";
grp["gop"]["unit"] = "seconds";
grp["gop"]["type"] = "str";
grp["width"]["name"] = "Width";
grp["width"]["help"] = "Width in pixels of the output";
grp["width"]["unit"] = "px";
grp["width"]["type"] = "int";
grp["height"]["name"] = "Height";
grp["height"]["help"] = "Height in pixels of the output";
grp["height"]["unit"] = "px";
grp["height"]["type"] = "int";
grp["bitrate"]["name"] = "Bitrate"; grp["bitrate"]["name"] = "Bitrate";
grp["bitrate"]["help"] = "Target bit rate of the output"; grp["bitrate"]["help"] = "Target bit rate of the output";
grp["bitrate"]["unit"] = "bits per second"; grp["bitrate"]["unit"] = "bits per second";
grp["bitrate"]["type"] = "int"; grp["bitrate"]["type"] = "int";
grp["bitrate"]["n"] = 1;
grp["width"]["name"] = "Width";
grp["width"]["help"] = "Width in pixels of the output";
grp["width"]["unit"] = "px";
grp["width"]["type"] = "int";
grp["width"]["n"] = 2;
grp["height"]["name"] = "Height";
grp["height"]["help"] = "Height in pixels of the output";
grp["height"]["unit"] = "px";
grp["height"]["type"] = "int";
grp["height"]["n"] = 3;
}{
JSON::Value &grp = capa["required"]["target_profiles"]["optional"];
grp["fps"]["name"] = "Framerate";
grp["fps"]["help"] = "Framerate of the output";
grp["fps"]["unit"] = "frames per second";
grp["fps"]["type"] = "int";
grp["fps"]["n"] = 4;
grp["gop"]["name"] = "Keyframe interval / GOP size";
grp["gop"]["help"] = "Interval of keyframes / duration of GOPs for the transcode. Empty string means to match input (= the default), 'intra' means to send only key frames. Otherwise, fractional seconds between keyframes.";
grp["gop"]["unit"] = "seconds";
grp["gop"]["type"] = "str";
grp["gop"]["n"] = 5;
}
capa["optional"]["track_inhibit"]["name"] = "Track inhibitor(s)"; capa["optional"]["track_inhibit"]["name"] = "Track inhibitor(s)";
capa["optional"]["track_inhibit"]["help"] = capa["optional"]["track_inhibit"]["help"] =
@ -512,6 +721,21 @@ int main(int argc, char *argv[]){
capa["optional"]["track_inhibit"]["validate"][0u] = "track_selector"; capa["optional"]["track_inhibit"]["validate"][0u] = "track_selector";
capa["optional"]["track_inhibit"]["default"] = "audio=none&video=none&subtitle=none"; capa["optional"]["track_inhibit"]["default"] = "audio=none&video=none&subtitle=none";
capa["optional"]["debug"]["name"] = "Debug level";
capa["optional"]["debug"]["help"] = "The debug level at which messages need to be printed.";
capa["optional"]["debug"]["type"] = "debug";
capa["ainfo"]["lp_id"]["name"] = "Livepeer transcode ID";
capa["ainfo"]["switches"]["name"] = "Broadcaster switches since start";
capa["ainfo"]["fail_non200"]["name"] = "Failures due to non-200 response codes";
capa["ainfo"]["fail_timeout"]["name"] = "Failures due to timeout";
capa["ainfo"]["fail_parse"]["name"] = "Failures due to parse errors in TS response data";
capa["ainfo"]["fail_other"]["name"] = "Failures due to other reasons";
capa["ainfo"]["bc"]["name"] = "Currently used broadcaster";
capa["ainfo"]["sinkTime"]["name"] = "Sink timestamp";
capa["ainfo"]["sourceTime"]["name"] = "Source timestamp";
std::cout << capa.toString() << std::endl; std::cout << capa.toString() << std::endl;
return -1; return -1;
} }
@ -535,12 +759,24 @@ int main(int argc, char *argv[]){
return 1; return 1;
} }
{
//Ensure stream name is set in all threads
std::string streamName = Mist::opt["sink"].asString();
if (!streamName.size()){streamName = Mist::opt["source"].asString();}
Util::streamVariables(streamName, Mist::opt["source"].asString());
Util::setStreamName(Mist::opt["source"].asString() + "" + streamName);
}
// stream which connects to input // stream which connects to input
tthread::thread source(sourceThread, 0); tthread::thread source(sourceThread, 0);
Util::sleep(500); Util::sleep(500);
// needs to pass through encoder to outputEBML // needs to pass through encoder to outputEBML
tthread::thread sink(sinkThread, 0); tthread::thread sink(sinkThread, 0);
// uploads prepared segments
tthread::thread uploader0(uploadThread, (void*)0);
tthread::thread uploader1(uploadThread, (void*)1);
co.is_active = true; co.is_active = true;
@ -552,6 +788,8 @@ int main(int argc, char *argv[]){
sink.join(); sink.join();
source.join(); source.join();
uploader0.join();
uploader1.join();
INFO_MSG("Livepeer transcode shutting down: %s", Util::exitReason); INFO_MSG("Livepeer transcode shutting down: %s", Util::exitReason);
return 0; return 0;

View file

@ -8,7 +8,6 @@ namespace Mist{
bool getFirst = false; bool getFirst = false;
bool sendFirst = false; bool sendFirst = false;
bool doingSetup = true; bool doingSetup = true;
bool queueClear = false;
uint64_t packetTimeDiff; uint64_t packetTimeDiff;
uint64_t sendPacketTime; uint64_t sendPacketTime;
@ -43,15 +42,34 @@ namespace Mist{
time = t; time = t;
data.assign(ptr, len); data.assign(ptr, len);
fullyRead = false; fullyRead = false;
fullyWritten = true;
offsetCalcd = false; offsetCalcd = false;
byteOffset = 0; byteOffset = 0;
fullyWritten = true;
} }
}; };
std::map<std::string, readySegment> segs; std::map<std::string, readySegment> segs;
#define PRESEG_COUNT 2
class preparedSegment{
public:
uint64_t time;
uint64_t segDuration;
uint64_t keyNo;
uint64_t width;
uint64_t height;
bool fullyRead;
bool fullyWritten;
Util::ResizeablePointer data;
preparedSegment(){
time = 0;
keyNo = 0;
segDuration = 0;
fullyRead = true;
fullyWritten = false;
};
};
preparedSegment presegs[PRESEG_COUNT];
JSON::Value lpEnc; JSON::Value lpEnc;
JSON::Value lpBroad; JSON::Value lpBroad;
std::string currBroadAddr; std::string currBroadAddr;