From 9232451a7407969c9e6f5681c587397a4ccbe8f5 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 2 Nov 2021 01:34:44 +0100 Subject: [PATCH] Implemented 422 status code segment skip, new reporting trigger, implements #91 --- src/controller/controller_capabilities.cpp | 6 +++ src/process/process_livepeer.cpp | 48 +++++++++++++++++++++- 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/src/controller/controller_capabilities.cpp b/src/controller/controller_capabilities.cpp index 319b9779..25afd71e 100644 --- a/src/controller/controller_capabilities.cpp +++ b/src/controller/controller_capabilities.cpp @@ -222,6 +222,12 @@ namespace Controller{ trgs["PUSH_END"]["payload"] = "push ID (integer)\nstream name (string)\ntarget URI, before variables/triggers affected it (string)\ntarget URI, afterwards, as actually used (string)\nlast 10 log messages (JSON array string)\nmost recent push status (JSON object string)"; trgs["PUSH_END"]["response"] = "ignored"; trgs["PUSH_END"]["response_action"] = "None."; + + trgs["LIVEPEER_SEGMENT_REJECTED"]["when"] = "Whenever a segment is rejected by MistProcLivepeer with a 422 status code either twice in a row for different broadcasters, or once with no secondary broadcasters available."; + trgs["LIVEPEER_SEGMENT_REJECTED"]["stream_specific"] = true; + trgs["LIVEPEER_SEGMENT_REJECTED"]["payload"] = "transcode options (json string)\nraw segment that was rejected (base64 encoded)\ninformation about the source track (json string)\nfirst attempted broadcaster URL\nsecond attempted broadcaster URL or the text \"N/A\" if no secondary was available"; + trgs["LIVEPEER_SEGMENT_REJECTED"]["response"] = "ignored"; + trgs["LIVEPEER_SEGMENT_REJECTED"]["response_action"] = "None."; } /// Aquire list of available protocols, storing in global 'capabilities' JSON::Value. diff --git a/src/process/process_livepeer.cpp b/src/process/process_livepeer.cpp index 0dc0091c..1da71e24 100644 --- a/src/process/process_livepeer.cpp +++ b/src/process/process_livepeer.cpp @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include "../input/input.h" #include #include //for stat @@ -397,10 +399,29 @@ void parseMultipart(const Mist::preparedSegment & mySeg, const std::string & cTy } } +void segmentRejectedTrigger(Mist::preparedSegment & mySeg, const std::string & bc1, const std::string & bc2){ + if (Triggers::shouldTrigger("LIVEPEER_SEGMENT_REJECTED", Util::streamName)){ + FAIL_MSG("Segment could not be transcoded, skipping to next and submitting for analysis"); + JSON::Value trackInfo; + trackInfo["width"] = mySeg.width; + trackInfo["height"] = mySeg.height; + trackInfo["duration"] = mySeg.segDuration; + std::string payload = Mist::opt.toString()+"\n"+Encodings::Base64::encode(std::string(mySeg.data, mySeg.data.size()))+"\n"+trackInfo.toString()+"\n"+bc1+"\n"+bc2; + Triggers::doTrigger("LIVEPEER_SEGMENT_REJECTED", payload, Util::streamName); + }else{ + FAIL_MSG("Segment could not be transcoded, skipping to next"); + } + mySeg.fullyWritten = false; + mySeg.fullyRead = true; + insertTurn = (insertTurn + 1) % PRESEG_COUNT; +} + void uploadThread(void * num){ size_t myNum = (size_t)num; Mist::preparedSegment & mySeg = Mist::presegs[myNum]; HTTP::Downloader upper; + bool was422 = false; + std::string prevURL; while (conf.is_active){ while (conf.is_active && !mySeg.fullyWritten){Util::sleep(100);} if (!conf.is_active){return;}//Exit early on shutdown @@ -421,6 +442,8 @@ void uploadThread(void * num){ uplTime = Util::getMicros(uplTime); if (upper.getStatusCode() == 200){ MEDIUM_MSG("Uploaded %zu bytes (time %" PRIu64 "-%" PRIu64 " = %" PRIu64 " ms) to %s in %.2f ms", mySeg.data.size(), mySeg.time, mySeg.time+mySeg.segDuration, mySeg.segDuration, target.getUrl().c_str(), uplTime/1000.0); + was422 = false; + prevURL.clear(); mySeg.fullyWritten = false; mySeg.fullyRead = true; //Wait your turn @@ -434,8 +457,22 @@ void uploadThread(void * num){ } insertTurn = (insertTurn + 1) % PRESEG_COUNT; break;//Success: no need to retry + }else if (upper.getStatusCode() == 422){ + //segment rejected by broadcaster node; try a different broadcaster at most once and keep track + ++statFailN200; + WARN_MSG("Rejected upload of %zu bytes to %s after %.2f ms: %" PRIu32 " %s", mySeg.data.size(), target.getUrl().c_str(), uplTime/1000.0, upper.getStatusCode(), upper.getStatusText().c_str()); + if (was422){ + //second error in a row, fire off LIVEPEER_SEGMENT_REJECTED trigger + segmentRejectedTrigger(mySeg, prevURL, target.getUrl()); + was422 = false; + prevURL.clear(); + break; + }else{ + prevURL = target.getUrl(); + was422 = true; + } }else{ - //Failure due to non-200 status code + //Failure due to non-200/422 status code ++statFailN200; WARN_MSG("Failed to upload %zu bytes to %s in %.2f ms: %" PRIu32 " %s", mySeg.data.size(), target.getUrl().c_str(), uplTime/1000.0, upper.getStatusCode(), upper.getStatusText().c_str()); } @@ -454,6 +491,7 @@ void uploadThread(void * num){ conf.is_active = false; return; } + bool switchSuccess = false; { tthread::lock_guard guard(broadcasterMutex); std::string prevBroadAddr = Mist::currBroadAddr; @@ -466,11 +504,19 @@ void uploadThread(void * num){ } if (Mist::currBroadAddr != prevBroadAddr){ ++statSwitches; + switchSuccess = true; WARN_MSG("Switched to new broadcaster: %s", Mist::currBroadAddr.c_str()); }else{ WARN_MSG("Cannot switch broadcaster; only a single option is available"); } } + if (!switchSuccess && was422){ + //no switch possible, fire off LIVEPEER_SEGMENT_REJECTED trigger + segmentRejectedTrigger(mySeg, prevURL, "N/A"); + was422 = false; + prevURL.clear(); + break; + } }while(conf.is_active); } }