diff --git a/src/conn_rtmp.cpp b/src/conn_rtmp.cpp index ea15fc81..10554b94 100644 --- a/src/conn_rtmp.cpp +++ b/src/conn_rtmp.cpp @@ -27,6 +27,11 @@ namespace Connector_RTMP{ bool nostats = false; ///< Set to true if no stats should be sent anymore (push mode). bool stopparsing = false; ///< Set to true when all parsing needs to be cancelled. + //for reply to play command + int play_trans = -1; + int play_streamid = -1; + int play_msgtype = -1; + Socket::Connection Socket; ///< Socket connected to user Socket::Connection SS; ///< Socket connected to server std::string streamname; ///< Stream that will be opened @@ -101,6 +106,42 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ } if (SS.spool()){ while (Strm.parsePacket(SS.Received())){ + + if (play_trans != -1){ + //send streambegin + Socket.Send(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 + //send streamisrecorded if stream, well, is recorded. + if (Strm.metadata.isMember("length") && Strm.metadata["length"].asInt() > 0){ + Socket.Send(RTMPStream::SendUSR(4, 2));//send UCM StreamIsRecorded (4), stream 1 + } + //send a status reply + AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "onStatus"));//status reply + amfreply.addContent(AMF::Object("", (double)play_trans));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object(""));//info + amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Reset")); + amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing and resetting...")); + amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); + amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); + sendCommand(amfreply, play_msgtype, play_streamid); + amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "onStatus"));//status reply + amfreply.addContent(AMF::Object("", (double)play_trans));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object(""));//info + amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Start")); + amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!")); + amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); + amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); + sendCommand(amfreply, play_msgtype, play_streamid); + RTMPStream::chunk_snd_max = 102400;//100KiB + Socket.Send(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1) + play_trans = -1; + } + //sent init data if needed if (!stream_inited){ init_tag.DTSCMetaInit(Strm); @@ -308,6 +349,9 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){ }//parseChunk void Connector_RTMP::sendCommand(AMF::Object & amfreply, int messagetype, int stream_id){ + #if DEBUG >= 4 + amfreply.Print(); + #endif if (messagetype == 17){ Socket.Send(RTMPStream::SendChunk(3, messagetype, stream_id, (char)0+amfreply.Pack())); }else{ @@ -361,9 +405,6 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int amfreply.getContentP(3)->addContent(AMF::Object("objectEncoding", objencoding)); //amfreply.getContentP(3)->addContent(AMF::Object("data", AMF::AMF0_ECMA_ARRAY)); //amfreply.getContentP(3)->getContentP(4)->addContent(AMF::Object("version", "3,5,4,1004")); - #if DEBUG >= 4 - amfreply.Print(); - #endif sendCommand(amfreply, messagetype, stream_id); //send onBWDone packet - no clue what it is, but real server sends it... //amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); @@ -380,9 +421,6 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int amfreply.addContent(amfdata.getContent(1));//same transaction ID amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info amfreply.addContent(AMF::Object("", (double)1));//stream ID - we use 1 - #if DEBUG >= 4 - amfreply.Print(); - #endif sendCommand(amfreply, messagetype, stream_id); Socket.Send(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 return; @@ -398,9 +436,6 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int amfreply.addContent(amfdata.getContent(1));//same transaction ID amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info amfreply.addContent(AMF::Object("", (double)0));//zero length - #if DEBUG >= 4 - amfreply.Print(); - #endif sendCommand(amfreply, messagetype, stream_id); return; }//getStreamLength @@ -428,9 +463,6 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int amfreply.addContent(amfdata.getContent(1));//same transaction ID amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL));//publish success? - #if DEBUG >= 4 - amfreply.Print(); - #endif sendCommand(amfreply, messagetype, stream_id); Socket.Send(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 //send a status reply @@ -443,9 +475,6 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start")); amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!")); amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); - #if DEBUG >= 4 - amfreply.Print(); - #endif sendCommand(amfreply, messagetype, stream_id); return; }//getStreamLength @@ -456,51 +485,28 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int amfreply.addContent(amfdata.getContent(1));//same transaction ID amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info - #if DEBUG >= 4 - amfreply.Print(); - #endif sendCommand(amfreply, messagetype, stream_id); return; }//checkBandwidth if ((amfdata.getContentP(0)->StrValue() == "play") || (amfdata.getContentP(0)->StrValue() == "play2")){ - //send streambegin + //set reply number and stream name, actual reply is sent up in the SS.spool() handler + play_trans = amfdata.getContentP(1)->NumValue(); + play_msgtype = messagetype; + play_streamid = stream_id; streamname = amfdata.getContentP(3)->StrValue(); - Socket.Send(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 - //send a status reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "onStatus"));//status reply - amfreply.addContent(amfdata.getContent(1));//same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info - amfreply.addContent(AMF::Object(""));//info - amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); - amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Reset")); - amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing and resetting...")); - amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); - amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); - #if DEBUG >= 4 - amfreply.Print(); - #endif - sendCommand(amfreply, messagetype, stream_id); - amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "onStatus"));//status reply - amfreply.addContent(amfdata.getContent(1));//same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info - amfreply.addContent(AMF::Object(""));//info - amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); - amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Start")); - amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!")); - amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); - amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); - #if DEBUG >= 4 - amfreply.Print(); - #endif - sendCommand(amfreply, messagetype, stream_id); - RTMPStream::chunk_snd_max = 102400;//100KiB - Socket.Send(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1) Connector_RTMP::ready4data = true;//start sending video data! return; - }//createStream - + }//play + if ((amfdata.getContentP(0)->StrValue() == "seek")){ + //set reply number and stream name, actual reply is sent up in the SS.spool() handler + play_trans = amfdata.getContentP(1)->NumValue(); + play_msgtype = messagetype; + play_streamid = stream_id; + stream_inited = false; + SS.Send("seek " + JSON::Value((long long int)amfdata.getContentP(3)->NumValue()).asString() + "\n"); + return; + }//seek + #if DEBUG >= 2 fprintf(stderr, "AMF0 command not processed! :(\n"); #endif