diff --git a/src/buffer.cpp b/src/buffer.cpp index c3752700..27968fcc 100644 --- a/src/buffer.cpp +++ b/src/buffer.cpp @@ -39,7 +39,8 @@ namespace Buffer{ StatsSocket = Socket::Connection("/tmp/mist/statistics", true); } if (StatsSocket.connected()){ - StatsSocket.write(Stream::get()->getStats()+"\n\n"); + StatsSocket.Send(Stream::get()->getStats()+"\n\n"); + StatsSocket.flush(); } } StatsSocket.close(); @@ -50,23 +51,18 @@ namespace Buffer{ std::cerr << "Thread launched for user " << usr->MyStr << ", socket number " << usr->S.getSocket() << std::endl; usr->myRing = thisStream->getRing(); - if (!usr->S.write(thisStream->getHeader())){ - usr->Disconnect("failed to receive the header!"); - return; - } + usr->S.Send(thisStream->getHeader()); + usr->S.flush(); while (usr->S.connected()){ usleep(5000); //sleep 5ms - if (usr->S.canRead()){ - usr->inbuffer.clear(); - char charbuf; - while ((usr->S.iread(&charbuf, 1) == 1) && charbuf != '\n' ){ - usr->inbuffer += charbuf; - } - if (usr->inbuffer != ""){ - if (usr->inbuffer[0] == 'P'){ - std::cout << "Push attempt from IP " << usr->inbuffer.substr(2) << std::endl; - if (thisStream->checkWaitingIP(usr->inbuffer.substr(2))){ + if (usr->S.spool() && usr->S.Received().find('\n') != std::string::npos){ + std::string cmd = usr->S.Received().substr(0, usr->S.Received().find('\n')); + usr->S.Received().erase(0, usr->S.Received().find('\n')+1); + if (cmd != ""){ + if (cmd[0] == 'P'){ + std::cout << "Push attempt from IP " << cmd.substr(2) << std::endl; + if (thisStream->checkWaitingIP(cmd.substr(2))){ if (thisStream->setInput(usr->S)){ std::cout << "Push accepted!" << std::endl; usr->S = Socket::Connection(-1); @@ -78,8 +74,8 @@ namespace Buffer{ usr->Disconnect("Push denied - invalid IP address!"); } } - if (usr->inbuffer[0] == 'S'){ - usr->tmpStats = Stats(usr->inbuffer.substr(2)); + if (cmd[0] == 'S'){ + usr->tmpStats = Stats(cmd.substr(2)); unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime; if (secs < 1){secs = 1;} usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs; @@ -140,21 +136,18 @@ namespace Buffer{ /// No changes to the speed are made. void handlePushin(void * empty){ if (empty != 0){return;} - std::string inBuffer; while (buffer_running){ if (thisStream->getIPInput().connected()){ - if (inBuffer.size() > 0){ + if (thisStream->getIPInput().spool()){ thisStream->getWriteLock(); - if (thisStream->getStream()->parsePacket(inBuffer)){ + if (thisStream->getStream()->parsePacket(thisStream->getIPInput().Received())){ thisStream->getStream()->outPacket(0); thisStream->dropWriteLock(true); }else{ thisStream->dropWriteLock(false); - thisStream->getIPInput().iread(inBuffer); usleep(1000);//1ms wait } }else{ - thisStream->getIPInput().iread(inBuffer); usleep(1000);//1ms wait } }else{ @@ -199,7 +192,7 @@ namespace Buffer{ while (buffer_running && SS.connected() && conf.is_active){ //check for new connections, accept them if there are any //starts a thread for every accepted connection - incoming = SS.accept(false); + incoming = SS.accept(true); if (incoming.connected()){ user * usr_ptr = new user(incoming); thisStream->addUser(usr_ptr); diff --git a/src/buffer_user.cpp b/src/buffer_user.cpp index f67422f7..75f3a9e4 100644 --- a/src/buffer_user.cpp +++ b/src/buffer_user.cpp @@ -32,12 +32,14 @@ Buffer::user::~user(){ /// Disconnects the current user. Doesn't do anything if already disconnected. /// Prints "Disconnected user" to stdout if disconnect took place. void Buffer::user::Disconnect(std::string reason) { + Stream::get()->clearStats(MyStr, lastStats, reason); if (S.connected()){S.close();} if (Thread != 0){ - if (Thread->joinable()){Thread->join();} + if (Thread->joinable()){ + Thread->join(); + } Thread = 0; } - Stream::get()->clearStats(MyStr, lastStats, reason); }//Disconnect /// Tries to send the current buffer, returns true if success, false otherwise. diff --git a/src/conn_http.cpp b/src/conn_http.cpp index a97b6919..8d6ab11a 100644 --- a/src/conn_http.cpp +++ b/src/conn_http.cpp @@ -155,6 +155,7 @@ namespace Connector_HTTP{ //create a unique ID based on a hash of the user agent and host, followed by the stream name and connector std::string uid = md5(H.GetHeader("User-Agent")+conn->getHost())+"_"+H.GetVar("stream")+"_"+connector; H.SetHeader("X-UID", uid);//add the UID to the headers before copying + H.SetHeader("X-Origin", conn->getHost());//add the UID to the headers before copying std::string request = H.BuildRequest();//copy the request for later forwarding to the connector H.Clean(); diff --git a/src/conn_http_dynamic.cpp b/src/conn_http_dynamic.cpp index 20d50893..899ea77f 100644 --- a/src/conn_http_dynamic.cpp +++ b/src/conn_http_dynamic.cpp @@ -136,6 +136,7 @@ namespace Connector_HTTP{ #if DEBUG >= 4 std::cout << "Received request: " << HTTP_R.url << std::endl; #endif + conn.setHost(HTTP_R.GetHeader("X-Origin")); if (HTTP_R.url.find("f4m") == std::string::npos){ streamname = HTTP_R.url.substr(1,HTTP_R.url.find("/",1)-1); Quality = HTTP_R.url.substr( HTTP_R.url.find("/",1)+1 ); @@ -249,6 +250,8 @@ namespace Connector_HTTP{ } } conn.close(); + ss.Send("S "+conn.getStats("HTTP_Dynamic")); + ss.flush(); ss.close(); #if DEBUG >= 1 if (FLV::Parse_Error){fprintf(stderr, "FLV Parser Error: %s\n", FLV::Error_Str.c_str());} diff --git a/src/conn_http_progressive.cpp b/src/conn_http_progressive.cpp index 68e09729..db281a21 100644 --- a/src/conn_http_progressive.cpp +++ b/src/conn_http_progressive.cpp @@ -42,6 +42,7 @@ namespace Connector_HTTP{ #if DEBUG >= 4 std::cout << "Received request: " << HTTP_R.url << std::endl; #endif + conn.setHost(HTTP_R.GetHeader("X-Origin")); //we assume the URL is the stream name with a 3 letter extension std::string extension = HTTP_R.url.substr(HTTP_R.url.size()-4); streamname = HTTP_R.url.substr(0, HTTP_R.url.size()-4);//strip the extension @@ -115,6 +116,8 @@ namespace Connector_HTTP{ } } conn.close(); + ss.Send("S "+conn.getStats("HTTP_Dynamic")); + ss.flush(); ss.close(); #if DEBUG >= 1 if (FLV::Parse_Error){fprintf(stderr, "FLV Parser Error: %s\n", FLV::Error_Str.c_str());} diff --git a/src/conn_raw.cpp b/src/conn_raw.cpp index 1eec8857..5061c581 100644 --- a/src/conn_raw.cpp +++ b/src/conn_raw.cpp @@ -2,6 +2,7 @@ /// Contains the main code for the RAW connector. #include +#include #include #include @@ -13,17 +14,34 @@ int main(int argc, char ** argv) { conf.addOption("stream_name", JSON::fromString("{\"arg_num\":1, \"help\":\"Name of the stream to write to stdout.\"}")); conf.parseArgs(argc, argv); - std::string input = "/tmp/shared_socket_" + conf.getString("stream_name"); //connect to the proper stream - Socket::Connection S(input); + Socket::Connection S = Socket::getStream(conf.getString("stream_name")); + S.setBlocking(false); if (!S.connected()){ std::cout << "Could not open stream " << conf.getString("stream_name") << std::endl; return 1; } - //transport ~50kb at a time - //this is a nice tradeoff between CPU usage and speed - const char buffer[50000] = {0}; - while(std::cout.good() && S.read(buffer,50000)){std::cout.write(buffer,50000);} + unsigned int lastStats = 0; + unsigned int started = time(0); + while(std::cout.good()){ + if (S.spool()){ + std::cout.write(S.Received().c_str(),S.Received().size()); + S.Received().clear(); + }else{ + usleep(10000);//sleep 10ms if no data + } + unsigned int now = time(0); + if (now != lastStats){ + lastStats = now; + std::stringstream st; + st << "S localhost RAW " << (time(0) - started) << " " << S.dataDown() << " " << S.dataUp() << "\n"; + S.Send(st.str()); + } + } + std::stringstream st; + st << "S localhost RAW " << (time(0) - started) << " " << S.dataDown() << " " << S.dataUp() << "\n"; + S.Send(st.str()); + S.flush(); S.close(); return 0; } diff --git a/src/conn_rtmp.cpp b/src/conn_rtmp.cpp index e9bc9fce..324a2a70 100644 --- a/src/conn_rtmp.cpp +++ b/src/conn_rtmp.cpp @@ -39,17 +39,20 @@ namespace Connector_RTMP{ /// Main Connector_RTMP function int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ Socket = conn; + Socket.setBlocking(false); FLV::Tag tag, init_tag; DTSC::Stream Strm; bool stream_inited = false;//true if init data for audio/video was sent - RTMPStream::handshake_in.reserve(1537); - Socket.read((char*)RTMPStream::handshake_in.c_str(), 1537); + while (Socket.Received().size() < 1537 && Socket.connected()){Socket.spool(); usleep(5000);} + RTMPStream::handshake_in = Socket.Received().substr(0, 1537); + Socket.Received().erase(0, 1537); RTMPStream::rec_cnt += 1537; if (RTMPStream::doHandshake()){ - Socket.write(RTMPStream::handshake_out); - Socket.read((char*)RTMPStream::handshake_in.c_str(), 1536); + Socket.Send(RTMPStream::handshake_out); + while (Socket.Received().size() < 1536 && Socket.connected()){Socket.spool(); usleep(5000);} + Socket.Received().erase(0, 1536); RTMPStream::rec_cnt += 1536; #if DEBUG >= 4 fprintf(stderr, "Handshake succcess!\n"); @@ -62,11 +65,12 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ } unsigned int lastStats = 0; - conn.setBlocking(false); + bool firstrun = true; while (Socket.connected()){ usleep(10000);//sleep 10ms to prevent high CPU usage - if (Socket.spool()){ + if (Socket.spool() || firstrun){ + firstrun = false; parseChunk(Socket.Received()); } if (ready4data){ @@ -80,6 +84,7 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ Socket.close();//disconnect user break; } + SS.setBlocking(false); #if DEBUG >= 3 fprintf(stderr, "Everything connected, starting to send video data...\n"); #endif @@ -89,8 +94,7 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ unsigned int now = time(0); if (now != lastStats){ lastStats = now; - std::string stat = "S "+Socket.getStats("RTMP"); - SS.write(stat); + SS.Send("S "+Socket.getStats("RTMP")); } } if (SS.spool()){ @@ -119,8 +123,10 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ } } } - SS.close(); Socket.close(); + SS.Send("S "+Socket.getStats("RTMP")); + SS.flush(); + SS.close(); #if DEBUG >= 1 if (FLV::Parse_Error){fprintf(stderr, "FLV Parse Error: %s\n", FLV::Error_Str.c_str());} fprintf(stderr, "User %i disconnected.\n", conn.getSocket()); @@ -157,7 +163,7 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){ //send ACK if we received a whole window if ((RTMPStream::rec_cnt - RTMPStream::rec_window_at > RTMPStream::rec_window_size)){ RTMPStream::rec_window_at = RTMPStream::rec_cnt; - Socket.write(RTMPStream::SendCTL(3, RTMPStream::rec_cnt));//send ack (msg 3) + Socket.Send(RTMPStream::SendCTL(3, RTMPStream::rec_cnt));//send ack (msg 3) } switch (next.msg_type_id){ @@ -207,7 +213,7 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){ #endif RTMPStream::rec_window_size = ntohl(*(int*)next.data.c_str()); RTMPStream::rec_window_at = RTMPStream::rec_cnt; - Socket.write(RTMPStream::SendCTL(3, RTMPStream::rec_cnt));//send ack (msg 3) + Socket.Send(RTMPStream::SendCTL(3, RTMPStream::rec_cnt));//send ack (msg 3) break; case 6: #if DEBUG >= 4 @@ -215,7 +221,7 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){ #endif //4 bytes window size, 1 byte limit type (ignored) RTMPStream::snd_window_size = ntohl(*(int*)next.data.c_str()); - Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5) + Socket.Send(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5) break; case 8://audio data case 9://video data @@ -228,15 +234,15 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){ counter++; if (counter > 8){ sending = true; - SS.write(meta_out.toNetPacked()); - SS.write(prebuffer.str());//write buffer + SS.Send(meta_out.toNetPacked()); + SS.Send(prebuffer.str());//write buffer prebuffer.str("");//clear buffer - SS.write(pack_out.toNetPacked()); + SS.Send(pack_out.toNetPacked()); }else{ prebuffer << pack_out.toNetPacked(); } }else{ - SS.write(pack_out.toNetPacked()); + SS.Send(pack_out.toNetPacked()); } } }else{ @@ -301,9 +307,9 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){ void Connector_RTMP::sendCommand(AMF::Object & amfreply, int messagetype, int stream_id){ if (messagetype == 17){ - Socket.write(RTMPStream::SendChunk(3, messagetype, stream_id, (char)0+amfreply.Pack())); + Socket.Send(RTMPStream::SendChunk(3, messagetype, stream_id, (char)0+amfreply.Pack())); }else{ - Socket.write(RTMPStream::SendChunk(3, messagetype, stream_id, amfreply.Pack())); + Socket.Send(RTMPStream::SendChunk(3, messagetype, stream_id, amfreply.Pack())); } }//sendCommand @@ -319,7 +325,6 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int if (amfdata.getContentP(2)->getContentP("objectEncoding")){ objencoding = amfdata.getContentP(2)->getContentP("objectEncoding")->NumValue(); } - fprintf(stderr, "Object encoding set to %e\n", objencoding); #if DEBUG >= 4 int tmpint; if (amfdata.getContentP(2)->getContentP("videoCodecs")){ @@ -334,10 +339,10 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int } #endif RTMPStream::chunk_snd_max = 4096; - Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1) - Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5) - Socket.write(RTMPStream::SendCTL(6, RTMPStream::rec_window_size));//send rec window acknowledgement size (msg 6) - Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 + Socket.Send(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1) + Socket.Send(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5) + Socket.Send(RTMPStream::SendCTL(6, RTMPStream::rec_window_size));//send rec window acknowledgement size (msg 6) + Socket.Send(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 //send a _result reply AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); amfreply.addContent(AMF::Object("", "_result"));//result success @@ -377,7 +382,7 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int amfreply.Print(); #endif sendCommand(amfreply, messagetype, stream_id); - Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 + Socket.Send(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 return; }//createStream if ((amfdata.getContentP(0)->StrValue() == "closeStream") || (amfdata.getContentP(0)->StrValue() == "deleteStream")){ @@ -408,7 +413,7 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int Socket.close();//disconnect user return; } - SS.write("P "+Socket.getHost()+'\n'); + SS.Send("P "+Socket.getHost()+'\n'); nostats = true; #if DEBUG >= 4 fprintf(stderr, "Connected to buffer, starting to send data...\n"); @@ -424,7 +429,7 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int amfreply.Print(); #endif sendCommand(amfreply, messagetype, stream_id); - Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 + Socket.Send(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 //send a status reply amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER); amfreply.addContent(AMF::Object("", "onStatus"));//status reply @@ -457,7 +462,7 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int if ((amfdata.getContentP(0)->StrValue() == "play") || (amfdata.getContentP(0)->StrValue() == "play2")){ //send streambegin streamname = amfdata.getContentP(3)->StrValue(); - Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 + Socket.Send(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 //send a status reply AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); amfreply.addContent(AMF::Object("", "onStatus"));//status reply @@ -488,7 +493,7 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int #endif sendCommand(amfreply, messagetype, stream_id); RTMPStream::chunk_snd_max = 102400;//100KiB - Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1) + Socket.Send(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1) Connector_RTMP::ready4data = true;//start sending video data! return; }//createStream