Livepeer process:
- Cleanup - Made everything except for bitrate and name optional, auto-detects and sanitizes and configures sane values in almost all cases. - Fix sorting of profile options - Fixed deadlock when video track does not yet have a valid width and/or height - Fixed race condition during process boot
This commit is contained in:
parent
896b15380f
commit
55c03fd886
2 changed files with 208 additions and 130 deletions
|
@ -26,6 +26,8 @@ uint64_t statFailOther = 0;
|
|||
uint64_t statSinkMs = 0;
|
||||
uint64_t statSourceMs = 0;
|
||||
|
||||
std::string api_url;
|
||||
|
||||
Util::Config co;
|
||||
Util::Config conf;
|
||||
|
||||
|
@ -264,106 +266,6 @@ namespace Mist{
|
|||
|
||||
|
||||
|
||||
/// check source, sink, source_track, codec, bitrate, flags and process options.
|
||||
bool ProcLivepeer::CheckConfig(){
|
||||
srand(getpid());
|
||||
// Check generic configuration variables
|
||||
if (!opt.isMember("source") || !opt["source"] || !opt["source"].isString()){
|
||||
FAIL_MSG("invalid source in config!");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!opt.isMember("sink") || !opt["sink"] || !opt["sink"].isString()){
|
||||
INFO_MSG("No sink explicitly set, using source as sink");
|
||||
}
|
||||
if (!opt.isMember("custom_url") || !opt["custom_url"] || !opt["custom_url"].isString()){
|
||||
api_url = "https://livepeer.live/api";
|
||||
}else{
|
||||
api_url = opt["custom_url"].asStringRef();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void ProcLivepeer::Run(){
|
||||
|
||||
HTTP::Downloader dl;
|
||||
dl.setHeader("Authorization", "Bearer "+opt["access_token"].asStringRef());
|
||||
//Get broadcaster list, pick first valid address
|
||||
if (!dl.get(HTTP::URL(api_url+"/broadcaster"))){
|
||||
Util::logExitReason("Livepeer API responded negatively to request for broadcaster list");
|
||||
return;
|
||||
}
|
||||
lpBroad = JSON::fromString(dl.data());
|
||||
if (!lpBroad || !lpBroad.isArray()){
|
||||
Util::logExitReason("No Livepeer broadcasters available");
|
||||
return;
|
||||
}
|
||||
{
|
||||
tthread::lock_guard<tthread::mutex> guard(broadcasterMutex);
|
||||
pickRandomBroadcaster();
|
||||
if (!currBroadAddr.size()){
|
||||
Util::logExitReason("No Livepeer broadcasters available");
|
||||
return;
|
||||
}
|
||||
INFO_MSG("Using broadcaster: %s", currBroadAddr.c_str());
|
||||
}
|
||||
|
||||
//make transcode request
|
||||
JSON::Value pl;
|
||||
pl["name"] = opt["source"];
|
||||
pl["profiles"] = opt["target_profiles"];
|
||||
dl.setHeader("Content-Type", "application/json");
|
||||
dl.setHeader("Authorization", "Bearer "+opt["access_token"].asStringRef());
|
||||
if (!dl.post(HTTP::URL(api_url+"/stream"), pl.toString())){
|
||||
Util::logExitReason("Livepeer API responded negatively to encode request");
|
||||
return;
|
||||
}
|
||||
lpEnc = JSON::fromString(dl.data());
|
||||
if (!lpEnc){
|
||||
Util::logExitReason("Livepeer API did not respond with JSON");
|
||||
return;
|
||||
}
|
||||
if (!lpEnc.isMember("id")){
|
||||
Util::logExitReason("Livepeer API did not respond with a valid ID: %s", dl.data().data());
|
||||
return;
|
||||
}
|
||||
lpID = lpEnc["id"].asStringRef();
|
||||
|
||||
INFO_MSG("Livepeer transcode ID: %s", lpID.c_str());
|
||||
doingSetup = false;
|
||||
uint64_t lastProcUpdate = Util::bootSecs();
|
||||
{
|
||||
tthread::lock_guard<tthread::mutex> guard(statsMutex);
|
||||
pStat["proc_status_update"]["id"] = getpid();
|
||||
pStat["proc_status_update"]["proc"] = "Livepeer";
|
||||
pData["ainfo"]["lp_id"] = lpID;
|
||||
}
|
||||
uint64_t startTime = Util::bootSecs();
|
||||
while (conf.is_active && co.is_active){
|
||||
Util::sleep(200);
|
||||
if (lastProcUpdate + 5 <= Util::bootSecs()){
|
||||
tthread::lock_guard<tthread::mutex> guard(statsMutex);
|
||||
pData["active_seconds"] = (Util::bootSecs() - startTime);
|
||||
pData["ainfo"]["switches"] = statSwitches;
|
||||
pData["ainfo"]["fail_non200"] = statFailN200;
|
||||
pData["ainfo"]["fail_timeout"] = statFailTimeout;
|
||||
pData["ainfo"]["fail_parse"] = statFailParse;
|
||||
pData["ainfo"]["fail_other"] = statFailOther;
|
||||
pData["ainfo"]["sourceTime"] = statSourceMs;
|
||||
pData["ainfo"]["sinkTime"] = statSinkMs;
|
||||
{
|
||||
tthread::lock_guard<tthread::mutex> guard(broadcasterMutex);
|
||||
pData["ainfo"]["bc"] = Mist::currBroadAddr;
|
||||
}
|
||||
Socket::UDPConnection uSock;
|
||||
uSock.SetDestination(UDP_API_HOST, UDP_API_PORT);
|
||||
uSock.SendNow(pStat.toString());
|
||||
lastProcUpdate = Util::bootSecs();
|
||||
}
|
||||
}
|
||||
INFO_MSG("Closing process clean");
|
||||
}
|
||||
}// namespace Mist
|
||||
|
||||
|
||||
|
@ -406,7 +308,6 @@ void sourceThread(void *){
|
|||
int devnull = open("/dev/null", O_RDWR);
|
||||
Socket::Connection c(devnull, devnull);
|
||||
Mist::ProcessSource out(c);
|
||||
while (Mist::doingSetup && conf.is_active){Util::sleep(200);}
|
||||
if (conf.is_active){
|
||||
INFO_MSG("Running source thread...");
|
||||
out.run();
|
||||
|
@ -678,6 +579,7 @@ int main(int argc, char *argv[]){
|
|||
capa["required"]["target_profiles"]["type"] = "sublist";
|
||||
capa["required"]["target_profiles"]["itemLabel"] = "profile";
|
||||
capa["required"]["target_profiles"]["help"] = "Tracks to transcode the source into";
|
||||
capa["required"]["target_profiles"]["sort"] = "n";
|
||||
{
|
||||
JSON::Value &grp = capa["required"]["target_profiles"]["required"];
|
||||
grp["name"]["name"] = "Name";
|
||||
|
@ -689,28 +591,50 @@ int main(int argc, char *argv[]){
|
|||
grp["bitrate"]["unit"] = "bits per second";
|
||||
grp["bitrate"]["type"] = "int";
|
||||
grp["bitrate"]["n"] = 1;
|
||||
}{
|
||||
JSON::Value &grp = capa["required"]["target_profiles"]["optional"];
|
||||
grp["width"]["name"] = "Width";
|
||||
grp["width"]["help"] = "Width in pixels of the output";
|
||||
grp["width"]["help"] = "Width in pixels of the output. Defaults to match aspect with height, or source width if both are default.";
|
||||
grp["width"]["unit"] = "px";
|
||||
grp["width"]["type"] = "int";
|
||||
grp["width"]["n"] = 2;
|
||||
grp["height"]["name"] = "Height";
|
||||
grp["height"]["help"] = "Height in pixels of the output";
|
||||
grp["height"]["help"] = "Height in pixels of the output. Defaults to match aspect with width, or source height if both are default. If only height is given and the source height is greater than the source width, width and height will swap and do what you most likely wanted to do (e.g. follow your config in portrait mode instead of landscape mode).";
|
||||
grp["height"]["unit"] = "px";
|
||||
grp["height"]["type"] = "int";
|
||||
grp["height"]["n"] = 3;
|
||||
}{
|
||||
JSON::Value &grp = capa["required"]["target_profiles"]["optional"];
|
||||
grp["fps"]["name"] = "Framerate";
|
||||
grp["fps"]["help"] = "Framerate of the output";
|
||||
grp["fps"]["help"] = "Framerate of the output. Zero means to match the input (= the default).";
|
||||
grp["fps"]["unit"] = "frames per second";
|
||||
grp["fps"]["default"] = 0;
|
||||
grp["fps"]["type"] = "int";
|
||||
grp["fps"]["n"] = 4;
|
||||
grp["gop"]["name"] = "Keyframe interval / GOP size";
|
||||
grp["gop"]["help"] = "Interval of keyframes / duration of GOPs for the transcode. Empty string means to match input (= the default), 'intra' means to send only key frames. Otherwise, fractional seconds between keyframes.";
|
||||
grp["gop"]["help"] = "Interval of keyframes / duration of GOPs for the transcode. \"0.0\" means to match input (= the default), 'intra' means to send only key frames. Otherwise, fractional seconds between keyframes.";
|
||||
grp["gop"]["unit"] = "seconds";
|
||||
grp["gop"]["default"] = "0.0";
|
||||
grp["gop"]["type"] = "str";
|
||||
grp["gop"]["n"] = 5;
|
||||
|
||||
grp["profile"]["name"] = "H264 Profile";
|
||||
grp["profile"]["help"] = "Profile to use. Defaults to \"High\".";
|
||||
grp["profile"]["type"] = "select";
|
||||
grp["profile"]["select"][0u][0u] = "H264High";
|
||||
grp["profile"]["select"][0u][1u] = "High";
|
||||
grp["profile"]["select"][1u][0u] = "H264Baseline";
|
||||
grp["profile"]["select"][1u][1u] = "Baseline";
|
||||
grp["profile"]["select"][2u][0u] = "H264Main";
|
||||
grp["profile"]["select"][2u][1u] = "Main";
|
||||
grp["profile"]["select"][3u][0u] = "H264ConstrainedHigh";
|
||||
grp["profile"]["select"][3u][1u] = "High, without b-frames";
|
||||
grp["profile"]["default"] = "H264High";
|
||||
|
||||
grp["track_inhibit"]["name"] = "Track inhibitor(s)";
|
||||
grp["track_inhibit"]["help"] =
|
||||
"What tracks to use as inhibitors. If this track selector is able to select a track, the profile is not used. Only verified on initial boot of the process and then never again. Defaults to none.";
|
||||
grp["track_inhibit"]["type"] = "string";
|
||||
grp["track_inhibit"]["validate"][0u] = "track_selector";
|
||||
grp["track_inhibit"]["default"] = "audio=none&video=none&subtitle=none";
|
||||
}
|
||||
|
||||
capa["optional"]["track_inhibit"]["name"] = "Track inhibitor(s)";
|
||||
|
@ -753,12 +677,23 @@ int main(int argc, char *argv[]){
|
|||
}
|
||||
|
||||
// check config for generic options
|
||||
Mist::ProcLivepeer Enc;
|
||||
if (!Enc.CheckConfig()){
|
||||
FAIL_MSG("Error config syntax error!");
|
||||
srand(getpid());
|
||||
// Check generic configuration variables
|
||||
if (!Mist::opt.isMember("source") || !Mist::opt["source"] || !Mist::opt["source"].isString()){
|
||||
FAIL_MSG("Missing or blank source in config!");
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (!Mist::opt.isMember("sink") || !Mist::opt["sink"] || !Mist::opt["sink"].isString()){
|
||||
INFO_MSG("No sink explicitly set, using source as sink");
|
||||
}
|
||||
if (!Mist::opt.isMember("custom_url") || !Mist::opt["custom_url"] || !Mist::opt["custom_url"].isString()){
|
||||
api_url = "https://livepeer.live/api";
|
||||
}else{
|
||||
api_url = Mist::opt["custom_url"].asStringRef();
|
||||
}
|
||||
|
||||
|
||||
{
|
||||
//Ensure stream name is set in all threads
|
||||
std::string streamName = Mist::opt["sink"].asString();
|
||||
|
@ -767,21 +702,173 @@ int main(int argc, char *argv[]){
|
|||
Util::setStreamName(Mist::opt["source"].asString() + "→" + streamName);
|
||||
}
|
||||
|
||||
// stream which connects to input
|
||||
tthread::thread source(sourceThread, 0);
|
||||
Util::sleep(500);
|
||||
//connect to source metadata
|
||||
DTSC::Meta M(Mist::opt["source"].asStringRef(), false);
|
||||
|
||||
// needs to pass through encoder to outputEBML
|
||||
//find source video track
|
||||
std::map<std::string, std::string> targetParams;
|
||||
targetParams["video"] = "maxbps";
|
||||
JSON::Value sourceCapa;
|
||||
sourceCapa["name"] = "Livepeer";
|
||||
sourceCapa["codecs"][0u][0u].append("+H264");
|
||||
sourceCapa["codecs"][0u][0u].append("+HEVC");
|
||||
sourceCapa["codecs"][0u][0u].append("+MPEG2");
|
||||
if (Mist::opt.isMember("source_track") && Mist::opt["source_track"].isString() && Mist::opt["source_track"]){
|
||||
targetParams["video"] = Mist::opt["source_track"].asStringRef();
|
||||
}
|
||||
std::set<size_t> vidTrack = Util::wouldSelect(M, targetParams, sourceCapa);
|
||||
size_t sourceIdx = *(vidTrack.begin());
|
||||
if (!M.getWidth(sourceIdx) || !M.getHeight(sourceIdx)){
|
||||
FAIL_MSG("Source track does not have a valid width and height");
|
||||
return 1;
|
||||
}
|
||||
|
||||
//build transcode request
|
||||
JSON::Value pl;
|
||||
pl["name"] = Mist::opt["source"];
|
||||
pl["profiles"] = Mist::opt["target_profiles"];
|
||||
jsonForEach(pl["profiles"], prof){
|
||||
if (!prof->isMember("gop")){(*prof)["gop"] = "0.0";}
|
||||
//no or automatic framerate? default to source rate, if set, or 25 otherwise
|
||||
if (!prof->isMember("fps") || (*prof)["fps"].asDouble() == 0.0){
|
||||
(*prof)["fps"] = M.getFpks(sourceIdx);
|
||||
if (!(*prof)["fps"].asInt()){(*prof)["fps"] = 25000;}
|
||||
(*prof)["fpsDen"] = 1000;
|
||||
}
|
||||
if (!prof->isMember("profile")){(*prof)["profile"] = "H264High";}
|
||||
if ((!prof->isMember("height") || !(*prof)["height"].asInt()) && (!prof->isMember("width") || !(*prof)["width"].asInt())){
|
||||
//no width and no height
|
||||
(*prof)["width"] = M.getWidth(sourceIdx);
|
||||
(*prof)["height"] = M.getHeight(sourceIdx);
|
||||
}
|
||||
if (!prof->isMember("width") || !(*prof)["width"].asInt()){
|
||||
//no width, but we have height
|
||||
//first, check if our source is in portrait mode, if so, we assume they meant width instead of height
|
||||
if (M.getWidth(sourceIdx) < M.getHeight(sourceIdx)){
|
||||
//portrait mode
|
||||
uint32_t heightSetting = (*prof)["height"].asInt();
|
||||
(*prof)["width"] = heightSetting;
|
||||
(*prof)["height"] = M.getHeight(sourceIdx) * heightSetting / M.getWidth(sourceIdx);
|
||||
}else{
|
||||
//landscape mode
|
||||
uint32_t heightSetting = (*prof)["height"].asInt();
|
||||
(*prof)["width"] = M.getWidth(sourceIdx) * heightSetting / M.getHeight(sourceIdx);
|
||||
}
|
||||
}
|
||||
if (!prof->isMember("height") || !(*prof)["height"].asInt()){
|
||||
//no height, but we have width
|
||||
//No portrait/landscape check, as per documentation
|
||||
uint32_t widthSetting = (*prof)["width"].asInt();
|
||||
(*prof)["height"] = M.getHeight(sourceIdx) * widthSetting / M.getWidth(sourceIdx);
|
||||
}
|
||||
//force width/height to multiples of 16
|
||||
(*prof)["width"] = ((*prof)["width"].asInt() / 16) * 16;
|
||||
(*prof)["height"] = ((*prof)["height"].asInt() / 16) * 16;
|
||||
|
||||
if (prof->isMember("track_inhibit")){
|
||||
std::set<size_t> wouldSelect = Util::wouldSelect(
|
||||
M, std::string("audio=none&video=none&subtitle=none&") + (*prof)["track_inhibit"].asStringRef());
|
||||
if (wouldSelect.size()){
|
||||
if (prof->isMember("name")){
|
||||
INFO_MSG("Removing profile because track inhibitor matches: %s", (*prof)["name"].asStringRef().c_str());
|
||||
}else{
|
||||
INFO_MSG("Removing profile because track inhibitor matches: %s", prof->toString().c_str());
|
||||
}
|
||||
prof.remove();
|
||||
continue;
|
||||
}else{
|
||||
prof->removeMember("track_inhibit");
|
||||
}
|
||||
}
|
||||
INFO_MSG("Profile parsed: %s", prof->toString().c_str());
|
||||
}
|
||||
|
||||
//Connect to livepeer API
|
||||
HTTP::Downloader dl;
|
||||
dl.setHeader("Authorization", "Bearer "+Mist::opt["access_token"].asStringRef());
|
||||
//Get broadcaster list, pick first valid address
|
||||
if (!dl.get(HTTP::URL(api_url+"/broadcaster"))){
|
||||
FAIL_MSG("Livepeer API responded negatively to request for broadcaster list");
|
||||
return 1;
|
||||
}
|
||||
Mist::lpBroad = JSON::fromString(dl.data());
|
||||
if (!Mist::lpBroad || !Mist::lpBroad.isArray()){
|
||||
FAIL_MSG("No Livepeer broadcasters available");
|
||||
return 1;
|
||||
}
|
||||
Mist::pickRandomBroadcaster();
|
||||
if (!Mist::currBroadAddr.size()){
|
||||
FAIL_MSG("No Livepeer broadcasters available");
|
||||
return 1;
|
||||
}
|
||||
INFO_MSG("Using broadcaster: %s", Mist::currBroadAddr.c_str());
|
||||
|
||||
//send transcode request
|
||||
dl.setHeader("Content-Type", "application/json");
|
||||
dl.setHeader("Authorization", "Bearer "+Mist::opt["access_token"].asStringRef());
|
||||
if (!dl.post(HTTP::URL(api_url+"/stream"), pl.toString())){
|
||||
FAIL_MSG("Livepeer API responded negatively to encode request");
|
||||
return 1;
|
||||
}
|
||||
Mist::lpEnc = JSON::fromString(dl.data());
|
||||
if (!Mist::lpEnc){
|
||||
FAIL_MSG("Livepeer API did not respond with JSON");
|
||||
return 1;
|
||||
}
|
||||
if (!Mist::lpEnc.isMember("id")){
|
||||
FAIL_MSG("Livepeer API did not respond with a valid ID: %s", dl.data().data());
|
||||
return 1;
|
||||
}
|
||||
Mist::lpID = Mist::lpEnc["id"].asStringRef();
|
||||
|
||||
INFO_MSG("Livepeer transcode ID: %s", Mist::lpID.c_str());
|
||||
uint64_t lastProcUpdate = Util::bootSecs();
|
||||
pStat["proc_status_update"]["id"] = getpid();
|
||||
pStat["proc_status_update"]["proc"] = "Livepeer";
|
||||
pData["ainfo"]["lp_id"] = Mist::lpID;
|
||||
uint64_t startTime = Util::bootSecs();
|
||||
|
||||
//Here be threads.
|
||||
|
||||
//Source thread, from Mist to LP.
|
||||
tthread::thread source(sourceThread, 0);
|
||||
while (!conf.is_active && Util::bootSecs() < lastProcUpdate + 5){Util::sleep(50);}
|
||||
if (!conf.is_active){WARN_MSG("Timeout waiting for source thread to boot!");}
|
||||
lastProcUpdate = Util::bootSecs();
|
||||
|
||||
//Sink thread, from LP to Mist
|
||||
tthread::thread sink(sinkThread, 0);
|
||||
// uploads prepared segments
|
||||
while (!co.is_active && Util::bootSecs() < lastProcUpdate + 5){Util::sleep(50);}
|
||||
if (!co.is_active){WARN_MSG("Timeout waiting for sink thread to boot!");}
|
||||
lastProcUpdate = Util::bootSecs();
|
||||
|
||||
// These threads upload prepared segments
|
||||
tthread::thread uploader0(uploadThread, (void*)0);
|
||||
tthread::thread uploader1(uploadThread, (void*)1);
|
||||
|
||||
|
||||
co.is_active = true;
|
||||
|
||||
// run process
|
||||
Enc.Run();
|
||||
while (conf.is_active && co.is_active){
|
||||
Util::sleep(200);
|
||||
if (lastProcUpdate + 5 <= Util::bootSecs()){
|
||||
tthread::lock_guard<tthread::mutex> guard(statsMutex);
|
||||
pData["active_seconds"] = (Util::bootSecs() - startTime);
|
||||
pData["ainfo"]["switches"] = statSwitches;
|
||||
pData["ainfo"]["fail_non200"] = statFailN200;
|
||||
pData["ainfo"]["fail_timeout"] = statFailTimeout;
|
||||
pData["ainfo"]["fail_parse"] = statFailParse;
|
||||
pData["ainfo"]["fail_other"] = statFailOther;
|
||||
pData["ainfo"]["sourceTime"] = statSourceMs;
|
||||
pData["ainfo"]["sinkTime"] = statSinkMs;
|
||||
{
|
||||
tthread::lock_guard<tthread::mutex> guard(broadcasterMutex);
|
||||
pData["ainfo"]["bc"] = Mist::currBroadAddr;
|
||||
}
|
||||
Socket::UDPConnection uSock;
|
||||
uSock.SetDestination(UDP_API_HOST, UDP_API_PORT);
|
||||
uSock.SendNow(pStat.toString());
|
||||
lastProcUpdate = Util::bootSecs();
|
||||
}
|
||||
}
|
||||
INFO_MSG("Clean shutdown; joining threads");
|
||||
|
||||
co.is_active = false;
|
||||
conf.is_active = false;
|
||||
|
@ -791,7 +878,7 @@ int main(int argc, char *argv[]){
|
|||
uploader0.join();
|
||||
uploader1.join();
|
||||
|
||||
INFO_MSG("Livepeer transcode shutting down: %s", Util::exitReason);
|
||||
INFO_MSG("Shutdown reason: %s", Util::exitReason);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -7,7 +7,6 @@
|
|||
namespace Mist{
|
||||
bool getFirst = false;
|
||||
bool sendFirst = false;
|
||||
bool doingSetup = true;
|
||||
|
||||
uint64_t packetTimeDiff;
|
||||
uint64_t sendPacketTime;
|
||||
|
@ -75,13 +74,5 @@ namespace Mist{
|
|||
std::string currBroadAddr;
|
||||
std::string lpID;
|
||||
|
||||
class ProcLivepeer{
|
||||
public:
|
||||
std::string api_url;
|
||||
ProcLivepeer(){};
|
||||
bool CheckConfig();
|
||||
void Run();
|
||||
};
|
||||
|
||||
}// namespace Mist
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue