From f9de7f9a64456030e9a12bd4dc7cf1b314bdbd37 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sun, 15 Apr 2012 01:54:39 +0200 Subject: [PATCH] RTMP Connector upgrade to DTSC - push mode doesn't convert to DTSC yet (it will tomorrow) and it is still untested - but should work. --- Connector_HTTP/Analysis_BuckBunny_Fifa | 48 -- Connector_RTMP/main.cpp | 715 +++++++++---------------- util/socket.cpp | 5 +- util/socket.h | 2 +- 4 files changed, 254 insertions(+), 516 deletions(-) delete mode 100644 Connector_HTTP/Analysis_BuckBunny_Fifa diff --git a/Connector_HTTP/Analysis_BuckBunny_Fifa b/Connector_HTTP/Analysis_BuckBunny_Fifa deleted file mode 100644 index 45cb2de6..00000000 --- a/Connector_HTTP/Analysis_BuckBunny_Fifa +++ /dev/null @@ -1,48 +0,0 @@ - [BuckBunny Entry1] - -trackinfo - -timescale -length -language -sampledescription -sampletype - -timescale -length -language -sampledescription -sampletype - -audiochannels -audiosamplerate -videoframerate -aacaot -avclevel -avcprofile -audiocodecid -videocodecid -width -height -frameWidth -frameHeight -displayWidth -displayHeight -moovposition -duration - - - [FIFA Entry] - -duration -width -height -videodatarate -framerate -videocodecid -audiodatarate -audiosamplerate -audiosamplesize -stero -audiocodecid -filesize \ No newline at end of file diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp index 28a9622b..d6eb2769 100644 --- a/Connector_RTMP/main.cpp +++ b/Connector_RTMP/main.cpp @@ -27,7 +27,9 @@ namespace Connector_RTMP{ Socket::Connection Socket; ///< Socket connected to user Socket::Connection SS; ///< Socket connected to server std::string streamname = "/tmp/shared_socket"; ///< Stream that will be opened - void parseChunk(); + void parseChunk(std::string & buffer);///< Parses a single RTMP chunk. + void sendCommand(AMF::Object & amfreply, int messagetype, int stream_id);///< Sends a RTMP command either in AMF or AMF3 mode. + void parseAMFCommand(AMF::Object & amfdata, int messagetype, int stream_id);///< Parses a single AMF command message. int Connector_RTMP(Socket::Connection conn); };//Connector_RTMP namespace; @@ -35,8 +37,9 @@ namespace Connector_RTMP{ /// Main Connector_RTMP function int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ Socket = conn; - FLV::Tag tag, viddata, auddata; - bool viddone = false, auddone = false; + FLV::Tag tag, init_tag; + DTSC::Stream Strm; + bool stream_inited = false;//true if init data for audio/video was sent //first timestamp set RTMPStream::firsttime = RTMPStream::getNowMS(); @@ -61,15 +64,10 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ unsigned int lastStats = 0; - while (Socket.connected() && !FLV::Parse_Error){ - //only parse input if available or not yet init'ed - //rightnow = getNowMS(); - if (Socket.canRead() || !ready4data){// || (snd_cnt - snd_window_at >= snd_window_size) - switch (Socket.ready()){ - case -1: break; //disconnected - case 0: break; //not ready yet - default: parseChunk(); break; //new data is waiting - } + while (Socket.connected()){ + sleep(10000);//sleep 10ms to prevent high CPU usage + if (Socket.spool()){ + parseChunk(Socket.Received()); } if (ready4data){ if (!inited){ @@ -95,54 +93,27 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ SS.write(stat); } } - SS.canRead(); - switch (SS.ready()){ - case -1: - #if DEBUG >= 1 - fprintf(stderr, "Source socket is disconnected.\n"); - #endif - Socket.close();//disconnect user - break; - case 0: break;//not ready yet - default: - bool justdone = false; - if (tag.SockLoader(SS)){//able to read a full packet? - //init data? parse and resent in correct order if all is received - /// \todo Check metadata for needed audio/video init or not - we now assume both video/audio are always present... - if (((tag.data[0] == 0x09) && !viddone) || ((tag.data[0] == 0x08) && !auddone)){ - if (tag.needsInitData()){ - if (tag.data[0] == 0x09){viddata = tag;}else{auddata = tag;} - } - if (tag.data[0] == 0x09){viddone = true;}else{auddone = true;} - justdone = true; + if (SS.spool()){ + if (Strm.parsePacket(SS.Received())){ + //sent init data if needed + if (!stream_inited){ + if (Strm.metadata.getContentP("audio") && Strm.metadata.getContentP("audio")->getContentP("init")){ + init_tag.DTSCAudioInit(Strm); + Socket.write(RTMPStream::SendMedia(init_tag)); } - if (viddone && auddone && justdone){ - if (viddata.len != 0){ - Socket.write(RTMPStream::SendMedia(viddata)); - #if DEBUG >= 8 - fprintf(stderr, "Sent tag to %i: [%u] %s\n", Socket.getSocket(), viddata.tagTime(), viddata.tagType().c_str()); - #endif - } - if (auddata.len != 0){ - Socket.write(RTMPStream::SendMedia(auddata)); - #if DEBUG >= 8 - fprintf(stderr, "Sent tag to %i: [%u] %s\n", Socket.getSocket(), auddata.tagTime(), auddata.tagType().c_str()); - #endif - } - break; + if (Strm.metadata.getContentP("video") && Strm.metadata.getContentP("video")->getContentP("init")){ + init_tag.DTSCVideoInit(Strm); + Socket.write(RTMPStream::SendMedia(init_tag)); } - //not gotten init yet? cancel this tag - if (tag.needsInitData()){ - if ((tag.data[0] == 0x09) && (viddata.len == 0)){break;} - if ((tag.data[0] == 0x08) && (auddata.len == 0)){break;} - } - //send tag normally - Socket.write(RTMPStream::SendMedia(tag)); - #if DEBUG >= 8 - fprintf(stderr, "Sent tag to %i: [%u] %s\n", Socket.getSocket(), tag.tagTime(), tag.tagType().c_str()); - #endif + stream_inited = true; } - break; + //sent a tag + tag.DTSCLoader(Strm); + Socket.write(RTMPStream::SendMedia(tag)); + #if DEBUG >= 8 + fprintf(stderr, "Sent tag to %i: [%u] %s\n", Socket.getSocket(), tag.tagTime(), tag.tagType().c_str()); + #endif + } } } } @@ -165,15 +136,13 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ }//Connector_RTMP /// Tries to get and parse one RTMP chunk at a time. -void Connector_RTMP::parseChunk(){ +void Connector_RTMP::parseChunk(std::string & inbuffer){ static RTMPStream::Chunk next; - static std::string inbuffer; FLV::Tag F; static AMF::Object amfdata("empty", AMF::AMF0_DDV_CONTAINER); static AMF::Object amfelem("empty", AMF::AMF0_DDV_CONTAINER); static AMF::Object3 amf3data("empty", AMF::AMF3_DDV_CONTAINER); static AMF::Object3 amf3elem("empty", AMF::AMF3_DDV_CONTAINER); - if (!Connector_RTMP::Socket.read(inbuffer)){return;} //try to get more data while (next.Parse(inbuffer)){ @@ -246,6 +215,7 @@ void Connector_RTMP::parseChunk(){ #if DEBUG >= 4 fprintf(stderr, "A"); #endif + /// \TODO Convert to DTSC properly. SS.write(std::string(F.data, F.len)); }else{ #if DEBUG >= 4 @@ -260,6 +230,7 @@ void Connector_RTMP::parseChunk(){ #if DEBUG >= 4 fprintf(stderr, "V"); #endif + /// \TODO Convert to DTSC properly. SS.write(std::string(F.data, F.len)); }else{ #if DEBUG >= 4 @@ -279,7 +250,6 @@ void Connector_RTMP::parseChunk(){ #endif break; case 17:{ - bool parsed3 = false; #if DEBUG >= 4 fprintf(stderr, "Received AFM3 command message\n"); #endif @@ -295,205 +265,7 @@ void Connector_RTMP::parseChunk(){ #endif next.data = next.data.substr(1); amfdata = AMF::parse(next.data); - #if DEBUG >= 4 - amfdata.Print(); - #endif - if (amfdata.getContentP(0)->StrValue() == "connect"){ - double objencoding = 0; - if (amfdata.getContentP(2)->getContentP("objectEncoding")){ - objencoding = amfdata.getContentP(2)->getContentP("objectEncoding")->NumValue(); - } - fprintf(stderr, "Object encoding set to %e\n", objencoding); - #if DEBUG >= 4 - int tmpint; - tmpint = (int)amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue(); - if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");} - if (tmpint & 0x80){fprintf(stderr, "H264 video support detected\n");} - tmpint = (int)amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue(); - if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");} - if (tmpint & 0x400){fprintf(stderr, "AAC audio support detected\n");} - #endif - Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5) - Socket.write(RTMPStream::SendCTL(6, RTMPStream::rec_window_size));//send window acknowledgement size (msg 5) - Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 - //send a _result reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "_result"));//result success - amfreply.addContent(amfdata.getContent(1));//same transaction ID - amfreply.addContent(AMF::Object(""));//server properties - amfreply.getContentP(2)->addContent(AMF::Object("fmsVer", "FMS/3,0,1,123")); - amfreply.getContentP(2)->addContent(AMF::Object("capabilities", (double)31)); - //amfreply.getContentP(2)->addContent(AMF::Object("mode", (double)1)); - amfreply.addContent(AMF::Object(""));//info - amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); - amfreply.getContentP(3)->addContent(AMF::Object("code", "NetConnection.Connect.Success")); - amfreply.getContentP(3)->addContent(AMF::Object("description", "Connection succeeded.")); - amfreply.getContentP(3)->addContent(AMF::Object("clientid", 1337)); - amfreply.getContentP(3)->addContent(AMF::Object("objectEncoding", objencoding)); - //amfreply.getContentP(3)->addContent(AMF::Object("data", AMF::AMF0_ECMA_ARRAY)); - //amfreply.getContentP(3)->getContentP(4)->addContent(AMF::Object("version", "3,5,4,1004")); - #if DEBUG >= 4 - amfreply.Print(); - #endif - Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack())); - //send onBWDone packet - no clue what it is, but real server sends it... - amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "onBWDone"));//result - amfreply.addContent(amfdata.getContent(1));//same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack())); - parsed3 = true; - }//connect - if (amfdata.getContentP(0)->StrValue() == "createStream"){ - //send a _result reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "_result"));//result success - amfreply.addContent(amfdata.getContent(1));//same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info - amfreply.addContent(AMF::Object("", (double)1));//stream ID - we use 1 - #if DEBUG >= 4 - amfreply.Print(); - #endif - Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack())); - Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 - parsed3 = true; - }//createStream - if ((amfdata.getContentP(0)->StrValue() == "closeStream") || (amfdata.getContentP(0)->StrValue() == "deleteStream")){ - if (SS.connected()){SS.close();} - } - if ((amfdata.getContentP(0)->StrValue() == "getStreamLength") || (amfdata.getContentP(0)->StrValue() == "getMovLen")){ - //send a _result reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "_result"));//result success - amfreply.addContent(amfdata.getContent(1));//same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info - amfreply.addContent(AMF::Object("", (double)0));//zero length - #if DEBUG >= 4 - amfreply.Print(); - #endif - Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack())); - parsed3 = true; - }//getStreamLength - if ((amfdata.getContentP(0)->StrValue() == "publish")){ - if (amfdata.getContentP(3)){ - streamname = amfdata.getContentP(3)->StrValue(); - for (std::string::iterator i=streamname.begin(); i != streamname.end(); ++i){ - if (*i == '?'){streamname.erase(i, streamname.end()); break;} - if (!isalpha(*i) && !isdigit(*i) && *i != '_'){ - streamname.erase(i); - --i; - }else{ - *i=tolower(*i); - } - } - streamname = "/tmp/shared_socket_" + streamname; - #if DEBUG >= 4 - fprintf(stderr, "Connecting to buffer %s...\n", streamname.c_str()); - #endif - SS = Socket::Connection(streamname); - if (!SS.connected()){ - #if DEBUG >= 1 - fprintf(stderr, "Could not connect to server!\n"); - #endif - Socket.close();//disconnect user - break; - } - SS.write("P "+Socket.getHost()+'\n'); - nostats = true; - #if DEBUG >= 4 - fprintf(stderr, "Connected to buffer, starting to sent data...\n"); - #endif - } - //send a _result reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "_result"));//result success - amfreply.addContent(amfdata.getContent(1));//same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info - amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL));//publish success? - #if DEBUG >= 4 - amfreply.Print(); - #endif - Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack())); - Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 - //send a status reply - amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "onStatus"));//status reply - amfreply.addContent(AMF::Object("", 0, AMF::AMF0_NUMBER));//same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info - amfreply.addContent(AMF::Object(""));//info - amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); - amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start")); - amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!")); - amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); - #if DEBUG >= 4 - amfreply.Print(); - #endif - Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack())); - parsed3 = true; - }//getStreamLength - if (amfdata.getContentP(0)->StrValue() == "checkBandwidth"){ - //send a _result reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "_result"));//result success - amfreply.addContent(amfdata.getContent(1));//same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info - #if DEBUG >= 4 - amfreply.Print(); - #endif - Socket.write(RTMPStream::SendChunk(3, 17, 1, (char)0+amfreply.Pack())); - parsed3 = true; - }//checkBandwidth - if ((amfdata.getContentP(0)->StrValue() == "play") || (amfdata.getContentP(0)->StrValue() == "play2")){ - //send streambegin - streamname = amfdata.getContentP(3)->StrValue(); - for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){ - if (!isalpha(*i) && !isdigit(*i) && *i != '_'){streamname.erase(i);}else{*i=tolower(*i);} - } - streamname = "/tmp/shared_socket_" + streamname; - Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 - //send a status reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "onStatus"));//status reply - amfreply.addContent(amfdata.getContent(1));//same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info - amfreply.addContent(AMF::Object(""));//info - amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); - amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Reset")); - amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing and resetting...")); - amfreply.getContentP(3)->addContent(AMF::Object("details", "PLS")); - amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); - #if DEBUG >= 4 - amfreply.Print(); - #endif - Socket.write(RTMPStream::SendChunk(4, 17, next.msg_stream_id, (char)0+amfreply.Pack())); - amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "onStatus"));//status reply - amfreply.addContent(amfdata.getContent(1));//same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info - amfreply.addContent(AMF::Object(""));//info - amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); - amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Start")); - amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!")); - amfreply.getContentP(3)->addContent(AMF::Object("details", "PLS")); - amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); - #if DEBUG >= 4 - amfreply.Print(); - #endif - Socket.write(RTMPStream::SendChunk(4, 17, 1, (char)0+amfreply.Pack())); - RTMPStream::chunk_snd_max = 102400;//100KiB - Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1) - Connector_RTMP::ready4data = true;//start sending video data! - parsed3 = true; - }//createStream - #if DEBUG >= 3 - fprintf(stderr, "AMF0 command: %s\n", amfdata.getContentP(0)->StrValue().c_str()); - #endif - if (!parsed3){ - #if DEBUG >= 2 - fprintf(stderr, "AMF0 command not processed! :(\n"); - #endif - } + parseAMFCommand(amfdata, 17, next.msg_stream_id); }//parsing AMF0-style } break; case 18: @@ -511,212 +283,8 @@ void Connector_RTMP::parseChunk(){ #endif break; case 20:{//AMF0 command message - bool parsed = false; amfdata = AMF::parse(next.data); - #if DEBUG >= 4 - amfdata.Print(); - #endif - if (amfdata.getContentP(0)->StrValue() == "connect"){ - double objencoding = 0; - if (amfdata.getContentP(2)->getContentP("objectEncoding")){ - objencoding = amfdata.getContentP(2)->getContentP("objectEncoding")->NumValue(); - } - fprintf(stderr, "Object encoding set to %e\n", objencoding); - #if DEBUG >= 4 - int tmpint; - if (amfdata.getContentP(2)->getContentP("videoCodecs")){ - tmpint = (int)amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue(); - if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");} - if (tmpint & 0x80){fprintf(stderr, "H264 video support detected\n");} - } - if (amfdata.getContentP(2)->getContentP("audioCodecs")){ - tmpint = (int)amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue(); - if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");} - if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");} - } - #endif - RTMPStream::chunk_snd_max = 4096; - Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1) - Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5) - Socket.write(RTMPStream::SendCTL(6, RTMPStream::rec_window_size));//send rec window acknowledgement size (msg 6) - Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 - //send a _result reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "_result"));//result success - amfreply.addContent(amfdata.getContent(1));//same transaction ID - amfreply.addContent(AMF::Object(""));//server properties - amfreply.getContentP(2)->addContent(AMF::Object("fmsVer", "FMS/3,0,1,123")); - amfreply.getContentP(2)->addContent(AMF::Object("capabilities", (double)31)); - //amfreply.getContentP(2)->addContent(AMF::Object("mode", (double)1)); - amfreply.addContent(AMF::Object(""));//info - amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); - amfreply.getContentP(3)->addContent(AMF::Object("code", "NetConnection.Connect.Success")); - amfreply.getContentP(3)->addContent(AMF::Object("description", "Connection succeeded.")); - amfreply.getContentP(3)->addContent(AMF::Object("clientid", 1337)); - amfreply.getContentP(3)->addContent(AMF::Object("objectEncoding", objencoding)); - //amfreply.getContentP(3)->addContent(AMF::Object("data", AMF::AMF0_ECMA_ARRAY)); - //amfreply.getContentP(3)->getContentP(4)->addContent(AMF::Object("version", "3,5,4,1004")); - #if DEBUG >= 4 - amfreply.Print(); - #endif - Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack())); - //send onBWDone packet - no clue what it is, but real server sends it... - amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "onBWDone"));//result - amfreply.addContent(AMF::Object("", (double)0));//zero - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack())); - parsed = true; - }//connect - if (amfdata.getContentP(0)->StrValue() == "createStream"){ - //send a _result reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "_result"));//result success - amfreply.addContent(amfdata.getContent(1));//same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info - amfreply.addContent(AMF::Object("", (double)1));//stream ID - we use 1 - #if DEBUG >= 4 - amfreply.Print(); - #endif - Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack())); - Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 - parsed = true; - }//createStream - if ((amfdata.getContentP(0)->StrValue() == "closeStream") || (amfdata.getContentP(0)->StrValue() == "deleteStream")){ - if (SS.connected()){SS.close();} - } - if ((amfdata.getContentP(0)->StrValue() == "getStreamLength") || (amfdata.getContentP(0)->StrValue() == "getMovLen")){ - //send a _result reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "_result"));//result success - amfreply.addContent(amfdata.getContent(1));//same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info - amfreply.addContent(AMF::Object("", (double)0));//zero length - #if DEBUG >= 4 - amfreply.Print(); - #endif - Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack())); - parsed = true; - }//getStreamLength - if ((amfdata.getContentP(0)->StrValue() == "publish")){ - if (amfdata.getContentP(3)){ - streamname = amfdata.getContentP(3)->StrValue(); - for (std::string::iterator i=streamname.begin(); i != streamname.end(); ++i){ - if (*i == '?'){streamname.erase(i, streamname.end()); break;} - if (!isalpha(*i) && !isdigit(*i) && *i != '_'){ - streamname.erase(i); - --i; - }else{ - *i=tolower(*i); - } - } - streamname = "/tmp/shared_socket_" + streamname; - #if DEBUG >= 4 - fprintf(stderr, "Connecting to buffer %s...\n", streamname.c_str()); - #endif - SS = Socket::Connection(streamname); - if (!SS.connected()){ - #if DEBUG >= 1 - fprintf(stderr, "Could not connect to server!\n"); - #endif - Socket.close();//disconnect user - break; - } - SS.write("P "+Socket.getHost()+'\n'); - nostats = true; - #if DEBUG >= 4 - fprintf(stderr, "Connected to buffer, starting to send data...\n"); - #endif - } - //send a _result reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "_result"));//result success - amfreply.addContent(amfdata.getContent(1));//same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info - amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL));//publish success? - #if DEBUG >= 4 - amfreply.Print(); - #endif - Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack())); - Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 - //send a status reply - amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "onStatus"));//status reply - amfreply.addContent(AMF::Object("", 0, AMF::AMF0_NUMBER));//same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info - amfreply.addContent(AMF::Object(""));//info - amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); - amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start")); - amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!")); - amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); - #if DEBUG >= 4 - amfreply.Print(); - #endif - Socket.write(RTMPStream::SendChunk(4, 20, next.msg_stream_id, amfreply.Pack())); - parsed = true; - }//getStreamLength - if (amfdata.getContentP(0)->StrValue() == "checkBandwidth"){ - //send a _result reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "_result"));//result success - amfreply.addContent(amfdata.getContent(1));//same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info - #if DEBUG >= 4 - amfreply.Print(); - #endif - Socket.write(RTMPStream::SendChunk(3, 20, 1, amfreply.Pack())); - parsed = true; - }//checkBandwidth - if ((amfdata.getContentP(0)->StrValue() == "play") || (amfdata.getContentP(0)->StrValue() == "play2")){ - //send streambegin - streamname = amfdata.getContentP(3)->StrValue(); - for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){ - if (!isalpha(*i) && !isdigit(*i) && *i != '_'){streamname.erase(i);}else{*i=tolower(*i);} - } - streamname = "/tmp/shared_socket_" + streamname; - Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 - //send a status reply - AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "onStatus"));//status reply - amfreply.addContent(amfdata.getContent(1));//same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info - amfreply.addContent(AMF::Object(""));//info - amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); - amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Reset")); - amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing and resetting...")); - amfreply.getContentP(3)->addContent(AMF::Object("details", "PLS")); - amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); - #if DEBUG >= 4 - amfreply.Print(); - #endif - Socket.write(RTMPStream::SendChunk(4, 20, next.msg_stream_id, amfreply.Pack())); - amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "onStatus"));//status reply - amfreply.addContent(amfdata.getContent(1));//same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info - amfreply.addContent(AMF::Object(""));//info - amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); - amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Start")); - amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!")); - amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); - #if DEBUG >= 4 - amfreply.Print(); - #endif - Socket.write(RTMPStream::SendChunk(4, 20, 1, amfreply.Pack())); - RTMPStream::chunk_snd_max = 102400;//100KiB; - Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1) - Connector_RTMP::ready4data = true;//start sending video data! - parsed = true; - }//createStream - #if DEBUG >= 3 - fprintf(stderr, "AMF0 command: %s\n", amfdata.getContentP(0)->StrValue().c_str()); - #endif - if (!parsed){ - #if DEBUG >= 2 - fprintf(stderr, "AMF0 command not processed! :(\n"); - #endif - } + parseAMFCommand(amfdata, 20, next.msg_stream_id); } break; case 22: #if DEBUG >= 4 @@ -733,6 +301,223 @@ void Connector_RTMP::parseChunk(){ } }//parseChunk +void Connector_RTMP::sendCommand(AMF::Object & amfreply, int messagetype, int stream_id){ + if (messagetype == 17){ + Socket.write(RTMPStream::SendChunk(3, messagetype, stream_id, (char)0+amfreply.Pack())); + }else{ + Socket.write(RTMPStream::SendChunk(3, messagetype, stream_id, amfreply.Pack())); + } +}//sendCommand + +void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int stream_id){ + bool parsed = false; + #if DEBUG >= 4 + amfdata.Print(); + #endif + if (amfdata.getContentP(0)->StrValue() == "connect"){ + double objencoding = 0; + if (amfdata.getContentP(2)->getContentP("objectEncoding")){ + objencoding = amfdata.getContentP(2)->getContentP("objectEncoding")->NumValue(); + } + fprintf(stderr, "Object encoding set to %e\n", objencoding); + #if DEBUG >= 4 + int tmpint; + if (amfdata.getContentP(2)->getContentP("videoCodecs")){ + tmpint = (int)amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue(); + if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");} + if (tmpint & 0x80){fprintf(stderr, "H264 video support detected\n");} + } + if (amfdata.getContentP(2)->getContentP("audioCodecs")){ + tmpint = (int)amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue(); + if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");} + if (tmpint & 0x400){fprintf(stderr, "AAC audio support detected\n");} + } + #endif + RTMPStream::chunk_snd_max = 4096; + Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1) + Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5) + Socket.write(RTMPStream::SendCTL(6, RTMPStream::rec_window_size));//send rec window acknowledgement size (msg 6) + Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 + //send a _result reply + AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "_result"));//result success + amfreply.addContent(amfdata.getContent(1));//same transaction ID + amfreply.addContent(AMF::Object(""));//server properties + amfreply.getContentP(2)->addContent(AMF::Object("fmsVer", "FMS/3,0,1,123")); + amfreply.getContentP(2)->addContent(AMF::Object("capabilities", (double)31)); + //amfreply.getContentP(2)->addContent(AMF::Object("mode", (double)1)); + amfreply.addContent(AMF::Object(""));//info + amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfreply.getContentP(3)->addContent(AMF::Object("code", "NetConnection.Connect.Success")); + amfreply.getContentP(3)->addContent(AMF::Object("description", "Connection succeeded.")); + amfreply.getContentP(3)->addContent(AMF::Object("clientid", 1337)); + amfreply.getContentP(3)->addContent(AMF::Object("objectEncoding", objencoding)); + //amfreply.getContentP(3)->addContent(AMF::Object("data", AMF::AMF0_ECMA_ARRAY)); + //amfreply.getContentP(3)->getContentP(4)->addContent(AMF::Object("version", "3,5,4,1004")); + #if DEBUG >= 4 + amfreply.Print(); + #endif + sendCommand(amfreply, messagetype, stream_id); + //send onBWDone packet - no clue what it is, but real server sends it... + amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "onBWDone"));//result + amfreply.addContent(amfdata.getContent(1));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null + sendCommand(amfreply, messagetype, stream_id); + parsed = true; + }//connect + if (amfdata.getContentP(0)->StrValue() == "createStream"){ + //send a _result reply + AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "_result"));//result success + amfreply.addContent(amfdata.getContent(1));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object("", (double)1));//stream ID - we use 1 + #if DEBUG >= 4 + amfreply.Print(); + #endif + sendCommand(amfreply, messagetype, stream_id); + Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 + parsed = true; + }//createStream + if ((amfdata.getContentP(0)->StrValue() == "closeStream") || (amfdata.getContentP(0)->StrValue() == "deleteStream")){ + if (SS.connected()){SS.close();} + } + if ((amfdata.getContentP(0)->StrValue() == "getStreamLength") || (amfdata.getContentP(0)->StrValue() == "getMovLen")){ + //send a _result reply + AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "_result"));//result success + amfreply.addContent(amfdata.getContent(1));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object("", (double)0));//zero length + #if DEBUG >= 4 + amfreply.Print(); + #endif + sendCommand(amfreply, messagetype, stream_id); + parsed = true; + }//getStreamLength + if ((amfdata.getContentP(0)->StrValue() == "publish")){ + if (amfdata.getContentP(3)){ + streamname = amfdata.getContentP(3)->StrValue(); + for (std::string::iterator i=streamname.begin(); i != streamname.end(); ++i){ + if (*i == '?'){streamname.erase(i, streamname.end()); break;} + if (!isalpha(*i) && !isdigit(*i) && *i != '_'){ + streamname.erase(i); + --i; + }else{ + *i=tolower(*i); + } + } + streamname = "/tmp/shared_socket_" + streamname; + #if DEBUG >= 4 + fprintf(stderr, "Connecting to buffer %s...\n", streamname.c_str()); + #endif + SS = Socket::Connection(streamname); + if (!SS.connected()){ + #if DEBUG >= 1 + fprintf(stderr, "Could not connect to server!\n"); + #endif + Socket.close();//disconnect user + return; + } + SS.write("P "+Socket.getHost()+'\n'); + nostats = true; + #if DEBUG >= 4 + fprintf(stderr, "Connected to buffer, starting to send data...\n"); + #endif + } + //send a _result reply + AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "_result"));//result success + amfreply.addContent(amfdata.getContent(1));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL));//publish success? + #if DEBUG >= 4 + amfreply.Print(); + #endif + sendCommand(amfreply, messagetype, stream_id); + Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 + //send a status reply + amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "onStatus"));//status reply + amfreply.addContent(AMF::Object("", 0, AMF::AMF0_NUMBER));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object(""));//info + amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start")); + amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!")); + amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); + #if DEBUG >= 4 + amfreply.Print(); + #endif + sendCommand(amfreply, messagetype, stream_id); + parsed = true; + }//getStreamLength + if (amfdata.getContentP(0)->StrValue() == "checkBandwidth"){ + //send a _result reply + AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "_result"));//result success + amfreply.addContent(amfdata.getContent(1));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + #if DEBUG >= 4 + amfreply.Print(); + #endif + sendCommand(amfreply, messagetype, stream_id); + parsed = true; + }//checkBandwidth + if ((amfdata.getContentP(0)->StrValue() == "play") || (amfdata.getContentP(0)->StrValue() == "play2")){ + //send streambegin + streamname = amfdata.getContentP(3)->StrValue(); + for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){ + if (!isalpha(*i) && !isdigit(*i) && *i != '_'){streamname.erase(i);}else{*i=tolower(*i);} + } + streamname = "/tmp/shared_socket_" + streamname; + Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 + //send a status reply + AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "onStatus"));//status reply + amfreply.addContent(amfdata.getContent(1));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object(""));//info + amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Reset")); + amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing and resetting...")); + amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); + amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); + #if DEBUG >= 4 + amfreply.Print(); + #endif + sendCommand(amfreply, messagetype, stream_id); + amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); + amfreply.addContent(AMF::Object("", "onStatus"));//status reply + amfreply.addContent(amfdata.getContent(1));//same transaction ID + amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info + amfreply.addContent(AMF::Object(""));//info + amfreply.getContentP(3)->addContent(AMF::Object("level", "status")); + amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Start")); + amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!")); + amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); + amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); + #if DEBUG >= 4 + amfreply.Print(); + #endif + sendCommand(amfreply, messagetype, stream_id); + RTMPStream::chunk_snd_max = 102400;//100KiB + Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1) + Connector_RTMP::ready4data = true;//start sending video data! + parsed = true; + }//createStream + #if DEBUG >= 3 + fprintf(stderr, "AMF0 command: %s\n", amfdata.getContentP(0)->StrValue().c_str()); + #endif + if (!parsed){ + #if DEBUG >= 2 + fprintf(stderr, "AMF0 command not processed! :(\n"); + #endif + } +}//parseAMFCommand + // Load main server setup file, default port 1935, handler is Connector_RTMP::Connector_RTMP #define DEFAULT_PORT 1935 diff --git a/util/socket.cpp b/util/socket.cpp index ed669e75..e751b097 100644 --- a/util/socket.cpp +++ b/util/socket.cpp @@ -222,9 +222,10 @@ std::string Socket::Connection::getStats(std::string C){ } /// Updates the downbuffer and upbuffer internal variables. -void Socket::Connection::spool(){ - iread(downbuffer); +/// Returns true if new data was received, false otherwise. +bool Socket::Connection::spool(){ iwrite(upbuffer); + return iread(downbuffer); } /// Returns a reference to the download buffer. diff --git a/util/socket.h b/util/socket.h index 988b96c2..3a538286 100644 --- a/util/socket.h +++ b/util/socket.h @@ -48,7 +48,7 @@ namespace Socket{ bool swrite(std::string & buffer); ///< Write call that is compatible with std::string. bool iread(std::string & buffer); ///< Incremental write call that is compatible with std::string. bool iwrite(std::string & buffer); ///< Write call that is compatible with std::string. - void spool(); ///< Updates the downbuffer and upbuffer internal variables. + bool spool(); ///< Updates the downbuffer and upbuffer internal variables. std::string & Received(); ///< Returns a reference to the download buffer. void Send(std::string data); ///< Appends data to the upbuffer. void close(); ///< Close connection.