From 94f587563c109a3f7dae71cf913497c04c231eb1 Mon Sep 17 00:00:00 2001
From: Thulinma <jaron@vietors.com>
Date: Tue, 24 Aug 2010 00:13:52 +0200
Subject: [PATCH] Stabiliteitsfixes en in theorie werkende H.264 en AAC, maar
 je weet maar nooit...

---
 .gitignore                     |  6 ++--
 Connector_RTMP/Makefile        |  3 +-
 Connector_RTMP/flv_sock.cpp    | 18 ++++++-----
 Connector_RTMP/main.cpp        | 43 +++++++++++++++++--------
 Connector_RTMP/parsechunks.cpp | 10 ++++--
 Server/buffer.h                |  1 +
 Server/main.cpp                | 58 ++++++++++++++++++++++++++++------
 Server/play1000kbit.sh         |  3 +-
 Server/user.cpp                | 53 ++++++++++++++++++++++---------
 util/flv.cpp                   |  3 ++
 10 files changed, 144 insertions(+), 54 deletions(-)

diff --git a/.gitignore b/.gitignore
index eff93cb0..8b4f06d1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,7 +1,7 @@
 #ignore object files and nonsense like that
 *.[oa]
-Client_PLS
-Server_PLS
-Connector_RTMP
+Client/Client_PLS
+Server/Server_PLS
+Connector_RTMP/Connector_RTMP
 *~
 
diff --git a/Connector_RTMP/Makefile b/Connector_RTMP/Makefile
index e07a6ee4..3dbacaef 100644
--- a/Connector_RTMP/Makefile
+++ b/Connector_RTMP/Makefile
@@ -19,6 +19,5 @@ clean:
 run-test: $(OUT)
 	rm -rf ./meh
 	mkfifo ./meh
+	cat ./meh &
 	nc -l -p 1935 -e './Connector_RTMP 2>./meh'
-run-cat:
-	cat < ./meh
diff --git a/Connector_RTMP/flv_sock.cpp b/Connector_RTMP/flv_sock.cpp
index 82a34ee1..8d6c55a4 100644
--- a/Connector_RTMP/flv_sock.cpp
+++ b/Connector_RTMP/flv_sock.cpp
@@ -10,19 +10,21 @@ void FLV_Readheader(SWUnixSocket & ss){
   }
 }//FLV_Readheader
 
+void FLV_Dump(){FLV_len = 0;}
+
 bool FLV_GetPacket(SWUnixSocket & ss){
   if (FLVbs < 15){FLVbuffer = (char*)realloc(FLVbuffer, 15); FLVbs = 15;}
   //if received a whole header, receive a whole packet
   //if not, retry header next pass
-  if (ss.frecv(FLVbuffer, 11, &SWBerr) == 11){
-    FLV_len = FLVbuffer[3] + 15;
-    FLV_len += (FLVbuffer[2] << 8);
-    FLV_len += (FLVbuffer[1] << 16);
-    if (FLVbs < FLV_len){FLVbuffer = (char*)realloc(FLVbuffer, FLV_len);FLVbs = FLV_len;}
-    while (ss.frecv(FLVbuffer+11, FLV_len-11, &SWBerr) != FLV_len-11){
-      //wait
+  if (FLV_len == 0){
+    if (ss.frecv(FLVbuffer, 11, &SWBerr) == 11){
+      FLV_len = FLVbuffer[3] + 15;
+      FLV_len += (FLVbuffer[2] << 8);
+      FLV_len += (FLVbuffer[1] << 16);
+      if (FLVbs < FLV_len){FLVbuffer = (char*)realloc(FLVbuffer, FLV_len);FLVbs = FLV_len;}
     }
-    return true;
+  }else{
+    if (ss.frecv(FLVbuffer+11, FLV_len-11, &SWBerr) == FLV_len-11){return true;}
   }
   return false;
 }//FLV_GetPacket
diff --git a/Connector_RTMP/main.cpp b/Connector_RTMP/main.cpp
index fc0ddc1d..1521111f 100644
--- a/Connector_RTMP/main.cpp
+++ b/Connector_RTMP/main.cpp
@@ -15,6 +15,7 @@
 #include "../sockets/SocketW.h"
 bool ready4data = false;//set to true when streaming starts
 bool inited = false;
+bool stopparsing = false;
 timeval lastrec;
 
 #include "parsechunks.cpp" //chunkstream parsing
@@ -44,9 +45,10 @@ int main(){
   fprintf(stderr, "Starting processing...\n");
   #endif
   while (!feof(stdin)){
-    select(1, &pollset, 0, 0, &timeout);
+    //select(1, &pollset, 0, 0, &timeout);
     //only parse input from stdin if available or not yet init'ed
-    if (FD_ISSET(0, &pollset) || !ready4data || (snd_cnt - snd_window_at >= snd_window_size)){parseChunk();fflush(stdout);}// || !ready4data?
+    //FD_ISSET(0, &pollset) || //NOTE: Polling does not work? WHY?!? WHY DAMN IT?!?
+    if ((!ready4data || (snd_cnt - snd_window_at >= snd_window_size)) && !stopparsing){parseChunk();fflush(stdout);}
     if (ready4data){
       if (!inited){
         //we are ready, connect the socket!
@@ -54,7 +56,7 @@ int main(){
           #ifdef DEBUG
           fprintf(stderr, "Could not connect to server!\n");
           #endif
-          return 1;
+          return 0;
         }
         FLV_Readheader(ss);//read the header, we don't want it
         #ifdef DEBUG
@@ -69,17 +71,29 @@ int main(){
           ts += FLVbuffer[4] * 256*256;
           ts += FLVbuffer[5] * 256;
           ts += FLVbuffer[6];
-          if (fts == 0){fts = ts;ftst = getNowMS();}
-          ts -= fts;
-          FLVbuffer[7] = ts / (256*256*256);
-          FLVbuffer[4] = ts / (256*256);
-          FLVbuffer[5] = ts / 256;
-          FLVbuffer[6] = ts % 256;
-          ts += ftst;
+          if (ts != 0){
+            if (fts == 0){fts = ts;ftst = getNowMS();}
+            ts -= fts;
+            FLVbuffer[7] = ts / (256*256*256);
+            FLVbuffer[4] = ts / (256*256);
+            FLVbuffer[5] = ts / 256;
+            FLVbuffer[6] = ts % 256;
+            ts += ftst;
+          }else{
+            ftst = getNowMS();
+            FLVbuffer[7] = ftst / (256*256*256);
+            FLVbuffer[4] = ftst / (256*256);
+            FLVbuffer[5] = ftst / 256;
+            FLVbuffer[6] = ftst % 256;
+          }
           SendMedia((unsigned char)FLVbuffer[0], (unsigned char *)FLVbuffer+11, FLV_len-15, ts);
-          //if (FLVbuffer[0] == 9){
-          //  fprintf(stderr, "first 2 bytes: 0x%hhx 0x%hhx\n", FLVbuffer[11], FLVbuffer[12]);
-          //}
+          FLV_Dump();//dump packet and get ready for next
+        }
+        if ((SWBerr != SWBaseSocket::ok) && (SWBerr != SWBaseSocket::notReady)){
+          #ifdef DEBUG
+          fprintf(stderr, "No more data! :-(  (%s)\n", SWBerr.get_error().c_str());
+          #endif
+          return 0;//no more input possible! Fail immediately.
         }
       }
     }
@@ -89,5 +103,8 @@ int main(){
       SendCTL(3, rec_cnt);//send ack (msg 3)
     }
   }
+  #ifdef DEBUG
+  fprintf(stderr, "User disconnected.\n");
+  #endif
   return 0;
 }//main
diff --git a/Connector_RTMP/parsechunks.cpp b/Connector_RTMP/parsechunks.cpp
index f4fbe465..c3de2954 100644
--- a/Connector_RTMP/parsechunks.cpp
+++ b/Connector_RTMP/parsechunks.cpp
@@ -28,7 +28,7 @@ void parseChunk(){
       fprintf(stderr, "CTRL: Acknowledgement\n");
       #endif
       snd_window_at = ntohl(*(int*)next.data);
-      //maybe better? snd_window_at = snd_cnt;
+      snd_window_at = snd_cnt;
       break;
     case 4:{
       #ifdef DEBUG
@@ -205,6 +205,11 @@ void parseChunk(){
         amfreply.getContentP(3)->addContent(AMFType("details", "PLS"));
         amfreply.getContentP(3)->addContent(AMFType("clientid", (double)1));
         SendChunk(4, 20, 1, amfreply.Pack());
+        amfreply = AMFType("container", (unsigned char)0xFF);
+        amfreply.addContent(AMFType("", "|RtmpSampleAccess"));//status reply
+        amfreply.addContent(AMFType("", (double)1, 0x01));//bool true - audioaccess
+        amfreply.addContent(AMFType("", (double)1, 0x01));//bool true - videoaccess
+        SendChunk(4, 20, next.msg_stream_id, amfreply.Pack());
         chunk_snd_max = 1024*1024;
         SendCTL(1, chunk_snd_max);//send chunk size max (msg 1)
         ready4data = true;//start sending video data!
@@ -226,8 +231,9 @@ void parseChunk(){
       break;
     default:
       #ifdef DEBUG
-      fprintf(stderr, "Unknown chunk received!\n");
+      fprintf(stderr, "Unknown chunk received! Probably protocol corruption, stopping parsing of incoming data.\n", next.msg_type_id);
       #endif
+      stopparsing = true;
       break;
   }
 }//parseChunk
diff --git a/Server/buffer.h b/Server/buffer.h
index c5d04955..5a18603f 100644
--- a/Server/buffer.h
+++ b/Server/buffer.h
@@ -2,5 +2,6 @@
 
 struct buffer{
   int number;
+  bool iskeyframe;
   FLV_Pack * FLV;
 };//buffer
diff --git a/Server/main.cpp b/Server/main.cpp
index f037de3b..c3a84a6b 100644
--- a/Server/main.cpp
+++ b/Server/main.cpp
@@ -37,9 +37,11 @@ int main( int argc, char * argv[] ) {
 
   unlink("/tmp/shared_socket");
   listener.bind("/tmp/shared_socket");
-  listener.listen();
+  listener.listen(50);
   listener.set_timeout(0,50000);
-
+  unsigned char packtype;
+  bool gotVideoInfo = false;
+  bool gotAudioInfo = false;
   while(std::cin.good()) {
     loopcount ++;
     //invalidate the current buffer
@@ -49,25 +51,62 @@ int main( int argc, char * argv[] ) {
       FLV_Readheader();
     } else {
       FLV_GetPacket(ringbuf[current_buffer]->FLV);
-      //if video frame? (id 9) check for incoming connections
-      if (ringbuf[current_buffer]->FLV->data[0] == 0x12){
+      packtype = ringbuf[current_buffer]->FLV->data[0];
+      //store metadata, if available
+      if (packtype == 0x12){
         metabuflen = ringbuf[current_buffer]->FLV->len;
         metabuffer = (char*)realloc(metabuffer, metabuflen);
         memcpy(metabuffer, ringbuf[current_buffer]->FLV->data, metabuflen);
+        std::cout << "Received metadata!" << std::endl;
+        gotVideoInfo = false;
+        gotAudioInfo = false;
+      }
+      if (!gotVideoInfo && ringbuf[current_buffer]->FLV->isKeyframe){
+        if ((ringbuf[current_buffer]->FLV->data[11] & 0x0f) == 7){//avc packet
+          if (ringbuf[current_buffer]->FLV->data[12] == 0){
+            ringbuf[current_buffer]->FLV->data[4] = 0;//timestamp to zero
+            ringbuf[current_buffer]->FLV->data[5] = 0;//timestamp to zero
+            ringbuf[current_buffer]->FLV->data[6] = 0;//timestamp to zero
+            metabuffer = (char*)realloc(metabuffer, metabuflen + ringbuf[current_buffer]->FLV->len);
+            memcpy(metabuffer+metabuflen, ringbuf[current_buffer]->FLV->data, ringbuf[current_buffer]->FLV->len);
+            metabuflen += ringbuf[current_buffer]->FLV->len;
+            gotVideoInfo = true;
+            std::cout << "Received video configuration!" << std::endl;
+          }
+        }else{gotVideoInfo = true;}//non-avc = no config...
+      }
+      if (!gotAudioInfo && (packtype == 0x08)){
+        if (((ringbuf[current_buffer]->FLV->data[11] & 0xf0) >> 4) == 10){//aac packet
+          ringbuf[current_buffer]->FLV->data[4] = 0;//timestamp to zero
+          ringbuf[current_buffer]->FLV->data[5] = 0;//timestamp to zero
+          ringbuf[current_buffer]->FLV->data[6] = 0;//timestamp to zero
+          metabuffer = (char*)realloc(metabuffer, metabuflen + ringbuf[current_buffer]->FLV->len);
+          memcpy(metabuffer+metabuflen, ringbuf[current_buffer]->FLV->data, ringbuf[current_buffer]->FLV->len);
+          metabuflen += ringbuf[current_buffer]->FLV->len;
+          gotAudioInfo = true;
+          std::cout << "Received audio configuration!" << std::endl;
+        }else{gotAudioInfo = true;}//no aac = no config...
+      }
+      //on keyframe set start point
+      if (packtype == 0x09){
+        if (((ringbuf[current_buffer]->FLV->data[11] & 0xf0) >> 4) == 1){lastproper = current_buffer;}
       }
       incoming = listener.accept(&BError);
       if (incoming){
         connectionList.push_back(user(incoming));
         //send the FLV header
-        std::cout << "Client connected." << std::endl;
         connectionList.back().MyBuffer = lastproper;
-        connectionList.back().MyBuffer_num = ringbuf[lastproper]->number;
+        connectionList.back().MyBuffer_num = -1;
         //TODO: Do this more nicely?
-        if (connectionList.back().Conn->send(FLVHeader,13,0) != 13){
+        if (connectionList.back().Conn->send(FLVHeader,13,&BError) != 13){
           connectionList.back().disconnect("failed to receive the header!");
+        }else{
+          if (connectionList.back().Conn->send(metabuffer,metabuflen,&BError) != metabuflen){
+            connectionList.back().disconnect("failed to receive metadata!");
+          }
         }
-        if (connectionList.back().Conn->send(metabuffer,metabuflen,0) != metabuflen){
-          connectionList.back().disconnect("failed to receive metadata!");
+        if (BError != SWBaseSocket::ok){
+          connectionList.back().disconnect("Socket error: " + BError.get_error());
         }
       }
       ringbuf[current_buffer]->number = loopcount;
@@ -77,7 +116,6 @@ int main( int argc, char * argv[] ) {
         (*connIt).Send(ringbuf, buffers);
       }
       //keep track of buffers
-      lastproper = current_buffer;
       current_buffer++;
       current_buffer %= buffers;
     }
diff --git a/Server/play1000kbit.sh b/Server/play1000kbit.sh
index 388e208a..c6811478 100755
--- a/Server/play1000kbit.sh
+++ b/Server/play1000kbit.sh
@@ -1,3 +1,4 @@
 #!/bin/bash
-ffmpeg -re -i "$1" -b 1024000 -ar 11025 -f flv - | ./Server_PLS 5000 5
+ffmpeg -re -i "$1" -b 1024000 -ar 11025 -f flv - 2> /dev/null | ./Server_PLS 500
+
 
diff --git a/Server/user.cpp b/Server/user.cpp
index 6143cea3..41948f80 100644
--- a/Server/user.cpp
+++ b/Server/user.cpp
@@ -11,19 +11,28 @@ class user{
     SWUnixSocket * Conn;
     int MyBuffer;
     int MyBuffer_num;
+    int MyBuffer_len;
+    int MyNum;
+    void * lastpointer;
+    static int UserCount;
+    static SWBaseSocket::SWBaseError err;
 };//user
 
+int user::UserCount = 0;
+SWBaseSocket::SWBaseError user::err;
 
 user::user(SWBaseSocket * newConn) {
   Conn = (SWUnixSocket*)newConn;
   is_connected = (Conn != 0);
+  MyNum = UserCount++;
+  std::cout << "User " << MyNum << " connected" << std::endl;
 }
 
 void user::disconnect(std::string reason) {
   if (Conn) {
-    Conn->disconnect();
+    Conn->disconnect(&err);
     Conn = NULL;
-    std::cout << "Disconnected user: " << reason << std::endl;
+    std::cout << "Disconnected user " << MyNum << ": " << reason << std::endl;
   }
   is_connected = false;
 }
@@ -32,24 +41,38 @@ void user::Send(buffer ** ringbuf, int buffers){
   //not connected? cancel
   if (!is_connected){return;}
   //still waiting for next buffer? check it
-  if (MyBuffer_num < 0){MyBuffer_num = ringbuf[MyBuffer]->number;}
-  //still waiting? don't crash - wait longer.
-  if (MyBuffer_num < 0){return;}
-  //buffer number changed? disconnect
-  if ((ringbuf[MyBuffer]->number != MyBuffer_num)){
-    disconnect("Buffer number changed (connection too slow)");
+  if (MyBuffer_num < 0){
+    MyBuffer_num = ringbuf[MyBuffer]->number;
+    //still waiting? don't crash - wait longer.
+    if (MyBuffer_num < 0){
+      return;
+    }else{
+      MyBuffer_len = ringbuf[MyBuffer]->FLV->len;
+      lastpointer = ringbuf[MyBuffer]->FLV->data;
+    }
+  }
+  if (lastpointer != ringbuf[MyBuffer]->FLV->data){
+    disconnect("Buffer resize at wrong time... had to disconnect");
     return;
   }
-  SWBaseSocket::SWBaseError err;
-  int ret = Conn->fsend(ringbuf[MyBuffer]->FLV->data, ringbuf[MyBuffer]->FLV->len, &err);
+  int ret = Conn->fsend(ringbuf[MyBuffer]->FLV->data, MyBuffer_len, &err);
   if ((err != SWBaseSocket::ok) && (err != SWBaseSocket::notReady)){
-    disconnect("Socket error");
+    disconnect("Socket error: " + err.get_error());
     return;
   }
-  if (ret == ringbuf[MyBuffer]->FLV->len){
+  if (ret == MyBuffer_len){
     //completed a send - switch to next buffer
-    MyBuffer++;
-    MyBuffer %= buffers;
+    if ((ringbuf[MyBuffer]->number != MyBuffer_num)){
+      std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl;
+      do{
+        MyBuffer++;
+        MyBuffer %= buffers;
+      }while(!ringbuf[MyBuffer]->FLV->isKeyframe);
+    }else{
+      MyBuffer++;
+      MyBuffer %= buffers;
+    }
     MyBuffer_num = -1;
+    lastpointer = 0;
   }
-}
\ No newline at end of file
+}
diff --git a/util/flv.cpp b/util/flv.cpp
index 4dc4235f..0e119a52 100644
--- a/util/flv.cpp
+++ b/util/flv.cpp
@@ -3,6 +3,7 @@
 struct FLV_Pack {
   int len;
   int buf;
+  bool isKeyframe;
   char * data;
 };//FLV_Pack
 
@@ -43,4 +44,6 @@ void FLV_GetPacket(FLV_Pack *& p){
   p->len += (p->data[1] << 16);
   if (p->buf < p->len){p->data = (char*)realloc(p->data, p->len);p->buf = p->len;}
   fread(p->data+11,1,p->len-11,stdin);
+  p->isKeyframe = false;
+  if ((p->data[0] == 0x09) && (((p->data[11] & 0xf0) >> 4) == 1)){p->isKeyframe = true;}
 }//FLV_GetPacket