diff --git a/.gitignore b/.gitignore index eff93cb0..8b4f06d1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,7 @@ #ignore object files and nonsense like that *.[oa] -Client_PLS -Server_PLS -Connector_RTMP +Client/Client_PLS +Server/Server_PLS +Connector_RTMP/Connector_RTMP *~ diff --git a/Connector_RTMP/Makefile b/Connector_RTMP/Makefile index e07a6ee4..3dbacaef 100644 --- a/Connector_RTMP/Makefile +++ b/Connector_RTMP/Makefile @@ -19,6 +19,5 @@ clean: run-test: $(OUT) rm -rf ./meh mkfifo ./meh + cat ./meh & nc -l -p 1935 -e './Connector_RTMP 2>./meh' -run-cat: - cat < ./meh diff --git a/Connector_RTMP/flv_sock.cpp b/Connector_RTMP/flv_sock.cpp index 82a34ee1..8d6c55a4 100644 --- a/Connector_RTMP/flv_sock.cpp +++ b/Connector_RTMP/flv_sock.cpp @@ -10,19 +10,21 @@ void FLV_Readheader(SWUnixSocket & ss){ } }//FLV_Readheader +void FLV_Dump(){FLV_len = 0;} + bool FLV_GetPacket(SWUnixSocket & ss){ if (FLVbs < 15){FLVbuffer = (char*)realloc(FLVbuffer, 15); FLVbs = 15;} //if received a whole header, receive a whole packet //if not, retry header next pass - if (ss.frecv(FLVbuffer, 11, &SWBerr) == 11){ - FLV_len = FLVbuffer[3] + 15; - FLV_len += (FLVbuffer[2] << 8); - FLV_len += (FLVbuffer[1] << 16); - if (FLVbs < FLV_len){FLVbuffer = (char*)realloc(FLVbuffer, FLV_len);FLVbs = FLV_len;} - while (ss.frecv(FLVbuffer+11, FLV_len-11, &SWBerr) != FLV_len-11){ - //wait + if (FLV_len == 0){ + if (ss.frecv(FLVbuffer, 11, &SWBerr) == 11){ + FLV_len = FLVbuffer[3] + 15; + FLV_len += (FLVbuffer[2] << 8); + FLV_len += (FLVbuffer[1] << 16); + if (FLVbs < FLV_len){FLVbuffer = (char*)realloc(FLVbuffer, FLV_len);FLVbs = FLV_len;} } - return true; + }else{ + if (ss.frecv(FLVbuffer+11, FLV_len-11, &SWBerr) == FLV_len-11){return true;} } return false; }//FLV_GetPacket diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp index fc0ddc1d..1521111f 100644 --- a/Connector_RTMP/main.cpp +++ b/Connector_RTMP/main.cpp @@ -15,6 +15,7 @@ #include "../sockets/SocketW.h" bool ready4data = false;//set to true when streaming starts bool inited = false; +bool stopparsing = false; timeval lastrec; #include "parsechunks.cpp" //chunkstream parsing @@ -44,9 +45,10 @@ int main(){ fprintf(stderr, "Starting processing...\n"); #endif while (!feof(stdin)){ - select(1, &pollset, 0, 0, &timeout); + //select(1, &pollset, 0, 0, &timeout); //only parse input from stdin if available or not yet init'ed - if (FD_ISSET(0, &pollset) || !ready4data || (snd_cnt - snd_window_at >= snd_window_size)){parseChunk();fflush(stdout);}// || !ready4data? + //FD_ISSET(0, &pollset) || //NOTE: Polling does not work? WHY?!? WHY DAMN IT?!? + if ((!ready4data || (snd_cnt - snd_window_at >= snd_window_size)) && !stopparsing){parseChunk();fflush(stdout);} if (ready4data){ if (!inited){ //we are ready, connect the socket! @@ -54,7 +56,7 @@ int main(){ #ifdef DEBUG fprintf(stderr, "Could not connect to server!\n"); #endif - return 1; + return 0; } FLV_Readheader(ss);//read the header, we don't want it #ifdef DEBUG @@ -69,17 +71,29 @@ int main(){ ts += FLVbuffer[4] * 256*256; ts += FLVbuffer[5] * 256; ts += FLVbuffer[6]; - if (fts == 0){fts = ts;ftst = getNowMS();} - ts -= fts; - FLVbuffer[7] = ts / (256*256*256); - FLVbuffer[4] = ts / (256*256); - FLVbuffer[5] = ts / 256; - FLVbuffer[6] = ts % 256; - ts += ftst; + if (ts != 0){ + if (fts == 0){fts = ts;ftst = getNowMS();} + ts -= fts; + FLVbuffer[7] = ts / (256*256*256); + FLVbuffer[4] = ts / (256*256); + FLVbuffer[5] = ts / 256; + FLVbuffer[6] = ts % 256; + ts += ftst; + }else{ + ftst = getNowMS(); + FLVbuffer[7] = ftst / (256*256*256); + FLVbuffer[4] = ftst / (256*256); + FLVbuffer[5] = ftst / 256; + FLVbuffer[6] = ftst % 256; + } SendMedia((unsigned char)FLVbuffer[0], (unsigned char *)FLVbuffer+11, FLV_len-15, ts); - //if (FLVbuffer[0] == 9){ - // fprintf(stderr, "first 2 bytes: 0x%hhx 0x%hhx\n", FLVbuffer[11], FLVbuffer[12]); - //} + FLV_Dump();//dump packet and get ready for next + } + if ((SWBerr != SWBaseSocket::ok) && (SWBerr != SWBaseSocket::notReady)){ + #ifdef DEBUG + fprintf(stderr, "No more data! :-( (%s)\n", SWBerr.get_error().c_str()); + #endif + return 0;//no more input possible! Fail immediately. } } } @@ -89,5 +103,8 @@ int main(){ SendCTL(3, rec_cnt);//send ack (msg 3) } } + #ifdef DEBUG + fprintf(stderr, "User disconnected.\n"); + #endif return 0; }//main diff --git a/Connector_RTMP/parsechunks.cpp b/Connector_RTMP/parsechunks.cpp index f4fbe465..c3de2954 100644 --- a/Connector_RTMP/parsechunks.cpp +++ b/Connector_RTMP/parsechunks.cpp @@ -28,7 +28,7 @@ void parseChunk(){ fprintf(stderr, "CTRL: Acknowledgement\n"); #endif snd_window_at = ntohl(*(int*)next.data); - //maybe better? snd_window_at = snd_cnt; + snd_window_at = snd_cnt; break; case 4:{ #ifdef DEBUG @@ -205,6 +205,11 @@ void parseChunk(){ amfreply.getContentP(3)->addContent(AMFType("details", "PLS")); amfreply.getContentP(3)->addContent(AMFType("clientid", (double)1)); SendChunk(4, 20, 1, amfreply.Pack()); + amfreply = AMFType("container", (unsigned char)0xFF); + amfreply.addContent(AMFType("", "|RtmpSampleAccess"));//status reply + amfreply.addContent(AMFType("", (double)1, 0x01));//bool true - audioaccess + amfreply.addContent(AMFType("", (double)1, 0x01));//bool true - videoaccess + SendChunk(4, 20, next.msg_stream_id, amfreply.Pack()); chunk_snd_max = 1024*1024; SendCTL(1, chunk_snd_max);//send chunk size max (msg 1) ready4data = true;//start sending video data! @@ -226,8 +231,9 @@ void parseChunk(){ break; default: #ifdef DEBUG - fprintf(stderr, "Unknown chunk received!\n"); + fprintf(stderr, "Unknown chunk received! Probably protocol corruption, stopping parsing of incoming data.\n", next.msg_type_id); #endif + stopparsing = true; break; } }//parseChunk diff --git a/Server/buffer.h b/Server/buffer.h index c5d04955..5a18603f 100644 --- a/Server/buffer.h +++ b/Server/buffer.h @@ -2,5 +2,6 @@ struct buffer{ int number; + bool iskeyframe; FLV_Pack * FLV; };//buffer diff --git a/Server/main.cpp b/Server/main.cpp index f037de3b..c3a84a6b 100644 --- a/Server/main.cpp +++ b/Server/main.cpp @@ -37,9 +37,11 @@ int main( int argc, char * argv[] ) { unlink("/tmp/shared_socket"); listener.bind("/tmp/shared_socket"); - listener.listen(); + listener.listen(50); listener.set_timeout(0,50000); - + unsigned char packtype; + bool gotVideoInfo = false; + bool gotAudioInfo = false; while(std::cin.good()) { loopcount ++; //invalidate the current buffer @@ -49,25 +51,62 @@ int main( int argc, char * argv[] ) { FLV_Readheader(); } else { FLV_GetPacket(ringbuf[current_buffer]->FLV); - //if video frame? (id 9) check for incoming connections - if (ringbuf[current_buffer]->FLV->data[0] == 0x12){ + packtype = ringbuf[current_buffer]->FLV->data[0]; + //store metadata, if available + if (packtype == 0x12){ metabuflen = ringbuf[current_buffer]->FLV->len; metabuffer = (char*)realloc(metabuffer, metabuflen); memcpy(metabuffer, ringbuf[current_buffer]->FLV->data, metabuflen); + std::cout << "Received metadata!" << std::endl; + gotVideoInfo = false; + gotAudioInfo = false; + } + if (!gotVideoInfo && ringbuf[current_buffer]->FLV->isKeyframe){ + if ((ringbuf[current_buffer]->FLV->data[11] & 0x0f) == 7){//avc packet + if (ringbuf[current_buffer]->FLV->data[12] == 0){ + ringbuf[current_buffer]->FLV->data[4] = 0;//timestamp to zero + ringbuf[current_buffer]->FLV->data[5] = 0;//timestamp to zero + ringbuf[current_buffer]->FLV->data[6] = 0;//timestamp to zero + metabuffer = (char*)realloc(metabuffer, metabuflen + ringbuf[current_buffer]->FLV->len); + memcpy(metabuffer+metabuflen, ringbuf[current_buffer]->FLV->data, ringbuf[current_buffer]->FLV->len); + metabuflen += ringbuf[current_buffer]->FLV->len; + gotVideoInfo = true; + std::cout << "Received video configuration!" << std::endl; + } + }else{gotVideoInfo = true;}//non-avc = no config... + } + if (!gotAudioInfo && (packtype == 0x08)){ + if (((ringbuf[current_buffer]->FLV->data[11] & 0xf0) >> 4) == 10){//aac packet + ringbuf[current_buffer]->FLV->data[4] = 0;//timestamp to zero + ringbuf[current_buffer]->FLV->data[5] = 0;//timestamp to zero + ringbuf[current_buffer]->FLV->data[6] = 0;//timestamp to zero + metabuffer = (char*)realloc(metabuffer, metabuflen + ringbuf[current_buffer]->FLV->len); + memcpy(metabuffer+metabuflen, ringbuf[current_buffer]->FLV->data, ringbuf[current_buffer]->FLV->len); + metabuflen += ringbuf[current_buffer]->FLV->len; + gotAudioInfo = true; + std::cout << "Received audio configuration!" << std::endl; + }else{gotAudioInfo = true;}//no aac = no config... + } + //on keyframe set start point + if (packtype == 0x09){ + if (((ringbuf[current_buffer]->FLV->data[11] & 0xf0) >> 4) == 1){lastproper = current_buffer;} } incoming = listener.accept(&BError); if (incoming){ connectionList.push_back(user(incoming)); //send the FLV header - std::cout << "Client connected." << std::endl; connectionList.back().MyBuffer = lastproper; - connectionList.back().MyBuffer_num = ringbuf[lastproper]->number; + connectionList.back().MyBuffer_num = -1; //TODO: Do this more nicely? - if (connectionList.back().Conn->send(FLVHeader,13,0) != 13){ + if (connectionList.back().Conn->send(FLVHeader,13,&BError) != 13){ connectionList.back().disconnect("failed to receive the header!"); + }else{ + if (connectionList.back().Conn->send(metabuffer,metabuflen,&BError) != metabuflen){ + connectionList.back().disconnect("failed to receive metadata!"); + } } - if (connectionList.back().Conn->send(metabuffer,metabuflen,0) != metabuflen){ - connectionList.back().disconnect("failed to receive metadata!"); + if (BError != SWBaseSocket::ok){ + connectionList.back().disconnect("Socket error: " + BError.get_error()); } } ringbuf[current_buffer]->number = loopcount; @@ -77,7 +116,6 @@ int main( int argc, char * argv[] ) { (*connIt).Send(ringbuf, buffers); } //keep track of buffers - lastproper = current_buffer; current_buffer++; current_buffer %= buffers; } diff --git a/Server/play1000kbit.sh b/Server/play1000kbit.sh index 388e208a..c6811478 100755 --- a/Server/play1000kbit.sh +++ b/Server/play1000kbit.sh @@ -1,3 +1,4 @@ #!/bin/bash -ffmpeg -re -i "$1" -b 1024000 -ar 11025 -f flv - | ./Server_PLS 5000 5 +ffmpeg -re -i "$1" -b 1024000 -ar 11025 -f flv - 2> /dev/null | ./Server_PLS 500 + diff --git a/Server/user.cpp b/Server/user.cpp index 6143cea3..41948f80 100644 --- a/Server/user.cpp +++ b/Server/user.cpp @@ -11,19 +11,28 @@ class user{ SWUnixSocket * Conn; int MyBuffer; int MyBuffer_num; + int MyBuffer_len; + int MyNum; + void * lastpointer; + static int UserCount; + static SWBaseSocket::SWBaseError err; };//user +int user::UserCount = 0; +SWBaseSocket::SWBaseError user::err; user::user(SWBaseSocket * newConn) { Conn = (SWUnixSocket*)newConn; is_connected = (Conn != 0); + MyNum = UserCount++; + std::cout << "User " << MyNum << " connected" << std::endl; } void user::disconnect(std::string reason) { if (Conn) { - Conn->disconnect(); + Conn->disconnect(&err); Conn = NULL; - std::cout << "Disconnected user: " << reason << std::endl; + std::cout << "Disconnected user " << MyNum << ": " << reason << std::endl; } is_connected = false; } @@ -32,24 +41,38 @@ void user::Send(buffer ** ringbuf, int buffers){ //not connected? cancel if (!is_connected){return;} //still waiting for next buffer? check it - if (MyBuffer_num < 0){MyBuffer_num = ringbuf[MyBuffer]->number;} - //still waiting? don't crash - wait longer. - if (MyBuffer_num < 0){return;} - //buffer number changed? disconnect - if ((ringbuf[MyBuffer]->number != MyBuffer_num)){ - disconnect("Buffer number changed (connection too slow)"); + if (MyBuffer_num < 0){ + MyBuffer_num = ringbuf[MyBuffer]->number; + //still waiting? don't crash - wait longer. + if (MyBuffer_num < 0){ + return; + }else{ + MyBuffer_len = ringbuf[MyBuffer]->FLV->len; + lastpointer = ringbuf[MyBuffer]->FLV->data; + } + } + if (lastpointer != ringbuf[MyBuffer]->FLV->data){ + disconnect("Buffer resize at wrong time... had to disconnect"); return; } - SWBaseSocket::SWBaseError err; - int ret = Conn->fsend(ringbuf[MyBuffer]->FLV->data, ringbuf[MyBuffer]->FLV->len, &err); + int ret = Conn->fsend(ringbuf[MyBuffer]->FLV->data, MyBuffer_len, &err); if ((err != SWBaseSocket::ok) && (err != SWBaseSocket::notReady)){ - disconnect("Socket error"); + disconnect("Socket error: " + err.get_error()); return; } - if (ret == ringbuf[MyBuffer]->FLV->len){ + if (ret == MyBuffer_len){ //completed a send - switch to next buffer - MyBuffer++; - MyBuffer %= buffers; + if ((ringbuf[MyBuffer]->number != MyBuffer_num)){ + std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl; + do{ + MyBuffer++; + MyBuffer %= buffers; + }while(!ringbuf[MyBuffer]->FLV->isKeyframe); + }else{ + MyBuffer++; + MyBuffer %= buffers; + } MyBuffer_num = -1; + lastpointer = 0; } -} \ No newline at end of file +} diff --git a/util/flv.cpp b/util/flv.cpp index 4dc4235f..0e119a52 100644 --- a/util/flv.cpp +++ b/util/flv.cpp @@ -3,6 +3,7 @@ struct FLV_Pack { int len; int buf; + bool isKeyframe; char * data; };//FLV_Pack @@ -43,4 +44,6 @@ void FLV_GetPacket(FLV_Pack *& p){ p->len += (p->data[1] << 16); if (p->buf < p->len){p->data = (char*)realloc(p->data, p->len);p->buf = p->len;} fread(p->data+11,1,p->len-11,stdin); + p->isKeyframe = false; + if ((p->data[0] == 0x09) && (((p->data[11] & 0xf0) >> 4) == 1)){p->isKeyframe = true;} }//FLV_GetPacket