From 55c03fd886b738faf6e93c7d1afb55c0343aa40e Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 27 Jul 2021 03:10:51 +0200 Subject: [PATCH] Livepeer process: - Cleanup - Made everything except for bitrate and name optional, auto-detects and sanitizes and configures sane values in almost all cases. - Fix sorting of profile options - Fixed deadlock when video track does not yet have a valid width and/or height - Fixed race condition during process boot --- src/process/process_livepeer.cpp | 329 +++++++++++++++++++------------ src/process/process_livepeer.h | 9 - 2 files changed, 208 insertions(+), 130 deletions(-) diff --git a/src/process/process_livepeer.cpp b/src/process/process_livepeer.cpp index ffde737b..87121a04 100644 --- a/src/process/process_livepeer.cpp +++ b/src/process/process_livepeer.cpp @@ -26,6 +26,8 @@ uint64_t statFailOther = 0; uint64_t statSinkMs = 0; uint64_t statSourceMs = 0; +std::string api_url; + Util::Config co; Util::Config conf; @@ -264,106 +266,6 @@ namespace Mist{ - /// check source, sink, source_track, codec, bitrate, flags and process options. - bool ProcLivepeer::CheckConfig(){ - srand(getpid()); - // Check generic configuration variables - if (!opt.isMember("source") || !opt["source"] || !opt["source"].isString()){ - FAIL_MSG("invalid source in config!"); - return false; - } - - if (!opt.isMember("sink") || !opt["sink"] || !opt["sink"].isString()){ - INFO_MSG("No sink explicitly set, using source as sink"); - } - if (!opt.isMember("custom_url") || !opt["custom_url"] || !opt["custom_url"].isString()){ - api_url = "https://livepeer.live/api"; - }else{ - api_url = opt["custom_url"].asStringRef(); - } - - return true; - } - - void ProcLivepeer::Run(){ - - HTTP::Downloader dl; - dl.setHeader("Authorization", "Bearer "+opt["access_token"].asStringRef()); - //Get broadcaster list, pick first valid address - if (!dl.get(HTTP::URL(api_url+"/broadcaster"))){ - Util::logExitReason("Livepeer API responded negatively to request for broadcaster list"); - return; - } - lpBroad = JSON::fromString(dl.data()); - if (!lpBroad || !lpBroad.isArray()){ - Util::logExitReason("No Livepeer broadcasters available"); - return; - } - { - tthread::lock_guard guard(broadcasterMutex); - pickRandomBroadcaster(); - if (!currBroadAddr.size()){ - Util::logExitReason("No Livepeer broadcasters available"); - return; - } - INFO_MSG("Using broadcaster: %s", currBroadAddr.c_str()); - } - - //make transcode request - JSON::Value pl; - pl["name"] = opt["source"]; - pl["profiles"] = opt["target_profiles"]; - dl.setHeader("Content-Type", "application/json"); - dl.setHeader("Authorization", "Bearer "+opt["access_token"].asStringRef()); - if (!dl.post(HTTP::URL(api_url+"/stream"), pl.toString())){ - Util::logExitReason("Livepeer API responded negatively to encode request"); - return; - } - lpEnc = JSON::fromString(dl.data()); - if (!lpEnc){ - Util::logExitReason("Livepeer API did not respond with JSON"); - return; - } - if (!lpEnc.isMember("id")){ - Util::logExitReason("Livepeer API did not respond with a valid ID: %s", dl.data().data()); - return; - } - lpID = lpEnc["id"].asStringRef(); - - INFO_MSG("Livepeer transcode ID: %s", lpID.c_str()); - doingSetup = false; - uint64_t lastProcUpdate = Util::bootSecs(); - { - tthread::lock_guard guard(statsMutex); - pStat["proc_status_update"]["id"] = getpid(); - pStat["proc_status_update"]["proc"] = "Livepeer"; - pData["ainfo"]["lp_id"] = lpID; - } - uint64_t startTime = Util::bootSecs(); - while (conf.is_active && co.is_active){ - Util::sleep(200); - if (lastProcUpdate + 5 <= Util::bootSecs()){ - tthread::lock_guard guard(statsMutex); - pData["active_seconds"] = (Util::bootSecs() - startTime); - pData["ainfo"]["switches"] = statSwitches; - pData["ainfo"]["fail_non200"] = statFailN200; - pData["ainfo"]["fail_timeout"] = statFailTimeout; - pData["ainfo"]["fail_parse"] = statFailParse; - pData["ainfo"]["fail_other"] = statFailOther; - pData["ainfo"]["sourceTime"] = statSourceMs; - pData["ainfo"]["sinkTime"] = statSinkMs; - { - tthread::lock_guard guard(broadcasterMutex); - pData["ainfo"]["bc"] = Mist::currBroadAddr; - } - Socket::UDPConnection uSock; - uSock.SetDestination(UDP_API_HOST, UDP_API_PORT); - uSock.SendNow(pStat.toString()); - lastProcUpdate = Util::bootSecs(); - } - } - INFO_MSG("Closing process clean"); - } }// namespace Mist @@ -406,7 +308,6 @@ void sourceThread(void *){ int devnull = open("/dev/null", O_RDWR); Socket::Connection c(devnull, devnull); Mist::ProcessSource out(c); - while (Mist::doingSetup && conf.is_active){Util::sleep(200);} if (conf.is_active){ INFO_MSG("Running source thread..."); out.run(); @@ -678,6 +579,7 @@ int main(int argc, char *argv[]){ capa["required"]["target_profiles"]["type"] = "sublist"; capa["required"]["target_profiles"]["itemLabel"] = "profile"; capa["required"]["target_profiles"]["help"] = "Tracks to transcode the source into"; + capa["required"]["target_profiles"]["sort"] = "n"; { JSON::Value &grp = capa["required"]["target_profiles"]["required"]; grp["name"]["name"] = "Name"; @@ -689,28 +591,50 @@ int main(int argc, char *argv[]){ grp["bitrate"]["unit"] = "bits per second"; grp["bitrate"]["type"] = "int"; grp["bitrate"]["n"] = 1; + }{ + JSON::Value &grp = capa["required"]["target_profiles"]["optional"]; grp["width"]["name"] = "Width"; - grp["width"]["help"] = "Width in pixels of the output"; + grp["width"]["help"] = "Width in pixels of the output. Defaults to match aspect with height, or source width if both are default."; grp["width"]["unit"] = "px"; grp["width"]["type"] = "int"; grp["width"]["n"] = 2; grp["height"]["name"] = "Height"; - grp["height"]["help"] = "Height in pixels of the output"; + grp["height"]["help"] = "Height in pixels of the output. Defaults to match aspect with width, or source height if both are default. If only height is given and the source height is greater than the source width, width and height will swap and do what you most likely wanted to do (e.g. follow your config in portrait mode instead of landscape mode)."; grp["height"]["unit"] = "px"; grp["height"]["type"] = "int"; grp["height"]["n"] = 3; - }{ - JSON::Value &grp = capa["required"]["target_profiles"]["optional"]; grp["fps"]["name"] = "Framerate"; - grp["fps"]["help"] = "Framerate of the output"; + grp["fps"]["help"] = "Framerate of the output. Zero means to match the input (= the default)."; grp["fps"]["unit"] = "frames per second"; + grp["fps"]["default"] = 0; grp["fps"]["type"] = "int"; grp["fps"]["n"] = 4; grp["gop"]["name"] = "Keyframe interval / GOP size"; - grp["gop"]["help"] = "Interval of keyframes / duration of GOPs for the transcode. Empty string means to match input (= the default), 'intra' means to send only key frames. Otherwise, fractional seconds between keyframes."; + grp["gop"]["help"] = "Interval of keyframes / duration of GOPs for the transcode. \"0.0\" means to match input (= the default), 'intra' means to send only key frames. Otherwise, fractional seconds between keyframes."; grp["gop"]["unit"] = "seconds"; + grp["gop"]["default"] = "0.0"; grp["gop"]["type"] = "str"; grp["gop"]["n"] = 5; + + grp["profile"]["name"] = "H264 Profile"; + grp["profile"]["help"] = "Profile to use. Defaults to \"High\"."; + grp["profile"]["type"] = "select"; + grp["profile"]["select"][0u][0u] = "H264High"; + grp["profile"]["select"][0u][1u] = "High"; + grp["profile"]["select"][1u][0u] = "H264Baseline"; + grp["profile"]["select"][1u][1u] = "Baseline"; + grp["profile"]["select"][2u][0u] = "H264Main"; + grp["profile"]["select"][2u][1u] = "Main"; + grp["profile"]["select"][3u][0u] = "H264ConstrainedHigh"; + grp["profile"]["select"][3u][1u] = "High, without b-frames"; + grp["profile"]["default"] = "H264High"; + + grp["track_inhibit"]["name"] = "Track inhibitor(s)"; + grp["track_inhibit"]["help"] = + "What tracks to use as inhibitors. If this track selector is able to select a track, the profile is not used. Only verified on initial boot of the process and then never again. Defaults to none."; + grp["track_inhibit"]["type"] = "string"; + grp["track_inhibit"]["validate"][0u] = "track_selector"; + grp["track_inhibit"]["default"] = "audio=none&video=none&subtitle=none"; } capa["optional"]["track_inhibit"]["name"] = "Track inhibitor(s)"; @@ -753,12 +677,23 @@ int main(int argc, char *argv[]){ } // check config for generic options - Mist::ProcLivepeer Enc; - if (!Enc.CheckConfig()){ - FAIL_MSG("Error config syntax error!"); + srand(getpid()); + // Check generic configuration variables + if (!Mist::opt.isMember("source") || !Mist::opt["source"] || !Mist::opt["source"].isString()){ + FAIL_MSG("Missing or blank source in config!"); return 1; } + if (!Mist::opt.isMember("sink") || !Mist::opt["sink"] || !Mist::opt["sink"].isString()){ + INFO_MSG("No sink explicitly set, using source as sink"); + } + if (!Mist::opt.isMember("custom_url") || !Mist::opt["custom_url"] || !Mist::opt["custom_url"].isString()){ + api_url = "https://livepeer.live/api"; + }else{ + api_url = Mist::opt["custom_url"].asStringRef(); + } + + { //Ensure stream name is set in all threads std::string streamName = Mist::opt["sink"].asString(); @@ -767,21 +702,173 @@ int main(int argc, char *argv[]){ Util::setStreamName(Mist::opt["source"].asString() + "→" + streamName); } - // stream which connects to input - tthread::thread source(sourceThread, 0); - Util::sleep(500); + //connect to source metadata + DTSC::Meta M(Mist::opt["source"].asStringRef(), false); - // needs to pass through encoder to outputEBML + //find source video track + std::map targetParams; + targetParams["video"] = "maxbps"; + JSON::Value sourceCapa; + sourceCapa["name"] = "Livepeer"; + sourceCapa["codecs"][0u][0u].append("+H264"); + sourceCapa["codecs"][0u][0u].append("+HEVC"); + sourceCapa["codecs"][0u][0u].append("+MPEG2"); + if (Mist::opt.isMember("source_track") && Mist::opt["source_track"].isString() && Mist::opt["source_track"]){ + targetParams["video"] = Mist::opt["source_track"].asStringRef(); + } + std::set vidTrack = Util::wouldSelect(M, targetParams, sourceCapa); + size_t sourceIdx = *(vidTrack.begin()); + if (!M.getWidth(sourceIdx) || !M.getHeight(sourceIdx)){ + FAIL_MSG("Source track does not have a valid width and height"); + return 1; + } + + //build transcode request + JSON::Value pl; + pl["name"] = Mist::opt["source"]; + pl["profiles"] = Mist::opt["target_profiles"]; + jsonForEach(pl["profiles"], prof){ + if (!prof->isMember("gop")){(*prof)["gop"] = "0.0";} + //no or automatic framerate? default to source rate, if set, or 25 otherwise + if (!prof->isMember("fps") || (*prof)["fps"].asDouble() == 0.0){ + (*prof)["fps"] = M.getFpks(sourceIdx); + if (!(*prof)["fps"].asInt()){(*prof)["fps"] = 25000;} + (*prof)["fpsDen"] = 1000; + } + if (!prof->isMember("profile")){(*prof)["profile"] = "H264High";} + if ((!prof->isMember("height") || !(*prof)["height"].asInt()) && (!prof->isMember("width") || !(*prof)["width"].asInt())){ + //no width and no height + (*prof)["width"] = M.getWidth(sourceIdx); + (*prof)["height"] = M.getHeight(sourceIdx); + } + if (!prof->isMember("width") || !(*prof)["width"].asInt()){ + //no width, but we have height + //first, check if our source is in portrait mode, if so, we assume they meant width instead of height + if (M.getWidth(sourceIdx) < M.getHeight(sourceIdx)){ + //portrait mode + uint32_t heightSetting = (*prof)["height"].asInt(); + (*prof)["width"] = heightSetting; + (*prof)["height"] = M.getHeight(sourceIdx) * heightSetting / M.getWidth(sourceIdx); + }else{ + //landscape mode + uint32_t heightSetting = (*prof)["height"].asInt(); + (*prof)["width"] = M.getWidth(sourceIdx) * heightSetting / M.getHeight(sourceIdx); + } + } + if (!prof->isMember("height") || !(*prof)["height"].asInt()){ + //no height, but we have width + //No portrait/landscape check, as per documentation + uint32_t widthSetting = (*prof)["width"].asInt(); + (*prof)["height"] = M.getHeight(sourceIdx) * widthSetting / M.getWidth(sourceIdx); + } + //force width/height to multiples of 16 + (*prof)["width"] = ((*prof)["width"].asInt() / 16) * 16; + (*prof)["height"] = ((*prof)["height"].asInt() / 16) * 16; + + if (prof->isMember("track_inhibit")){ + std::set wouldSelect = Util::wouldSelect( + M, std::string("audio=none&video=none&subtitle=none&") + (*prof)["track_inhibit"].asStringRef()); + if (wouldSelect.size()){ + if (prof->isMember("name")){ + INFO_MSG("Removing profile because track inhibitor matches: %s", (*prof)["name"].asStringRef().c_str()); + }else{ + INFO_MSG("Removing profile because track inhibitor matches: %s", prof->toString().c_str()); + } + prof.remove(); + continue; + }else{ + prof->removeMember("track_inhibit"); + } + } + INFO_MSG("Profile parsed: %s", prof->toString().c_str()); + } + + //Connect to livepeer API + HTTP::Downloader dl; + dl.setHeader("Authorization", "Bearer "+Mist::opt["access_token"].asStringRef()); + //Get broadcaster list, pick first valid address + if (!dl.get(HTTP::URL(api_url+"/broadcaster"))){ + FAIL_MSG("Livepeer API responded negatively to request for broadcaster list"); + return 1; + } + Mist::lpBroad = JSON::fromString(dl.data()); + if (!Mist::lpBroad || !Mist::lpBroad.isArray()){ + FAIL_MSG("No Livepeer broadcasters available"); + return 1; + } + Mist::pickRandomBroadcaster(); + if (!Mist::currBroadAddr.size()){ + FAIL_MSG("No Livepeer broadcasters available"); + return 1; + } + INFO_MSG("Using broadcaster: %s", Mist::currBroadAddr.c_str()); + + //send transcode request + dl.setHeader("Content-Type", "application/json"); + dl.setHeader("Authorization", "Bearer "+Mist::opt["access_token"].asStringRef()); + if (!dl.post(HTTP::URL(api_url+"/stream"), pl.toString())){ + FAIL_MSG("Livepeer API responded negatively to encode request"); + return 1; + } + Mist::lpEnc = JSON::fromString(dl.data()); + if (!Mist::lpEnc){ + FAIL_MSG("Livepeer API did not respond with JSON"); + return 1; + } + if (!Mist::lpEnc.isMember("id")){ + FAIL_MSG("Livepeer API did not respond with a valid ID: %s", dl.data().data()); + return 1; + } + Mist::lpID = Mist::lpEnc["id"].asStringRef(); + + INFO_MSG("Livepeer transcode ID: %s", Mist::lpID.c_str()); + uint64_t lastProcUpdate = Util::bootSecs(); + pStat["proc_status_update"]["id"] = getpid(); + pStat["proc_status_update"]["proc"] = "Livepeer"; + pData["ainfo"]["lp_id"] = Mist::lpID; + uint64_t startTime = Util::bootSecs(); + + //Here be threads. + + //Source thread, from Mist to LP. + tthread::thread source(sourceThread, 0); + while (!conf.is_active && Util::bootSecs() < lastProcUpdate + 5){Util::sleep(50);} + if (!conf.is_active){WARN_MSG("Timeout waiting for source thread to boot!");} + lastProcUpdate = Util::bootSecs(); + + //Sink thread, from LP to Mist tthread::thread sink(sinkThread, 0); - // uploads prepared segments + while (!co.is_active && Util::bootSecs() < lastProcUpdate + 5){Util::sleep(50);} + if (!co.is_active){WARN_MSG("Timeout waiting for sink thread to boot!");} + lastProcUpdate = Util::bootSecs(); + + // These threads upload prepared segments tthread::thread uploader0(uploadThread, (void*)0); tthread::thread uploader1(uploadThread, (void*)1); - - co.is_active = true; - - // run process - Enc.Run(); + while (conf.is_active && co.is_active){ + Util::sleep(200); + if (lastProcUpdate + 5 <= Util::bootSecs()){ + tthread::lock_guard guard(statsMutex); + pData["active_seconds"] = (Util::bootSecs() - startTime); + pData["ainfo"]["switches"] = statSwitches; + pData["ainfo"]["fail_non200"] = statFailN200; + pData["ainfo"]["fail_timeout"] = statFailTimeout; + pData["ainfo"]["fail_parse"] = statFailParse; + pData["ainfo"]["fail_other"] = statFailOther; + pData["ainfo"]["sourceTime"] = statSourceMs; + pData["ainfo"]["sinkTime"] = statSinkMs; + { + tthread::lock_guard guard(broadcasterMutex); + pData["ainfo"]["bc"] = Mist::currBroadAddr; + } + Socket::UDPConnection uSock; + uSock.SetDestination(UDP_API_HOST, UDP_API_PORT); + uSock.SendNow(pStat.toString()); + lastProcUpdate = Util::bootSecs(); + } + } + INFO_MSG("Clean shutdown; joining threads"); co.is_active = false; conf.is_active = false; @@ -791,7 +878,7 @@ int main(int argc, char *argv[]){ uploader0.join(); uploader1.join(); - INFO_MSG("Livepeer transcode shutting down: %s", Util::exitReason); + INFO_MSG("Shutdown reason: %s", Util::exitReason); return 0; } diff --git a/src/process/process_livepeer.h b/src/process/process_livepeer.h index d12d1021..b2577411 100644 --- a/src/process/process_livepeer.h +++ b/src/process/process_livepeer.h @@ -7,7 +7,6 @@ namespace Mist{ bool getFirst = false; bool sendFirst = false; - bool doingSetup = true; uint64_t packetTimeDiff; uint64_t sendPacketTime; @@ -75,13 +74,5 @@ namespace Mist{ std::string currBroadAddr; std::string lpID; - class ProcLivepeer{ - public: - std::string api_url; - ProcLivepeer(){}; - bool CheckConfig(); - void Run(); - }; - }// namespace Mist