Merge branch 'development' into LTS_development

# Conflicts:
#	src/output/output_http_internal.cpp
This commit is contained in:
Thulinma 2018-11-13 16:35:12 +01:00
commit 109995809d
13 changed files with 85 additions and 73 deletions

View file

@ -607,37 +607,51 @@ bool JSON::Value::operator!=(const JSON::Value & rhs) const {
} }
bool JSON::Value::compareExcept(const Value & rhs, const std::set<std::string> & skip) const { bool JSON::Value::compareExcept(const Value & rhs, const std::set<std::string> & skip) const {
if (myType != OBJECT) { if (myType == OBJECT) {
return ((*this) == rhs); jsonForEachConst(*this, it){
} if (skip.count(it.key())){continue;}
jsonForEachConst(*this, it){ if (!rhs.isMember(it.key()) || !(*it).compareExcept(rhs[it.key()], skip)) {
if (skip.count(it.key())){continue;} return false;
if (!rhs.isMember(it.key()) || !(*it).compareExcept(rhs[it.key()], skip)) { }
return false;
} }
jsonForEachConst(rhs, it){
if (skip.count(it.key())){continue;}
if (!(*this).isMember(it.key())){return false;}
}
return true;
} }
jsonForEachConst(rhs, it){ if (myType == ARRAY) {
if (skip.count(it.key())){continue;} if (size() != rhs.size()){return false;}
if (!(*this).isMember(it.key())){return false;} jsonForEachConst(*this, it){
if (!(*it).compareExcept(rhs[it.num()], skip)){return false;}
}
return true;
} }
return true; return ((*this) == rhs);
} }
bool JSON::Value::compareOnly(const Value & rhs, const std::set<std::string> & check) const { bool JSON::Value::compareOnly(const Value & rhs, const std::set<std::string> & check) const {
if (myType != OBJECT) { if (myType == OBJECT) {
return ((*this) == rhs); jsonForEachConst(*this, it){
} if (!check.count(it.key())){continue;}
jsonForEachConst(*this, it){ if (!rhs.isMember(it.key()) || !(*it).compareOnly(rhs[it.key()], check)) {
if (!check.count(it.key())){continue;} return false;
if (!rhs.isMember(it.key()) || !(*it).compareOnly(rhs[it.key()], check)) { }
return false;
} }
jsonForEachConst(rhs, it){
if (!check.count(it.key())){continue;}
if (!(*this).isMember(it.key())){return false;}
}
return true;
} }
jsonForEachConst(rhs, it){ if (myType == ARRAY) {
if (!check.count(it.key())){continue;} if (size() != rhs.size()){return false;}
if (!(*this).isMember(it.key())){return false;} jsonForEachConst(*this, it){
if (!(*it).compareOnly(rhs[it.num()], check)){return false;}
}
return true;
} }
return true; return ((*this) == rhs);
} }
/// Completely clears the contents of this value, /// Completely clears the contents of this value,

View file

@ -4,14 +4,30 @@
void AnalyserDTSC::init(Util::Config &conf){ void AnalyserDTSC::init(Util::Config &conf){
Analyser::init(conf); Analyser::init(conf);
JSON::Value opt;
opt["long"] = "headless";
opt["short"] = "H";
opt["help"] = "Parse entire file or streams as a single headless DTSC packet";
conf.addOption("headless", opt);
opt.null();
} }
AnalyserDTSC::AnalyserDTSC(Util::Config &conf) : Analyser(conf){ AnalyserDTSC::AnalyserDTSC(Util::Config &conf) : Analyser(conf){
conn = Socket::Connection(0, fileno(stdin)); conn = Socket::Connection(0, fileno(stdin));
totalBytes = 0; totalBytes = 0;
headLess = conf.getBool("headless");
} }
bool AnalyserDTSC::parsePacket(){ bool AnalyserDTSC::parsePacket(){
if (headLess){
while (conn){
if (!conn.spool()){Util::sleep(50);}
}
std::string dataBuf = conn.Received().remove(conn.Received().bytes(0xFFFFFFFFul));
DTSC::Scan S((char*)dataBuf.data(), dataBuf.size());
std::cout << S.toPrettyString() << std::endl;
return false;
}
P.reInit(conn); P.reInit(conn);
if (conn && !P){ if (conn && !P){
FAIL_MSG("Invalid DTSC packet @ byte %llu", totalBytes) FAIL_MSG("Invalid DTSC packet @ byte %llu", totalBytes)

View file

@ -8,6 +8,7 @@ public:
static void init(Util::Config &conf); static void init(Util::Config &conf);
private: private:
bool headLess;
DTSC::Packet P; DTSC::Packet P;
Socket::Connection conn; Socket::Connection conn;
uint64_t totalBytes; uint64_t totalBytes;

View file

@ -173,8 +173,12 @@ namespace Mist{
/// Called when stream initialization has failed. /// Called when stream initialization has failed.
/// The standard implementation will set isInitialized to false and close the client connection, /// The standard implementation will set isInitialized to false and close the client connection,
/// thus causing the process to exit cleanly. /// thus causing the process to exit cleanly.
void Output::onFail(){ void Output::onFail(const std::string & msg, bool critical){
MEDIUM_MSG("onFail"); if (critical){
FAIL_MSG("onFail '%s': %s", streamName.c_str(), msg.c_str());
}else{
MEDIUM_MSG("onFail '%s': %s", streamName.c_str(), msg.c_str());
}
isInitialized = false; isInitialized = false;
wantRequest = true; wantRequest = true;
parseData= false; parseData= false;
@ -196,7 +200,7 @@ namespace Mist{
reconnect(); reconnect();
//if the connection failed, fail //if the connection failed, fail
if (streamName.size() < 1){ if (streamName.size() < 1){
onFail(); onFail("Could not connect to stream", true);
return; return;
} }
sought = false; sought = false;
@ -365,14 +369,12 @@ namespace Mist{
if (config->hasOption("noinput") && config->getBool("noinput")){ if (config->hasOption("noinput") && config->getBool("noinput")){
Util::sanitizeName(streamName); Util::sanitizeName(streamName);
if (!Util::streamAlive(streamName)){ if (!Util::streamAlive(streamName)){
FAIL_MSG("Stream %s not already active - aborting initialization", streamName.c_str()); onFail("Stream not active already, aborting");
onFail();
return; return;
} }
}else{ }else{
if (!Util::startInput(streamName, "", true, isPushing())){ if (!Util::startInput(streamName, "", true, isPushing())){
FAIL_MSG("Opening stream %s failed - aborting initialization", streamName.c_str()); onFail("Stream open failed", true);
onFail();
return; return;
} }
} }
@ -385,16 +387,14 @@ namespace Mist{
nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
} }
if (!nProxy.userClient.isAlive()){ if (!nProxy.userClient.isAlive()){
FAIL_MSG("Could not register as client for %s", streamName.c_str()); onFail("Could not register as client", true);
onFail();
return; return;
} }
char pageId[NAME_BUFFER_SIZE]; char pageId[NAME_BUFFER_SIZE];
snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str()); snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str());
nProxy.metaPages[0].init(pageId, DEFAULT_STRM_PAGE_SIZE); nProxy.metaPages[0].init(pageId, DEFAULT_STRM_PAGE_SIZE);
if (!nProxy.metaPages[0].mapped){ if (!nProxy.metaPages[0].mapped){
FAIL_MSG("Could not connect to data for %s", streamName.c_str()); onFail("Could not connect to stream data", true);
onFail();
return; return;
} }
isInitialized = true; isInitialized = true;
@ -1710,12 +1710,7 @@ namespace Mist{
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
if (!nProxy.userClient.getData()){ if (!nProxy.userClient.getData()){
WARN_MSG("Player connection failure - aborting output"); onFail("Player connection failure - aborting output", true);
if (!onFinish()){
myConn.close();
}else{
disconnect();
}
return; return;
} }
} }
@ -1723,21 +1718,11 @@ namespace Mist{
if (isPushing() && !pushIsOngoing){ if (isPushing() && !pushIsOngoing){
waitForStreamPushReady(); waitForStreamPushReady();
if (!nProxy.userClient.isAlive()){ if (!nProxy.userClient.isAlive()){
WARN_MSG("Failed to wait for buffer, aborting incoming push"); onFail("Failed to wait for buffer, aborting incoming push", true);
if (!onFinish()){
myConn.close();
}else{
disconnect();
}
return; return;
} }
}else{ }else{
INFO_MSG("Received disconnect request from input"); onFail("Received disconnect request from input");
if (!onFinish()){
myConn.close();
}else{
disconnect();
}
return; return;
} }
} }

View file

@ -78,7 +78,7 @@ namespace Mist {
void disconnect(); void disconnect();
virtual void initialize(); virtual void initialize();
virtual void sendHeader(); virtual void sendHeader();
virtual void onFail(); virtual void onFail(const std::string & msg, bool critical = false);
virtual void requestHandler(); virtual void requestHandler();
static Util::Config * config; static Util::Config * config;
void playbackSleep(uint64_t millis); void playbackSleep(uint64_t millis);

View file

@ -418,7 +418,7 @@ namespace Mist {
} }
if (H.url.find("hls") == std::string::npos){ if (H.url.find("hls") == std::string::npos){
onFail(); onFail("HLS handler active, but this is not a HLS URL. Eh... What...?");
return; return;
} }
@ -434,11 +434,7 @@ namespace Mist {
} }
initialize(); initialize();
if (!keepGoing()){return;}
if (!keepGoing()){
onFail();
return;
}
if (H.url.substr(5 + streamName.size(), 5) == "/push"){ if (H.url.substr(5 + streamName.size(), 5) == "/push"){
std::string relPushUrl = H.url.substr(10 + streamName.size()); std::string relPushUrl = H.url.substr(10 + streamName.size());

View file

@ -41,13 +41,14 @@ namespace Mist {
config = cfg; config = cfg;
} }
void HTTPOutput::onFail(){ void HTTPOutput::onFail(const std::string & msg, bool critical){
INFO_MSG("Failing '%s': %s: %s", streamName.c_str(), H.url.c_str(), msg.c_str());
if (!webSock){ if (!webSock){
H.Clean(); //make sure no parts of old requests are left in any buffers H.Clean(); //make sure no parts of old requests are left in any buffers
H.SetBody("Stream not found. Sorry, we tried."); H.SetBody("Could not retrieve stream: "+msg);
H.SendResponse("404", "Stream not found", myConn); H.SendResponse("404", "Error opening stream", myConn);
} }
Output::onFail(); Output::onFail(msg, critical);
} }
bool isMatch(const std::string & url, const std::string & m, std::string & streamname){ bool isMatch(const std::string & url, const std::string & m, std::string & streamname){

View file

@ -12,7 +12,7 @@ namespace Mist {
virtual ~HTTPOutput(); virtual ~HTTPOutput();
static void init(Util::Config * cfg); static void init(Util::Config * cfg);
void onRequest(); void onRequest();
virtual void onFail(); virtual void onFail(const std::string & msg, bool critical = false);
virtual void onHTTP(){}; virtual void onHTTP(){};
virtual void onIdle(){}; virtual void onIdle(){};
virtual void onWebsocketFrame(){}; virtual void onWebsocketFrame(){};

View file

@ -66,7 +66,7 @@ namespace Mist {
return !(config->getString("ip").size()); return !(config->getString("ip").size());
} }
void OutHTTP::onFail(){ void OutHTTP::onFail(const std::string & msg, bool critical){
std::string method = H.method; std::string method = H.method;
// send logo icon // send logo icon
if (H.url.length() > 4 && H.url.substr(H.url.length() - 4, 4) == ".ico"){ if (H.url.length() > 4 && H.url.substr(H.url.length() - 4, 4) == ".ico"){
@ -81,6 +81,7 @@ namespace Mist {
if (websocketHandler()){return;} if (websocketHandler()){return;}
JSON::Value json_resp; JSON::Value json_resp;
json_resp["error"] = "Could not retrieve stream. Sorry."; json_resp["error"] = "Could not retrieve stream. Sorry.";
json_resp["error_guru"] = msg;
if (config->getString("nostreamtext") != ""){ if (config->getString("nostreamtext") != ""){
json_resp["on_error"] = config->getString("nostreamtext"); json_resp["on_error"] = config->getString("nostreamtext");
} }
@ -101,9 +102,7 @@ namespace Mist {
H.Clean(); H.Clean();
return; return;
} }
INFO_MSG("Failing: %s", H.url.c_str()); HTTPOutput::onFail(msg, critical);
HTTPOutput::onFail();
Output::onFail();
} }
void OutHTTP::init(Util::Config * cfg){ void OutHTTP::init(Util::Config * cfg){

View file

@ -8,7 +8,7 @@ namespace Mist {
~OutHTTP(); ~OutHTTP();
static void init(Util::Config * cfg); static void init(Util::Config * cfg);
static bool listenMode(); static bool listenMode();
virtual void onFail(); virtual void onFail(const std::string & msg, bool critical = false);
///preHTTP is disabled in the internal HTTP output, since most don't need the stream alive to work ///preHTTP is disabled in the internal HTTP output, since most don't need the stream alive to work
virtual void preHTTP(){}; virtual void preHTTP(){};
void HTMLResponse(); void HTMLResponse();

View file

@ -81,10 +81,10 @@ namespace Mist {
sentHeader = true; sentHeader = true;
} }
void OutJSON::onFail(){ void OutJSON::onFail(const std::string & msg, bool critical){
//Only run failure handle if we're not being persistent //Only run failure handle if we're not being persistent
if (!keepReselecting){ if (!keepReselecting){
HTTPOutput::onFail(); HTTPOutput::onFail(msg, critical);
}else{ }else{
onFinish(); onFinish();
} }

View file

@ -12,7 +12,7 @@ namespace Mist {
virtual void onWebsocketConnect(); virtual void onWebsocketConnect();
virtual void preWebsocketConnect(); virtual void preWebsocketConnect();
bool onFinish(); bool onFinish();
void onFail(); void onFail(const std::string & msg, bool critical = false);
void sendNext(); void sendNext();
void sendHeader(); void sendHeader();
bool doesWebsockets(){return true;} bool doesWebsockets(){return true;}

View file

@ -583,7 +583,7 @@ namespace Mist{
myConn.SendNow(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max)); //send chunk size max (msg 1) myConn.SendNow(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max)); //send chunk size max (msg 1)
myConn.SendNow(RTMPStream::SendCTL(5, RTMPStream::snd_window_size)); //send window acknowledgement size (msg 5) myConn.SendNow(RTMPStream::SendCTL(5, RTMPStream::snd_window_size)); //send window acknowledgement size (msg 5)
myConn.SendNow(RTMPStream::SendCTL(6, RTMPStream::rec_window_size)); //send rec window acknowledgement size (msg 6) myConn.SendNow(RTMPStream::SendCTL(6, RTMPStream::rec_window_size)); //send rec window acknowledgement size (msg 6)
myConn.SendNow(RTMPStream::SendUSR(0, 1)); //send UCM StreamBegin (0), stream 1 //myConn.SendNow(RTMPStream::SendUSR(0, 1)); //send UCM StreamBegin (0), stream 1
//send onBWDone packet - no clue what it is, but real server sends it... //send onBWDone packet - no clue what it is, but real server sends it...
//amfReply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); //amfReply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
//amfReply.addContent(AMF::Object("", "onBWDone"));//result //amfReply.addContent(AMF::Object("", "onBWDone"));//result
@ -600,7 +600,7 @@ namespace Mist{
amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info
amfReply.addContent(AMF::Object("", (double)1)); //stream ID - we use 1 amfReply.addContent(AMF::Object("", (double)1)); //stream ID - we use 1
sendCommand(amfReply, messageType, streamId); sendCommand(amfReply, messageType, streamId);
myConn.SendNow(RTMPStream::SendUSR(0, 1)); //send UCM StreamBegin (0), stream 1 //myConn.SendNow(RTMPStream::SendUSR(0, 1)); //send UCM StreamBegin (0), stream 1
return; return;
}//createStream }//createStream
if (amfData.getContentP(0)->StrValue() == "ping"){ if (amfData.getContentP(0)->StrValue() == "ping"){