Fix RTMP not shutting down on playback.

This commit is contained in:
Thulinma 2016-07-21 11:39:31 +02:00
parent 33a0b2fae3
commit e1fac6248e
3 changed files with 42 additions and 5 deletions

View file

@ -657,6 +657,7 @@ namespace Mist {
} }
stats(); stats();
} }
onFinish();
MEDIUM_MSG("MistOut client handler shutting down: %s, %s, %s", myConn.connected() ? "conn_active" : "conn_closed", wantRequest ? "want_request" : "no_want_request", parseData ? "parsing_data" : "not_parsing_data"); MEDIUM_MSG("MistOut client handler shutting down: %s, %s, %s", myConn.connected() ? "conn_active" : "conn_closed", wantRequest ? "want_request" : "no_want_request", parseData ? "parsing_data" : "not_parsing_data");
stats(true); stats(true);
@ -941,11 +942,13 @@ namespace Mist {
nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
if (!nProxy.userClient.getData()){ if (!nProxy.userClient.getData()){
WARN_MSG("Player connection failure - aborting output"); WARN_MSG("Player connection failure - aborting output");
onFinish();
myConn.close(); myConn.close();
return; return;
} }
} }
if (!nProxy.userClient.isAlive()){ if (!nProxy.userClient.isAlive()){
onFinish();
myConn.close(); myConn.close();
return; return;
} }

View file

@ -51,6 +51,38 @@ namespace Mist {
} }
} }
bool OutRTMP::onFinish(){
MEDIUM_MSG("Finishing stream %s, %s", streamName.c_str(), myConn?"while connected":"already disconnected");
if (myConn){
myConn.SendNow(RTMPStream::SendUSR(1, 1)); //send UCM StreamEOF (1), stream 1
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onStatus")); //status reply
amfreply.addContent(AMF::Object("", (double)0)); //transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info
amfreply.addContent(AMF::Object("")); //info
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Stop"));
amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream stopped"));
amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV"));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
sendCommand(amfreply, 20, 1);
amfreply = AMF::Object ("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onStatus")); //status reply
amfreply.addContent(AMF::Object("", (double)0)); //transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info
amfreply.addContent(AMF::Object("")); //info
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.UnpublishNotify"));
amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream stopped"));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
sendCommand(amfreply, 20, 1);
myConn.close();
}
return false;
}
void OutRTMP::parseVars(std::string data){ void OutRTMP::parseVars(std::string data){
std::string varname; std::string varname;
std::string varval; std::string varval;
@ -445,6 +477,7 @@ namespace Mist {
} //createStream } //createStream
if ((amfData.getContentP(0)->StrValue() == "closeStream") || (amfData.getContentP(0)->StrValue() == "deleteStream")) { if ((amfData.getContentP(0)->StrValue() == "closeStream") || (amfData.getContentP(0)->StrValue() == "deleteStream")) {
stop(); stop();
onFinish();
return; return;
} }
if ((amfData.getContentP(0)->StrValue() == "FCUnpublish") || (amfData.getContentP(0)->StrValue() == "releaseStream")) { if ((amfData.getContentP(0)->StrValue() == "FCUnpublish") || (amfData.getContentP(0)->StrValue() == "releaseStream")) {
@ -524,20 +557,20 @@ namespace Mist {
if (streamCfg){ if (streamCfg){
if (streamCfg.getMember("source").asString().substr(0, 7) != "push://"){ if (streamCfg.getMember("source").asString().substr(0, 7) != "push://"){
FAIL_MSG("Push rejected - stream %s not a push-able stream. (%s != push://*)", streamName.c_str(), streamCfg.getMember("source").asString().c_str()); FAIL_MSG("Push rejected - stream %s not a push-able stream. (%s != push://*)", streamName.c_str(), streamCfg.getMember("source").asString().c_str());
myConn.close(); onFinish();
}else{ }else{
std::string source = streamCfg.getMember("source").asString().substr(7); std::string source = streamCfg.getMember("source").asString().substr(7);
std::string IP = source.substr(0, source.find('@')); std::string IP = source.substr(0, source.find('@'));
if (IP != ""){ if (IP != ""){
if (!myConn.isAddress(IP)){ if (!myConn.isAddress(IP)){
FAIL_MSG("Push from %s to %s rejected - source host not whitelisted", getConnectedHost().c_str(), streamName.c_str()); FAIL_MSG("Push from %s to %s rejected - source host not whitelisted", getConnectedHost().c_str(), streamName.c_str());
myConn.close(); onFinish();
} }
} }
} }
}else{ }else{
FAIL_MSG("Push from %s rejected - stream '%s' not configured.", getConnectedHost().c_str(), streamName.c_str()); FAIL_MSG("Push from %s rejected - stream '%s' not configured.", getConnectedHost().c_str(), streamName.c_str());
myConn.close(); onFinish();
} }
configLock.post(); configLock.post();
configLock.close(); configLock.close();
@ -777,7 +810,7 @@ namespace Mist {
inputBuffer.get().clear(); inputBuffer.get().clear();
} }
stop(); stop();
myConn.close(); onFinish();
break; //happens when connection breaks unexpectedly break; //happens when connection breaks unexpectedly
case 1: //set chunk size case 1: //set chunk size
RTMPStream::chunk_rec_max = ntohl(*(int *)next.data.c_str()); RTMPStream::chunk_rec_max = ntohl(*(int *)next.data.c_str());
@ -850,7 +883,7 @@ namespace Mist {
static std::map<unsigned int, AMF::Object> pushMeta; static std::map<unsigned int, AMF::Object> pushMeta;
if (!isInitialized) { if (!isInitialized) {
MEDIUM_MSG("Received useless media data"); MEDIUM_MSG("Received useless media data");
myConn.close(); onFinish();
break; break;
} }
F.ChunkLoader(next); F.ChunkLoader(next);

View file

@ -14,6 +14,7 @@ namespace Mist {
void sendNext(); void sendNext();
void sendHeader(); void sendHeader();
bool isReadyForPlay(); bool isReadyForPlay();
bool onFinish();
protected: protected:
bool isPushing; bool isPushing;
void parseVars(std::string data); void parseVars(std::string data);