Implemented 422 status code segment skip, new reporting trigger, implements #91
This commit is contained in:
		
							parent
							
								
									b62747c402
								
							
						
					
					
						commit
						9232451a74
					
				
					 2 changed files with 53 additions and 1 deletions
				
			
		| 
						 | 
					@ -222,6 +222,12 @@ namespace Controller{
 | 
				
			||||||
    trgs["PUSH_END"]["payload"] = "push ID (integer)\nstream name (string)\ntarget URI, before variables/triggers affected it (string)\ntarget URI, afterwards, as actually used (string)\nlast 10 log messages (JSON array string)\nmost recent push status (JSON object string)";
 | 
					    trgs["PUSH_END"]["payload"] = "push ID (integer)\nstream name (string)\ntarget URI, before variables/triggers affected it (string)\ntarget URI, afterwards, as actually used (string)\nlast 10 log messages (JSON array string)\nmost recent push status (JSON object string)";
 | 
				
			||||||
    trgs["PUSH_END"]["response"] = "ignored";
 | 
					    trgs["PUSH_END"]["response"] = "ignored";
 | 
				
			||||||
    trgs["PUSH_END"]["response_action"] = "None.";
 | 
					    trgs["PUSH_END"]["response_action"] = "None.";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    trgs["LIVEPEER_SEGMENT_REJECTED"]["when"] = "Whenever a segment is rejected by MistProcLivepeer with a 422 status code either twice in a row for different broadcasters, or once with no secondary broadcasters available.";
 | 
				
			||||||
 | 
					    trgs["LIVEPEER_SEGMENT_REJECTED"]["stream_specific"] = true;
 | 
				
			||||||
 | 
					    trgs["LIVEPEER_SEGMENT_REJECTED"]["payload"] = "transcode options (json string)\nraw segment that was rejected (base64 encoded)\ninformation about the source track (json string)\nfirst attempted broadcaster URL\nsecond attempted broadcaster URL or the text \"N/A\" if no secondary was available";
 | 
				
			||||||
 | 
					    trgs["LIVEPEER_SEGMENT_REJECTED"]["response"] = "ignored";
 | 
				
			||||||
 | 
					    trgs["LIVEPEER_SEGMENT_REJECTED"]["response_action"] = "None.";
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  /// Aquire list of available protocols, storing in global 'capabilities' JSON::Value.
 | 
					  /// Aquire list of available protocols, storing in global 'capabilities' JSON::Value.
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -5,6 +5,8 @@
 | 
				
			||||||
#include <mist/procs.h>
 | 
					#include <mist/procs.h>
 | 
				
			||||||
#include <mist/util.h>
 | 
					#include <mist/util.h>
 | 
				
			||||||
#include <mist/downloader.h>
 | 
					#include <mist/downloader.h>
 | 
				
			||||||
 | 
					#include <mist/triggers.h>
 | 
				
			||||||
 | 
					#include <mist/encode.h>
 | 
				
			||||||
#include "../input/input.h"
 | 
					#include "../input/input.h"
 | 
				
			||||||
#include <ostream>
 | 
					#include <ostream>
 | 
				
			||||||
#include <sys/stat.h>  //for stat
 | 
					#include <sys/stat.h>  //for stat
 | 
				
			||||||
| 
						 | 
					@ -397,10 +399,29 @@ void parseMultipart(const Mist::preparedSegment & mySeg, const std::string & cTy
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					void segmentRejectedTrigger(Mist::preparedSegment & mySeg, const std::string & bc1, const std::string & bc2){
 | 
				
			||||||
 | 
					  if (Triggers::shouldTrigger("LIVEPEER_SEGMENT_REJECTED", Util::streamName)){
 | 
				
			||||||
 | 
					    FAIL_MSG("Segment could not be transcoded, skipping to next and submitting for analysis");
 | 
				
			||||||
 | 
					    JSON::Value trackInfo;
 | 
				
			||||||
 | 
					    trackInfo["width"] = mySeg.width;
 | 
				
			||||||
 | 
					    trackInfo["height"] = mySeg.height;
 | 
				
			||||||
 | 
					    trackInfo["duration"] = mySeg.segDuration;
 | 
				
			||||||
 | 
					    std::string payload = Mist::opt.toString()+"\n"+Encodings::Base64::encode(std::string(mySeg.data, mySeg.data.size()))+"\n"+trackInfo.toString()+"\n"+bc1+"\n"+bc2;
 | 
				
			||||||
 | 
					    Triggers::doTrigger("LIVEPEER_SEGMENT_REJECTED", payload, Util::streamName);
 | 
				
			||||||
 | 
					  }else{
 | 
				
			||||||
 | 
					    FAIL_MSG("Segment could not be transcoded, skipping to next");
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  mySeg.fullyWritten = false;
 | 
				
			||||||
 | 
					  mySeg.fullyRead = true;
 | 
				
			||||||
 | 
					  insertTurn = (insertTurn + 1) % PRESEG_COUNT;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
void uploadThread(void * num){
 | 
					void uploadThread(void * num){
 | 
				
			||||||
  size_t myNum = (size_t)num;
 | 
					  size_t myNum = (size_t)num;
 | 
				
			||||||
  Mist::preparedSegment & mySeg = Mist::presegs[myNum];
 | 
					  Mist::preparedSegment & mySeg = Mist::presegs[myNum];
 | 
				
			||||||
  HTTP::Downloader upper;
 | 
					  HTTP::Downloader upper;
 | 
				
			||||||
 | 
					  bool was422 = false;
 | 
				
			||||||
 | 
					  std::string prevURL;
 | 
				
			||||||
  while (conf.is_active){
 | 
					  while (conf.is_active){
 | 
				
			||||||
    while (conf.is_active && !mySeg.fullyWritten){Util::sleep(100);}
 | 
					    while (conf.is_active && !mySeg.fullyWritten){Util::sleep(100);}
 | 
				
			||||||
    if (!conf.is_active){return;}//Exit early on shutdown
 | 
					    if (!conf.is_active){return;}//Exit early on shutdown
 | 
				
			||||||
| 
						 | 
					@ -421,6 +442,8 @@ void uploadThread(void * num){
 | 
				
			||||||
        uplTime = Util::getMicros(uplTime);
 | 
					        uplTime = Util::getMicros(uplTime);
 | 
				
			||||||
        if (upper.getStatusCode() == 200){
 | 
					        if (upper.getStatusCode() == 200){
 | 
				
			||||||
          MEDIUM_MSG("Uploaded %zu bytes (time %" PRIu64 "-%" PRIu64 " = %" PRIu64 " ms) to %s in %.2f ms", mySeg.data.size(), mySeg.time, mySeg.time+mySeg.segDuration, mySeg.segDuration, target.getUrl().c_str(), uplTime/1000.0);
 | 
					          MEDIUM_MSG("Uploaded %zu bytes (time %" PRIu64 "-%" PRIu64 " = %" PRIu64 " ms) to %s in %.2f ms", mySeg.data.size(), mySeg.time, mySeg.time+mySeg.segDuration, mySeg.segDuration, target.getUrl().c_str(), uplTime/1000.0);
 | 
				
			||||||
 | 
					          was422 = false;
 | 
				
			||||||
 | 
					          prevURL.clear();
 | 
				
			||||||
          mySeg.fullyWritten = false;
 | 
					          mySeg.fullyWritten = false;
 | 
				
			||||||
          mySeg.fullyRead = true;
 | 
					          mySeg.fullyRead = true;
 | 
				
			||||||
          //Wait your turn
 | 
					          //Wait your turn
 | 
				
			||||||
| 
						 | 
					@ -434,8 +457,22 @@ void uploadThread(void * num){
 | 
				
			||||||
          }
 | 
					          }
 | 
				
			||||||
          insertTurn = (insertTurn + 1) % PRESEG_COUNT;
 | 
					          insertTurn = (insertTurn + 1) % PRESEG_COUNT;
 | 
				
			||||||
          break;//Success: no need to retry
 | 
					          break;//Success: no need to retry
 | 
				
			||||||
 | 
					        }else if (upper.getStatusCode() == 422){
 | 
				
			||||||
 | 
					          //segment rejected by broadcaster node; try a different broadcaster at most once and keep track
 | 
				
			||||||
 | 
					          ++statFailN200;
 | 
				
			||||||
 | 
					          WARN_MSG("Rejected upload of %zu bytes to %s after %.2f ms: %" PRIu32 " %s", mySeg.data.size(), target.getUrl().c_str(), uplTime/1000.0, upper.getStatusCode(), upper.getStatusText().c_str());
 | 
				
			||||||
 | 
					          if (was422){
 | 
				
			||||||
 | 
					            //second error in a row, fire off LIVEPEER_SEGMENT_REJECTED trigger
 | 
				
			||||||
 | 
					            segmentRejectedTrigger(mySeg, prevURL, target.getUrl());
 | 
				
			||||||
 | 
					            was422 = false;
 | 
				
			||||||
 | 
					            prevURL.clear();
 | 
				
			||||||
 | 
					            break;
 | 
				
			||||||
 | 
					          }else{
 | 
				
			||||||
 | 
					            prevURL = target.getUrl();
 | 
				
			||||||
 | 
					            was422 = true;
 | 
				
			||||||
 | 
					          }
 | 
				
			||||||
        }else{
 | 
					        }else{
 | 
				
			||||||
          //Failure due to non-200 status code
 | 
					          //Failure due to non-200/422 status code
 | 
				
			||||||
          ++statFailN200;
 | 
					          ++statFailN200;
 | 
				
			||||||
          WARN_MSG("Failed to upload %zu bytes to %s in %.2f ms: %" PRIu32 " %s", mySeg.data.size(), target.getUrl().c_str(), uplTime/1000.0, upper.getStatusCode(), upper.getStatusText().c_str());
 | 
					          WARN_MSG("Failed to upload %zu bytes to %s in %.2f ms: %" PRIu32 " %s", mySeg.data.size(), target.getUrl().c_str(), uplTime/1000.0, upper.getStatusCode(), upper.getStatusText().c_str());
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
| 
						 | 
					@ -454,6 +491,7 @@ void uploadThread(void * num){
 | 
				
			||||||
        conf.is_active = false;
 | 
					        conf.is_active = false;
 | 
				
			||||||
        return;
 | 
					        return;
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
 | 
					      bool switchSuccess = false;
 | 
				
			||||||
      {
 | 
					      {
 | 
				
			||||||
        tthread::lock_guard<tthread::mutex> guard(broadcasterMutex);
 | 
					        tthread::lock_guard<tthread::mutex> guard(broadcasterMutex);
 | 
				
			||||||
        std::string prevBroadAddr = Mist::currBroadAddr;
 | 
					        std::string prevBroadAddr = Mist::currBroadAddr;
 | 
				
			||||||
| 
						 | 
					@ -466,11 +504,19 @@ void uploadThread(void * num){
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        if (Mist::currBroadAddr != prevBroadAddr){
 | 
					        if (Mist::currBroadAddr != prevBroadAddr){
 | 
				
			||||||
          ++statSwitches;
 | 
					          ++statSwitches;
 | 
				
			||||||
 | 
					          switchSuccess = true;
 | 
				
			||||||
          WARN_MSG("Switched to new broadcaster: %s", Mist::currBroadAddr.c_str());
 | 
					          WARN_MSG("Switched to new broadcaster: %s", Mist::currBroadAddr.c_str());
 | 
				
			||||||
        }else{
 | 
					        }else{
 | 
				
			||||||
          WARN_MSG("Cannot switch broadcaster; only a single option is available");
 | 
					          WARN_MSG("Cannot switch broadcaster; only a single option is available");
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
 | 
					      if (!switchSuccess && was422){
 | 
				
			||||||
 | 
					        //no switch possible, fire off LIVEPEER_SEGMENT_REJECTED trigger
 | 
				
			||||||
 | 
					        segmentRejectedTrigger(mySeg, prevURL, "N/A");
 | 
				
			||||||
 | 
					        was422 = false;
 | 
				
			||||||
 | 
					        prevURL.clear();
 | 
				
			||||||
 | 
					        break;
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
    }while(conf.is_active);
 | 
					    }while(conf.is_active);
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue