From 3ebfe1b6939ea6dd2e699b769bba976bfd0432ac Mon Sep 17 00:00:00 2001 From: Thulinma Date: Fri, 4 May 2012 14:28:01 +0200 Subject: [PATCH] Fixed bugs for debug session may 4 2012. --- Buffer/main.cpp | 20 +++++++--------- Buffer/stream.cpp | 5 ++-- Buffer/stream.h | 2 +- Connector_RTMP/main.cpp | 48 +++++++++++++++++++------------------- tools/RTMP_Parser/Makefile | 2 +- tools/RTMP_Parser/main.cpp | 7 +++--- util/dtsc.cpp | 2 ++ util/socket.cpp | 12 ++++++++++ util/socket.h | 1 + 9 files changed, 56 insertions(+), 43 deletions(-) diff --git a/Buffer/main.cpp b/Buffer/main.cpp index eb60ec18..050c0373 100644 --- a/Buffer/main.cpp +++ b/Buffer/main.cpp @@ -126,8 +126,10 @@ namespace Buffer{ lastPacketTime = now; prevPacketTime = currPacketTime; currPacketTime = thisStream->getStream()->getTime(); + thisStream->dropWriteLock(true); + }else{ + thisStream->dropWriteLock(false); } - thisStream->dropWriteLock(); }else{ if (((currPacketTime - prevPacketTime) - (now - lastPacketTime)) > 999){ usleep(999000); @@ -151,8 +153,10 @@ namespace Buffer{ thisStream->getWriteLock(); if (thisStream->getStream()->parsePacket(inBuffer)){ thisStream->getStream()->outPacket(0); + thisStream->dropWriteLock(true); + }else{ + thisStream->dropWriteLock(false); } - thisStream->dropWriteLock(); } }else{ usleep(1000000); @@ -177,27 +181,19 @@ namespace Buffer{ return 1; } std::string name = argv[1]; - bool ip_waiting = false; - std::string waiting_ip; - if (argc >= 4){ - waiting_ip += argv[2]; - ip_waiting = true; - } SS = Socket::makeStream(name); thisStream = Stream::get(); thisStream->setName(name); - if (ip_waiting){ - thisStream->setWaitingIP(waiting_ip); - } Socket::Connection incoming; Socket::Connection std_input(fileno(stdin)); tthread::thread StatsThread = tthread::thread(handleStats, 0); tthread::thread * StdinThread = 0; - if (!ip_waiting){ + if (argc < 3){ StdinThread = new tthread::thread(handleStdin, 0); }else{ + thisStream->setWaitingIP(argv[2]); StdinThread = new tthread::thread(handlePushin, 0); } diff --git a/Buffer/stream.cpp b/Buffer/stream.cpp index 7f1a282a..571a7fba 100644 --- a/Buffer/stream.cpp +++ b/Buffer/stream.cpp @@ -87,6 +87,7 @@ bool Buffer::Stream::checkWaitingIP(std::string ip){ if (ip == waiting_ip || ip == "::ffff:"+waiting_ip){ return true; }else{ + std::cout << ip << " != " << waiting_ip << std::endl; return false; } } @@ -165,12 +166,12 @@ void Buffer::Stream::getWriteLock(){ } /// Drops a previously gotten write lock. -void Buffer::Stream::dropWriteLock(){ +void Buffer::Stream::dropWriteLock(bool newpackets_available){ rw_mutex.lock(); writers--; rw_mutex.unlock(); rw_change.notify_all(); - moreData.notify_all(); + if (newpackets_available){moreData.notify_all();} } /// Blocks until reading is safe. diff --git a/Buffer/stream.h b/Buffer/stream.h index e8153db0..a4fa0fc4 100644 --- a/Buffer/stream.h +++ b/Buffer/stream.h @@ -34,7 +34,7 @@ namespace Buffer{ /// Blocks until writing is safe. void getWriteLock(); /// Drops a previously gotten write lock. - void dropWriteLock(); + void dropWriteLock(bool newpackets_available); /// Blocks until reading is safe. void getReadLock(); /// Drops a previously gotten read lock. diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp index 20c860db..649ec02c 100644 --- a/Connector_RTMP/main.cpp +++ b/Connector_RTMP/main.cpp @@ -45,10 +45,10 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ //first timestamp set RTMPStream::firsttime = RTMPStream::getNowMS(); - while (Socket.connected() && (RTMPStream::handshake_in.size() < 1537)){ - Socket.read(RTMPStream::handshake_in); - } + RTMPStream::handshake_in.reserve(1537); + Socket.read((char*)RTMPStream::handshake_in.c_str(), 1537); RTMPStream::rec_cnt += 1537; + if (RTMPStream::doHandshake()){ Socket.write(RTMPStream::handshake_out); Socket.read((char*)RTMPStream::handshake_in.c_str(), 1536); @@ -64,9 +64,10 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ } unsigned int lastStats = 0; + conn.setBlocking(false); while (Socket.connected()){ - sleep(10000);//sleep 10ms to prevent high CPU usage + usleep(10000);//sleep 10ms to prevent high CPU usage if (Socket.spool()){ parseChunk(Socket.Received()); } @@ -309,9 +310,11 @@ void Connector_RTMP::sendCommand(AMF::Object & amfreply, int messagetype, int st }//sendCommand void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int stream_id){ - bool parsed = false; #if DEBUG >= 4 - amfdata.Print(); + fprintf(stderr, "Received command: %s\n", amfdata.Print().c_str()); + #endif + #if DEBUG >= 3 + fprintf(stderr, "AMF0 command: %s\n", amfdata.getContentP(0)->StrValue().c_str()); #endif if (amfdata.getContentP(0)->StrValue() == "connect"){ double objencoding = 0; @@ -358,12 +361,12 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int #endif sendCommand(amfreply, messagetype, stream_id); //send onBWDone packet - no clue what it is, but real server sends it... - amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); - amfreply.addContent(AMF::Object("", "onBWDone"));//result - amfreply.addContent(amfdata.getContent(1));//same transaction ID - amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - sendCommand(amfreply, messagetype, stream_id); - parsed = true; + //amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); + //amfreply.addContent(AMF::Object("", "onBWDone"));//result + //amfreply.addContent(amfdata.getContent(1));//same transaction ID + //amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null + //sendCommand(amfreply, messagetype, stream_id); + return; }//connect if (amfdata.getContentP(0)->StrValue() == "createStream"){ //send a _result reply @@ -377,10 +380,11 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int #endif sendCommand(amfreply, messagetype, stream_id); Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 - parsed = true; + return; }//createStream if ((amfdata.getContentP(0)->StrValue() == "closeStream") || (amfdata.getContentP(0)->StrValue() == "deleteStream")){ if (SS.connected()){SS.close();} + return; } if ((amfdata.getContentP(0)->StrValue() == "getStreamLength") || (amfdata.getContentP(0)->StrValue() == "getMovLen")){ //send a _result reply @@ -393,7 +397,7 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int amfreply.Print(); #endif sendCommand(amfreply, messagetype, stream_id); - parsed = true; + return; }//getStreamLength if ((amfdata.getContentP(0)->StrValue() == "publish")){ if (amfdata.getContentP(3)){ @@ -437,7 +441,7 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int amfreply.Print(); #endif sendCommand(amfreply, messagetype, stream_id); - parsed = true; + return; }//getStreamLength if (amfdata.getContentP(0)->StrValue() == "checkBandwidth"){ //send a _result reply @@ -450,7 +454,7 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int amfreply.Print(); #endif sendCommand(amfreply, messagetype, stream_id); - parsed = true; + return; }//checkBandwidth if ((amfdata.getContentP(0)->StrValue() == "play") || (amfdata.getContentP(0)->StrValue() == "play2")){ //send streambegin @@ -488,16 +492,12 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int RTMPStream::chunk_snd_max = 102400;//100KiB Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1) Connector_RTMP::ready4data = true;//start sending video data! - parsed = true; + return; }//createStream - #if DEBUG >= 3 - fprintf(stderr, "AMF0 command: %s\n", amfdata.getContentP(0)->StrValue().c_str()); + + #if DEBUG >= 2 + fprintf(stderr, "AMF0 command not processed! :(\n"); #endif - if (!parsed){ - #if DEBUG >= 2 - fprintf(stderr, "AMF0 command not processed! :(\n"); - #endif - } }//parseAMFCommand diff --git a/tools/RTMP_Parser/Makefile b/tools/RTMP_Parser/Makefile index 07b148b5..d1303ddf 100644 --- a/tools/RTMP_Parser/Makefile +++ b/tools/RTMP_Parser/Makefile @@ -1,4 +1,4 @@ -SRC = main.cpp ../../util/amf.cpp ../../util/rtmpchunks.cpp ../../util/crypto.cpp ../../util/flv_tag.cpp ../../util/socket.cpp +SRC = main.cpp ../../util/amf.cpp ../../util/rtmpchunks.cpp ../../util/crypto.cpp ../../util/flv_tag.cpp ../../util/socket.cpp ../../util/dtsc.cpp OBJ = $(SRC:.cpp=.o) OUT = RTMP_Parser INCLUDES = diff --git a/tools/RTMP_Parser/main.cpp b/tools/RTMP_Parser/main.cpp index 98aca820..40e2c6e2 100644 --- a/tools/RTMP_Parser/main.cpp +++ b/tools/RTMP_Parser/main.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include "../../util/flv_tag.h" #include "../../util/amf.h" #include "../../util/rtmpchunks.h" @@ -149,10 +150,10 @@ int main(int argc, char ** argv){ next.data = next.data.substr(1); if (soort == 0){ amfdata = AMF::parse(next.data); - amfdata.Print(); + std::cerr << amfdata.Print() << std::endl; }else{ amf3data = AMF::parse3(next.data); - amf3data.Print(); + std::cerr << amf3data.Print() << std::endl; } } break; case 18:{ @@ -170,7 +171,7 @@ int main(int argc, char ** argv){ case 20:{//AMF0 command message fprintf(stderr, "Received AFM0 command message:\n"); amfdata = AMF::parse(next.data); - amfdata.Print(); + std::cerr << amfdata.Print() << std::endl; } break; case 22: fprintf(stderr, "Received aggregate message\n"); diff --git a/util/dtsc.cpp b/util/dtsc.cpp index 26601a5d..9e937d8d 100644 --- a/util/dtsc.cpp +++ b/util/dtsc.cpp @@ -102,6 +102,8 @@ bool DTSC::Stream::hasAudio(){ /// Returns a packed DTSC packet, ready to sent over the network. std::string & DTSC::Stream::outPacket(unsigned int num){ + static std::string emptystring; + if (num >= buffers.size()) return emptystring; buffers[num].Pack(true); return buffers[num].packed; } diff --git a/util/socket.cpp b/util/socket.cpp index 4ddcfb81..82c52dce 100644 --- a/util/socket.cpp +++ b/util/socket.cpp @@ -40,6 +40,18 @@ Socket::Connection::Connection(){ Blocking = false; }//Socket::Connection basic constructor + +/// Set this socket to be blocking (true) or nonblocking (false). +void Socket::Connection::setBlocking(bool blocking){ + int flags = fcntl(sock, F_GETFL, 0); + if (!blocking){ + flags |= O_NONBLOCK; + }else{ + flags &= !O_NONBLOCK; + } + fcntl(sock, F_SETFL, flags); +} + /// Close connection. The internal socket is closed and then set to -1. void Socket::Connection::close(){ #if DEBUG >= 6 diff --git a/util/socket.h b/util/socket.h index 503ec4e9..03750b2a 100644 --- a/util/socket.h +++ b/util/socket.h @@ -33,6 +33,7 @@ namespace Socket{ Connection(int sockNo); ///< Create a new base socket. Connection(std::string hostname, int port, bool nonblock); ///< Create a new TCP socket. Connection(std::string adres, bool nonblock = false); ///< Create a new Unix Socket. + void setBlocking(bool blocking); ///< Set this socket to be blocking (true) or nonblocking (false). bool canRead(); ///< Calls poll() on the socket, checking if data is available. bool canWrite(); ///< Calls poll() on the socket, checking if data can be written. signed int ready(); ///< Returns the ready-state for this socket.