diff --git a/Buffer/main.cpp b/Buffer/main.cpp index 6476a778..a99969cb 100644 --- a/Buffer/main.cpp +++ b/Buffer/main.cpp @@ -11,7 +11,7 @@ #include #include #include -#include "../util/dtsc.h" //DTSC support +#include "../util/flv_tag.h" //FLV format parser #include "../util/socket.h" //Socket lib #include "../util/json/json.h" @@ -28,7 +28,11 @@ namespace Buffer{ } } - DTSC::Stream * Strm = 0; + ///holds FLV::Tag objects and their numbers + struct buffer{ + int number; + FLV::Tag FLV; + };//buffer /// Converts a stats line to up, down, host, connector and conntime values. class Stats{ @@ -72,7 +76,9 @@ namespace Buffer{ /// Keeps track of what buffer users are using and the connection status. class user{ public: - DTSC::Ring * myRing; ///< Ring of the buffer for this user. + int MyBuffer; ///< Index of currently used buffer. + int MyBuffer_num; ///< Number of currently used buffer. + int MyBuffer_len; ///< Length in bytes of currently used buffer. int MyNum; ///< User ID of this user. std::string MyStr; ///< User ID of this user as a string. int currsend; ///< Current amount of bytes sent. @@ -91,16 +97,11 @@ namespace Buffer{ std::stringstream st; st << MyNum; MyStr = st.str(); + gotproperaudio = false; curr_up = 0; curr_down = 0; - currsend = 0; - myRing = 0; std::cout << "User " << MyNum << " connected" << std::endl; }//constructor - /// Drops held DTSC::Ring class, if one is held. - ~user(){ - Strm->dropRing(myRing); - }//destructor /// Disconnects the current user. Doesn't do anything if already disconnected. /// Prints "Disconnected user" to stdout if disconnect took place. void Disconnect(std::string reason) { @@ -118,42 +119,70 @@ namespace Buffer{ }//Disconnect /// Tries to send the current buffer, returns true if success, false otherwise. /// Has a side effect of dropping the connection if send will never complete. - bool doSend(const char * ptr, int len){ - int r = S.iwrite(ptr+currsend, len-currsend); + bool doSend(){ + int r = S.iwrite((char*)lastpointer+currsend, MyBuffer_len-currsend); if (r <= 0){ if (errno == EWOULDBLOCK){return false;} Disconnect(S.getError()); return false; } currsend += r; - return (currsend == len); + return (currsend == MyBuffer_len); }//doSend /// Try to send data to this user. Disconnects if any problems occur. - void Send(){ - if (!myRing){return;}//no ring! + /// \param ringbuf Array of buffers (FLV:Tag with ID attached) + /// \param buffers Count of elements in ringbuf + void Send(buffer ** ringbuf, int buffers){ + /// \todo For MP3: gotproperaudio - if false, only send if first byte is 0xFF and set to true if (!S.connected()){return;}//cancel if not connected - if (myRing->waiting){return;}//still waiting for next buffer? - if (myRing->starved){ - //if corrupt data, warn and get new DTSC::Ring - std::cout << "Warning: User was send corrupt video data and send to the next keyframe!" << std::endl; - Strm->dropRing(myRing); - myRing = Strm->getRing(); + //still waiting for next buffer? check it + if (MyBuffer_num < 0){ + MyBuffer_num = ringbuf[MyBuffer]->number; + if (MyBuffer_num < 0){ + return; //still waiting? don't crash - wait longer. + }else{ + MyBuffer_len = ringbuf[MyBuffer]->FLV.len; + lastpointer = ringbuf[MyBuffer]->FLV.data; + } + } + + //do check for buffer resizes + if (lastpointer != ringbuf[MyBuffer]->FLV.data){ + Disconnect("Buffer resize at wrong time... had to disconnect"); + return; } - currsend = 0; //try to complete a send - if (doSend(Strm->outPacket(myRing->b).c_str(), Strm->outPacket(myRing->b).length())){ + if (doSend()){ //switch to next buffer - if (myRing->b <= 0){myRing->waiting = true; return;}//no next buffer? go in waiting mode. - myRing->b--; + if ((ringbuf[MyBuffer]->number != MyBuffer_num)){ + //if corrupt data, warn and find keyframe + std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl; + int nocrashcount = 0; + do{ + MyBuffer++; + nocrashcount++; + MyBuffer %= buffers; + }while(!ringbuf[MyBuffer]->FLV.isKeyframe && (nocrashcount < buffers)); + //if keyframe not available, try again later + if (nocrashcount >= buffers){ + std::cout << "Warning: No keyframe found in buffers! Skipping search for now..." << std::endl; + return; + } + }else{ + MyBuffer++; + MyBuffer %= buffers; + } + MyBuffer_num = -1; + lastpointer = 0; currsend = 0; }//completed a send }//send }; int user::UserCount = 0; - /// Starts a loop, waiting for connections to send data to. + /// Starts a loop, waiting for connections to send video data to. int Start(int argc, char ** argv) { //first make sure no segpipe signals will kill us struct sigaction new_action; @@ -163,27 +192,32 @@ namespace Buffer{ sigaction (SIGPIPE, &new_action, NULL); //then check and parse the commandline - if (argc < 2) { - std::cout << "usage: " << argv[0] << " streamName [awaiting_IP]" << std::endl; + if (argc < 3) { + std::cout << "usage: " << argv[0] << " buffers_count streamname [awaiting_IP]" << std::endl; return 1; } std::string waiting_ip = ""; bool ip_waiting = false; Socket::Connection ip_input; if (argc >= 4){ - waiting_ip += argv[2]; + waiting_ip += argv[3]; ip_waiting = true; } std::string shared_socket = "/tmp/shared_socket_"; - shared_socket += argv[1]; + shared_socket += argv[2]; Socket::Server SS(shared_socket, true); - Strm = new DTSC::Stream(5); + FLV::Tag metadata; + FLV::Tag video_init; + FLV::Tag audio_init; + int buffers = atoi(argv[1]); + buffer ** ringbuf = (buffer**) calloc (buffers,sizeof(buffer*)); std::vector users; std::vector::iterator usersIt; - std::string inBuffer; - char charBuffer[1024*10]; - unsigned int charCount; + for (int i = 0; i < buffers; ++i) ringbuf[i] = new buffer; + int current_buffer = 0; + int lastproper = 0;//last properly finished buffer number + unsigned int loopcount = 0; unsigned int stattimer = 0; Socket::Connection incoming; Socket::Connection std_input(fileno(stdin)); @@ -192,9 +226,13 @@ namespace Buffer{ Storage["log"] = Json::Value(Json::objectValue); Storage["curr"] = Json::Value(Json::objectValue); Storage["totals"] = Json::Value(Json::objectValue); + + unsigned char packtype; + bool gotVideoInfo = false; + bool gotAudioInfo = false; + bool gotData = false; - - while (!feof(stdin) || ip_waiting){ + while((!feof(stdin) || ip_waiting) && !FLV::Parse_Error){ usleep(1000); //sleep for 1 ms, to prevent 100% CPU time unsigned int now = time(0); if (now != stattimer){ @@ -211,9 +249,7 @@ namespace Buffer{ Storage["totals"]["up"] = tot_up; Storage["totals"]["count"] = tot_count; Storage["totals"]["now"] = now; - if( argc >= 4 ) { - Storage["totals"]["buffer"] = argv[2]; - } + Storage["totals"]["buffer"] = argv[2]; if (!StatsSocket.connected()){ StatsSocket = Socket::Connection("/tmp/ddv_statistics", true); } @@ -223,22 +259,89 @@ namespace Buffer{ } } //invalidate the current buffer - if ( (!ip_waiting && std_input.canRead()) || (ip_waiting && ip_input.connected()) ){ - std::cin.read(charBuffer, 1024*10); - charCount = std::cin.gcount(); - inBuffer.append(charBuffer, charCount); - Strm->parsePacket(inBuffer); + ringbuf[current_buffer]->number = -1; + if ( + (!ip_waiting && + (std_input.canRead()) && ringbuf[current_buffer]->FLV.FileLoader(stdin) + ) || (ip_waiting && (ip_input.connected()) && + ringbuf[current_buffer]->FLV.SockLoader(ip_input) + ) + ){ + loopcount++; + packtype = ringbuf[current_buffer]->FLV.data[0]; + //store metadata, if available + if (packtype == 0x12){ + metadata = ringbuf[current_buffer]->FLV; + std::cout << "Received metadata!" << std::endl; + if (gotVideoInfo && gotAudioInfo){ + FLV::Parse_Error = true; + std::cout << "... after proper video and audio? Cancelling broadcast!" << std::endl; + } + gotVideoInfo = false; + gotAudioInfo = false; + } + //store video init data, if available + if (!gotVideoInfo && ringbuf[current_buffer]->FLV.isKeyframe){ + if ((ringbuf[current_buffer]->FLV.data[11] & 0x0f) == 7){//avc packet + if (ringbuf[current_buffer]->FLV.data[12] == 0){ + ringbuf[current_buffer]->FLV.tagTime(0);//timestamp to zero + video_init = ringbuf[current_buffer]->FLV; + gotVideoInfo = true; + std::cout << "Received video configuration!" << std::endl; + } + }else{gotVideoInfo = true;}//non-avc = no config... + } + //store audio init data, if available + if (!gotAudioInfo && (packtype == 0x08)){ + if (((ringbuf[current_buffer]->FLV.data[11] & 0xf0) >> 4) == 10){//aac packet + ringbuf[current_buffer]->FLV.tagTime(0);//timestamp to zero + audio_init = ringbuf[current_buffer]->FLV; + gotAudioInfo = true; + std::cout << "Received audio configuration!" << std::endl; + }else{gotAudioInfo = true;}//no aac = no config... + } + //on keyframe set possible start point + if (packtype == 0x09){ + if (((ringbuf[current_buffer]->FLV.data[11] & 0xf0) >> 4) == 1){ + lastproper = current_buffer; + } + } + if (loopcount > 5){gotData = true;} + //keep track of buffers + ringbuf[current_buffer]->number = loopcount; + current_buffer++; + current_buffer %= buffers; } //check for new connections, accept them if there are any incoming = SS.accept(true); if (incoming.connected()){ users.push_back(incoming); - //send the header - users.back().myRing = Strm->getRing(); - if (!users.back().S.write(Strm->outHeader())){ - /// \todo Do this more nicely? - users.back().Disconnect("failed to receive the header!"); + //send the FLV header + users.back().currsend = 0; + users.back().MyBuffer = lastproper; + users.back().MyBuffer_num = -1; + /// \todo Do this more nicely? + if (gotData){ + if (!users.back().S.write(FLV::Header, 13)){ + users.back().Disconnect("failed to receive the header!"); + }else{ + if (metadata.len > 0){ + if (!users.back().S.write(metadata.data, metadata.len)){ + users.back().Disconnect("failed to receive metadata!"); + } + } + if (audio_init.len > 0){ + if (!users.back().S.write(audio_init.data, audio_init.len)){ + users.back().Disconnect("failed to receive audio init!"); + } + } + if (video_init.len > 0){ + if (!users.back().S.write(video_init.data, video_init.len)){ + users.back().Disconnect("failed to receive video init!"); + } + } + } } } @@ -259,7 +362,7 @@ namespace Buffer{ if (tmp != ""){ if (tmp[0] == 'P'){ std::cout << "Push attempt from IP " << tmp.substr(2) << std::endl; - if (tmp.substr(2) == waiting_ip){ + if (tmp.substr(2) == waiting_ip || tmp.substr(2) == "::ffff:"+waiting_ip){ if (!ip_input.connected()){ std::cout << "Push accepted!" << std::endl; ip_input = (*usersIt).S; @@ -269,7 +372,7 @@ namespace Buffer{ (*usersIt).Disconnect("Push denied - push already in progress!"); } }else{ - (*usersIt).Disconnect("Push denied - invalid IP address!"); + (*usersIt).Disconnect("Push denied - invalid IP address ("+waiting_ip+"!="+tmp.substr(2)+")!"); } } if (tmp[0] == 'S'){ @@ -288,15 +391,18 @@ namespace Buffer{ } } } - (*usersIt).Send(); + (*usersIt).Send(ringbuf, buffers); } } } }//main loop // disconnect listener - /// \todo Deal with EOF more nicely - doesn't send the end of the stream to all users! - std::cout << "Reached EOF of input" << std::endl; + if (FLV::Parse_Error){ + std::cout << "FLV parse error:" << FLV::Error_Str << std::endl; + }else{ + std::cout << "Reached EOF of input" << std::endl; + } SS.close(); while (users.size() > 0){ for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ @@ -304,7 +410,6 @@ namespace Buffer{ if (!(*usersIt).S.connected()){users.erase(usersIt);break;} } } - delete Strm; return 0; } diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp index b55b563c..28a9622b 100644 --- a/Connector_RTMP/main.cpp +++ b/Connector_RTMP/main.cpp @@ -241,21 +241,31 @@ void Connector_RTMP::parseChunk(){ Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5) break; case 8: - #if DEBUG >= 4 - fprintf(stderr, "Received audio data\n"); - #endif F.ChunkLoader(next); if (SS.connected()){ + #if DEBUG >= 4 + fprintf(stderr, "A"); + #endif SS.write(std::string(F.data, F.len)); + }else{ + #if DEBUG >= 4 + fprintf(stderr, "Received useless audio data\n"); + #endif + Socket.close(); } break; case 9: - #if DEBUG >= 4 - fprintf(stderr, "Received video data\n"); - #endif F.ChunkLoader(next); if (SS.connected()){ + #if DEBUG >= 4 + fprintf(stderr, "V"); + #endif SS.write(std::string(F.data, F.len)); + }else{ + #if DEBUG >= 4 + fprintf(stderr, "Received useless video data\n"); + #endif + Socket.close(); } break; case 15: @@ -348,6 +358,9 @@ void Connector_RTMP::parseChunk(){ Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 parsed3 = true; }//createStream + if ((amfdata.getContentP(0)->StrValue() == "closeStream") || (amfdata.getContentP(0)->StrValue() == "deleteStream")){ + if (SS.connected()){SS.close();} + } if ((amfdata.getContentP(0)->StrValue() == "getStreamLength") || (amfdata.getContentP(0)->StrValue() == "getMovLen")){ //send a _result reply AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); @@ -569,6 +582,9 @@ void Connector_RTMP::parseChunk(){ Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1 parsed = true; }//createStream + if ((amfdata.getContentP(0)->StrValue() == "closeStream") || (amfdata.getContentP(0)->StrValue() == "deleteStream")){ + if (SS.connected()){SS.close();} + } if ((amfdata.getContentP(0)->StrValue() == "getStreamLength") || (amfdata.getContentP(0)->StrValue() == "getMovLen")){ //send a _result reply AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); diff --git a/util/flv_tag.cpp b/util/flv_tag.cpp index c5b4b964..cf75e549 100644 --- a/util/flv_tag.cpp +++ b/util/flv_tag.cpp @@ -580,7 +580,14 @@ bool FLV::Tag::MemLoader(char * D, unsigned int S, unsigned int & P){ len += (data[2] << 8); len += (data[1] << 16); if (buf < len){data = (char*)realloc(data, len); buf = len;} - if (data[0] > 0x12){FLV::Parse_Error = true; Error_Str = "Invalid Tag received."; return false;} + if (data[0] > 0x12){ + data[0] += 32; + FLV::Parse_Error = true; + Error_Str = "Invalid Tag received ("; + Error_Str += data[0]; + Error_Str += ")."; + return false; + } done = false; } } @@ -607,20 +614,11 @@ bool FLV::Tag::MemLoader(char * D, unsigned int S, unsigned int & P){ /// \param sock Socket to read from. /// \return True if count bytes are read succesfully, false otherwise. bool FLV::Tag::SockReadUntil(char * buffer, unsigned int count, unsigned int & sofar, Socket::Connection & sock){ - if (sofar == count){return true;} - if (!sock.read(buffer + sofar,count-sofar)){ - if (errno != EWOULDBLOCK){ - FLV::Parse_Error = true; - Error_Str = "Error reading from socket."; - } - return false; - } - sofar += count-sofar; - if (sofar == count){return true;} - if (sofar > count){ - FLV::Parse_Error = true; - Error_Str = "Socket buffer overflow."; - } + if (sofar >= count){return true;} + int r = 0; + r = sock.iread(buffer + sofar,count-sofar); + sofar += r; + if (sofar >= count){return true;} return false; }//Tag::SockReadUntil @@ -647,7 +645,14 @@ bool FLV::Tag::SockLoader(Socket::Connection sock){ len += (data[2] << 8); len += (data[1] << 16); if (buf < len){data = (char*)realloc(data, len); buf = len;} - if (data[0] > 0x12){FLV::Parse_Error = true; Error_Str = "Invalid Tag received."; return false;} + if (data[0] > 0x12){ + data[0] += 32; + FLV::Parse_Error = true; + Error_Str = "Invalid Tag received ("; + Error_Str += data[0]; + Error_Str += ")."; + return false; + } done = false; } } @@ -719,7 +724,14 @@ bool FLV::Tag::FileLoader(FILE * f){ len += (data[2] << 8); len += (data[1] << 16); if (buf < len){data = (char*)realloc(data, len); buf = len;} - if (data[0] > 0x12){FLV::Parse_Error = true; Error_Str = "Invalid Tag received."; return false;} + if (data[0] > 0x12){ + data[0] += 32; + FLV::Parse_Error = true; + Error_Str = "Invalid Tag received ("; + Error_Str += data[0]; + Error_Str += ")."; + return false; + } done = false; } }