Added machine readable exit reasons, INPUT_END trigger, OUTPUT_END trigger, and updated RECORDING_END trigger to include all of these.

This commit is contained in:
Marco 2022-12-22 13:41:10 +01:00 committed by Thulinma
parent a16d98b7b2
commit b0d4422d27
47 changed files with 493 additions and 256 deletions

View file

@ -20,7 +20,7 @@ int spawnForked(Socket::Connection &S){
void handleUSR1(int signum, siginfo_t *sigInfo, void *ignore){
HIGH_MSG("USR1 received - triggering rolling restart");
Util::Config::is_restarting = true;
Util::logExitReason("signal USR1");
Util::logExitReason(ER_CLEAN_SIGNAL, "signal USR1");
Util::Config::is_active = false;
}

View file

@ -113,6 +113,7 @@ namespace Mist{
firstData = true;
newUA = true;
lastPushUpdate = 0;
Util::Config::binaryType = Util::OUTPUT;
lastRecv = Util::bootSecs();
if (myConn){
@ -185,7 +186,7 @@ namespace Mist{
}else{
MEDIUM_MSG("onFail '%s': %s", streamName.c_str(), msg.c_str());
}
Util::logExitReason(msg.c_str());
Util::logExitReason(ER_UNKNOWN, msg.c_str());
isInitialized = false;
wantRequest = false;
parseData = false;
@ -1519,12 +1520,14 @@ namespace Mist{
if (!streamName.size()){
WARN_MSG("Recording unconnected %s output to file! Cancelled.", capa["name"].asString().c_str());
onFail("Unconnected recording output", true);
recEndTrigger();
return 2;
}
initialize();
if (!M.getValidTracks().size() || !userSelect.size() || !keepGoing()){
INFO_MSG("Stream not available - aborting");
onFail("Stream not available for recording", true);
recEndTrigger();
return 3;
}
initialSeek();
@ -1562,6 +1565,7 @@ namespace Mist{
}else{
if (!connectToFile(newTarget, targetParams.count("append"))){
onFail("Could not connect to the target for recording", true);
recEndTrigger();
return 3;
}
INFO_MSG("Recording %s to %s with %s format", streamName.c_str(),
@ -1752,7 +1756,7 @@ namespace Mist{
INFO_MSG("Switching to next push target filename: %s", newTarget.c_str());
if (!connectToFile(newTarget)){
FAIL_MSG("Failed to open file, aborting: %s", newTarget.c_str());
Util::logExitReason("failed to open file, aborting: %s", newTarget.c_str());
Util::logExitReason(ER_WRITE_FAILURE, "failed to open file, aborting: %s", newTarget.c_str());
onFinish();
break;
}
@ -1763,7 +1767,7 @@ namespace Mist{
}else{
if (!onFinish()){
INFO_MSG("Shutting down because planned stopping point reached");
Util::logExitReason("planned stopping point reached");
Util::logExitReason(ER_CLEAN_INTENDED_STOP, "planned stopping point reached");
break;
}
}
@ -1779,20 +1783,20 @@ namespace Mist{
}
/*LTS-END*/
if (!onFinish()){
Util::logExitReason("end of stream");
Util::logExitReason(ER_CLEAN_EOF, "end of stream");
break;
}
}
}
if (!meta){
Util::logExitReason("lost internal connection to stream data");
Util::logExitReason(ER_SHM_LOST, "lost internal connection to stream data");
break;
}
}
stats();
}
if (!config->is_active){Util::logExitReason("set inactive");}
if (!myConn){Util::logExitReason("connection closed");}
if (!config->is_active){Util::logExitReason(ER_UNKNOWN, "set inactive");}
if (!myConn){Util::logExitReason(ER_CLEAN_REMOTE_CLOSE, "connection closed");}
if (strncmp(Util::exitReason, "connection closed", 17) == 0){
MEDIUM_MSG("Client handler shutting down, exit reason: %s", Util::exitReason);
}else{
@ -1831,7 +1835,6 @@ namespace Mist{
}else{
FAIL_MSG("Lost connection to the playlist file `%s` during segmenting", playlistLocationString.c_str());
Util::logExitReason("Lost connection to the playlist file `%s` during segmenting", playlistLocationString.c_str());
return 1;
}
}
@ -1841,25 +1844,10 @@ namespace Mist{
streamName + "\n" + getConnectedHost() + "\n" + capa["name"].asStringRef() + "\n" + reqUrl;
Triggers::doTrigger("CONN_CLOSE", payload, streamName);
}
if (isRecordingToFile && config->hasOption("target") && Triggers::shouldTrigger("RECORDING_END", streamName)){
uint64_t rightNow = Util::epoch();
std::stringstream payl;
payl << streamName << '\n';
payl << config->getString("target") << '\n';
payl << capa["name"].asStringRef() << '\n';
payl << myConn.dataUp() << '\n';
payl << (Util::bootSecs() - myConn.connTime()) << '\n';
payl << (rightNow - (Util::bootSecs() - myConn.connTime())) << '\n';
payl << rightNow << '\n';
if (firstPacketTime != 0xFFFFFFFFFFFFFFFFull){
payl << (lastPacketTime - firstPacketTime) << '\n';
}else{
payl << 0 << '\n';
}
payl << firstPacketTime << '\n';
payl << lastPacketTime << '\n';
Triggers::doTrigger("RECORDING_END", payl.str(), streamName);
if (isRecordingToFile){
recEndTrigger();
}
outputEndTrigger();
/*LTS-END*/
disconnect();
@ -1971,7 +1959,7 @@ namespace Mist{
}
if (!dropTracks.size()){
FAIL_MSG("Could not equalize tracks! This is very very very bad and I am now going to shut down to prevent worse.");
Util::logExitReason("Could not equalize tracks");
Util::logExitReason(ER_INTERNAL_ERROR, "Could not equalize tracks");
parseData = false;
config->is_active = false;
return false;
@ -2099,11 +2087,11 @@ namespace Mist{
//every ~1 second, check if the stream is not offline
if (emptyCount % 100 == 0 && Util::getStreamStatus(streamName) == STRMSTAT_OFF){
if (M.getLive()){
Util::logExitReason("Live stream source shut down");
Util::logExitReason(ER_CLEAN_EOF, "Live stream source shut down");
thisPacket.null();
return true;
}else if (!Util::startInput(streamName)){
Util::logExitReason("VoD stream source shut down and could not be restarted");
Util::logExitReason(ER_UNKNOWN, "VoD stream source shut down and could not be restarted");
thisPacket.null();
return true;
}
@ -2329,6 +2317,40 @@ namespace Mist{
return true;
}
std::string Output::getExitTriggerPayload(){
uint64_t rightNow = Util::epoch();
std::stringstream payl;
payl << streamName << '\n';
payl << config->getString("target") << '\n';
payl << capa["name"].asStringRef() << '\n';
payl << myConn.dataUp() << '\n';
payl << (Util::bootSecs() - myConn.connTime()) << '\n';
payl << (rightNow - (Util::bootSecs() - myConn.connTime())) << '\n';
payl << rightNow << '\n';
if (firstPacketTime != 0xFFFFFFFFFFFFFFFFull){
payl << (lastPacketTime - firstPacketTime) << '\n';
}else{
payl << 0 << '\n';
}
payl << firstPacketTime << '\n';
payl << lastPacketTime << '\n';
payl << Util::mRExitReason << '\n';
payl << Util::exitReason << '\n';
return payl.str();
}
void Output::recEndTrigger(){
if (Util::Config::binaryType == Util::OUTPUT && config->hasOption("target") && Triggers::shouldTrigger("RECORDING_END", streamName)){
Triggers::doTrigger("RECORDING_END", getExitTriggerPayload(), streamName);
}
}
void Output::outputEndTrigger(){
if (Util::Config::binaryType == Util::OUTPUT && config->hasOption("target") && Triggers::shouldTrigger("OUTPUT_END", streamName)){
Triggers::doTrigger("OUTPUT_END", getExitTriggerPayload(), streamName);
}
}
/// Checks if the set streamName allows pushes from this connector/IP/password combination.
/// Runs all appropriate triggers and checks.
/// Returns true if the push should continue, false otherwise.

View file

@ -151,6 +151,9 @@ namespace Mist{
virtual bool isRecording();
virtual bool isFileTarget();
virtual bool isPushing(){return pushing;};
std::string getExitTriggerPayload();
void recEndTrigger();
void outputEndTrigger();
bool allowPush(const std::string &passwd);
void waitForStreamPushReady();

View file

@ -328,7 +328,7 @@ namespace Mist{
if (!newStream.size()){
FAIL_MSG("Push from %s to URL %s rejected - PUSH_REWRITE trigger blanked the URL",
getConnectedHost().c_str(), reqUrl.c_str());
Util::logExitReason(
Util::logExitReason(ER_TRIGGER,
"Push from %s to URL %s rejected - PUSH_REWRITE trigger blanked the URL",
getConnectedHost().c_str(), reqUrl.c_str());
onFail("Push not allowed - rejected by trigger");

View file

@ -169,7 +169,7 @@ namespace Mist{
}else if (command["type"] == "set_speed") {
handleWebsocketSetSpeed(command);
}else if (command["type"] == "stop") {
Util::logExitReason("User requested stop");
Util::logExitReason(ER_CLEAN_REMOTE_CLOSE, "User requested stop");
myConn.close();
}else if (command["type"] == "play") {
parseData = true;

View file

@ -92,7 +92,7 @@ namespace Mist{
char error_buf[200];
mbedtls_strerror(ret, error_buf, 200);
MEDIUM_MSG("Could not handshake, SSL error: %s (%d)", error_buf, ret);
Util::logExitReason("Could not handshake, SSL error: %s (%d)", error_buf, ret);
Util::logExitReason(ER_READ_START_FAILURE, "Could not handshake, SSL error: %s (%d)", error_buf, ret);
C.close();
return;
}else{
@ -111,7 +111,7 @@ namespace Mist{
int fd[2];
if (socketpair(PF_LOCAL, SOCK_STREAM, 0, fd) != 0){
FAIL_MSG("Could not open anonymous socket for SSL<->HTTP connection!");
Util::logExitReason("Could not open anonymous socket for SSL<->HTTP connection!");
Util::logExitReason(ER_READ_START_FAILURE, "Could not open anonymous socket for SSL<->HTTP connection!");
return 1;
}
std::deque<std::string> args;
@ -137,7 +137,7 @@ namespace Mist{
close(fd[1]);
if (http_proc < 2){
FAIL_MSG("Could not spawn MistOutHTTP process for SSL connection!");
Util::logExitReason("Could not spawn MistOutHTTP process for SSL connection!");
Util::logExitReason(ER_EXEC_FAILURE, "Could not spawn MistOutHTTP process for SSL connection!");
return 1;
}
Socket::Connection http(fd[0]);
@ -153,7 +153,7 @@ namespace Mist{
if (ret != MBEDTLS_ERR_SSL_WANT_READ && ret != MBEDTLS_ERR_SSL_WANT_WRITE){
if (ret <= 0){
HIGH_MSG("SSL disconnect!");
Util::logExitReason("SSL client disconnected");
Util::logExitReason(ER_CLEAN_REMOTE_CLOSE, "SSL client disconnected");
break;
}
// we received ret bytes of data to pass on. Do so.
@ -172,7 +172,7 @@ namespace Mist{
ret = mbedtls_ssl_write(&ssl, (const unsigned char *)http_buf.get().data() + done, toSend - done);
if (ret == MBEDTLS_ERR_NET_CONN_RESET || ret == MBEDTLS_ERR_SSL_CLIENT_RECONNECT){
HIGH_MSG("SSL disconnect!");
Util::logExitReason("SSL client disconnected");
Util::logExitReason(ER_CLEAN_REMOTE_CLOSE, "SSL client disconnected");
http.close();
break;
}

View file

@ -1529,7 +1529,7 @@ namespace Mist{
}else if (command["type"] == "set_speed") {
handleWebsocketSetSpeed(command);
}else if (command["type"] == "stop") {
Util::logExitReason("User requested stop");
Util::logExitReason(ER_CLEAN_REMOTE_CLOSE, "User requested stop");
myConn.close();
}else if (command["type"] == "play") {
parseData = true;

View file

@ -1190,7 +1190,7 @@ namespace Mist{
if (!newStream.size()){
FAIL_MSG("Push from %s to URL %s rejected - PUSH_REWRITE trigger blanked the URL",
getConnectedHost().c_str(), reqUrl.c_str());
Util::logExitReason(
Util::logExitReason(ER_TRIGGER,
"Push from %s to URL %s rejected - PUSH_REWRITE trigger blanked the URL",
getConnectedHost().c_str(), reqUrl.c_str());
onFinish();

View file

@ -388,7 +388,7 @@ namespace Mist{
if (!newStream.size()){
FAIL_MSG("Push from %s to URL %s rejected - PUSH_REWRITE trigger blanked the URL",
getConnectedHost().c_str(), qUrl.getUrl().c_str());
Util::logExitReason(
Util::logExitReason(ER_TRIGGER,
"Push from %s to URL %s rejected - PUSH_REWRITE trigger blanked the URL",
getConnectedHost().c_str(), qUrl.getUrl().c_str());
onFinish();

View file

@ -235,7 +235,7 @@ namespace Mist{
}else{
myConn.SendNow(tsData, len);
if (!myConn){
Util::logExitReason("connection closed by peer");
Util::logExitReason(ER_CLEAN_REMOTE_CLOSE, "connection closed by peer");
config->is_active = false;
}
}

View file

@ -167,7 +167,7 @@ namespace Mist{
if (!newStream.size()){
FAIL_MSG("Push from %s to URL %s rejected - PUSH_REWRITE trigger blanked the URL",
getConnectedHost().c_str(), reqUrl.getUrl().c_str());
Util::logExitReason(
Util::logExitReason(ER_TRIGGER,
"Push from %s to URL %s rejected - PUSH_REWRITE trigger blanked the URL",
getConnectedHost().c_str(), reqUrl.getUrl().c_str());
onFinish();
@ -390,14 +390,14 @@ void handleUSR1(int signum, siginfo_t *sigInfo, void *ignore){
if (!sockCount){
INFO_MSG("USR1 received - triggering rolling restart (no connections active)");
Util::Config::is_restarting = true;
Util::logExitReason("signal USR1, no connections");
Util::logExitReason(ER_CLEAN_SIGNAL, "signal USR1, no connections");
///\TODO Update for RIST
//server_socket.close();
Util::Config::is_active = false;
}else{
INFO_MSG("USR1 received - triggering rolling restart when connection count reaches zero");
Util::Config::is_restarting = true;
Util::logExitReason("signal USR1, after disconnect wait");
Util::logExitReason(ER_CLEAN_SIGNAL, "signal USR1, after disconnect wait");
}
}
@ -405,6 +405,7 @@ int main(int argc, char *argv[]){
DTSC::trackValidMask = TRACK_VALID_EXT_HUMAN;
Util::redirectLogsIfNeeded();
Util::Config conf(argv[0]);
Util::Config::binaryType = Util::OUTPUT;
mistOut::init(&conf);
if (conf.parseArgs(argc, argv)){
if (conf.getBool("json")){

View file

@ -104,7 +104,7 @@ namespace Mist{
if (!newStream.size()){
FAIL_MSG("Push from %s to URL %s rejected - PUSH_REWRITE trigger blanked the URL",
getConnectedHost().c_str(), reqUrl.getUrl().c_str());
Util::logExitReason(
Util::logExitReason(ER_TRIGGER,
"Push from %s to URL %s rejected - PUSH_REWRITE trigger blanked the URL",
getConnectedHost().c_str(), reqUrl.getUrl().c_str());
onFinish();
@ -291,7 +291,7 @@ namespace Mist{
srtConn.connect(target.host, target.getPort(), "output", targetParams);
if (!srtConn){Util::sleep(500);}
}else{
Util::logExitReason("SRT connection closed");
Util::logExitReason(ER_CLEAN_REMOTE_CLOSE, "SRT connection closed");
myConn.close();
parseData = false;
return;
@ -301,7 +301,7 @@ namespace Mist{
srtConn.SendNow(packetBuffer, packetBuffer.size());
if (!srtConn){
if (!config->getString("target").size()){
Util::logExitReason("SRT connection closed");
Util::logExitReason(ER_CLEAN_REMOTE_CLOSE, "SRT connection closed");
myConn.close();
parseData = false;
}
@ -366,7 +366,7 @@ namespace Mist{
}
bool OutTSSRT::dropPushTrack(uint32_t trackId, const std::string & dropReason){
Util::logExitReason("track dropped by buffer");
Util::logExitReason(ER_SHM_LOST, "track dropped by buffer");
myConn.close();
srtConn.close();
return Output::dropPushTrack(trackId, dropReason);
@ -401,13 +401,13 @@ void handleUSR1(int signum, siginfo_t *sigInfo, void *ignore){
if (!sockCount){
INFO_MSG("USR1 received - triggering rolling restart (no connections active)");
Util::Config::is_restarting = true;
Util::logExitReason("signal USR1, no connections");
Util::logExitReason(ER_CLEAN_SIGNAL, "signal USR1, no connections");
server_socket.close();
Util::Config::is_active = false;
}else{
INFO_MSG("USR1 received - triggering rolling restart when connection count reaches zero");
Util::Config::is_restarting = true;
Util::logExitReason("signal USR1, after disconnect wait");
Util::logExitReason(ER_CLEAN_SIGNAL, "signal USR1, after disconnect wait");
}
}
@ -437,6 +437,7 @@ int main(int argc, char *argv[]){
DTSC::trackValidMask = TRACK_VALID_EXT_HUMAN;
Util::redirectLogsIfNeeded();
Util::Config conf(argv[0]);
Util::Config::binaryType = Util::OUTPUT;
mistOut::init(&conf);
if (conf.parseArgs(argc, argv)){
if (conf.getBool("json")){

View file

@ -309,7 +309,7 @@ namespace Mist{
if (!parseData){udp.sendPaced(10000);}
//After 10s of no packets, abort
if (Util::bootMS() > lastRecv + 10000){
Util::logExitReason("received no data for 10+ seconds");
Util::logExitReason(ER_CLEAN_INACTIVE, "received no data for 10+ seconds");
config->is_active = false;
}
return;