From ef412b62da57dc558cf5049a0bea701464dbb391 Mon Sep 17 00:00:00 2001
From: Thulinma <jaron@vietors.com>
Date: Fri, 31 Aug 2012 16:58:17 +0200
Subject: [PATCH] Updated everything for new socket lib requirements - working
 VoD through HTTP progressive, HTTP dynamic is almost working and RTMP is
 severely broken altogether.

---
 src/buffer.cpp                |   6 +-
 src/buffer_stream.cpp         |   5 +-
 src/buffer_stream.h           |   2 +-
 src/conn_http.cpp             |  23 +++---
 src/conn_http_dynamic.cpp     |  74 ++++++++++++++---
 src/conn_http_progressive.cpp |  28 ++++---
 src/conn_raw.cpp              |   6 +-
 src/conn_rtmp.cpp             |  20 +++--
 src/player.cpp                | 145 +++++++++++++++++++++++-----------
 9 files changed, 215 insertions(+), 94 deletions(-)

diff --git a/src/buffer.cpp b/src/buffer.cpp
index f651840a..a79918a7 100644
--- a/src/buffer.cpp
+++ b/src/buffer.cpp
@@ -33,6 +33,7 @@ namespace Buffer{
 
   void handleStats(void * empty){
     if (empty != 0){return;}
+    std::string double_newline = "\n\n";
     Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
     while (buffer_running){
       usleep(1000000); //sleep one second
@@ -40,7 +41,8 @@ namespace Buffer{
         StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
       }
       if (StatsSocket.connected()){
-        StatsSocket.Send(Stream::get()->getStats()+"\n\n");
+        StatsSocket.Send(Stream::get()->getStats());
+        StatsSocket.Send(double_newline);
         StatsSocket.flush();
       }
     }
@@ -140,7 +142,7 @@ namespace Buffer{
           inBuffer.append(charBuffer, charCount);
         }
       }else{
-        usleep(std::min(14999LL, lastTime - (now - timeDiff)) * 1000);
+        usleep(std::min(14999LL, lastPacket - (now - timeDiff)) * 1000);
       }
     }
     buffer_running = false;
diff --git a/src/buffer_stream.cpp b/src/buffer_stream.cpp
index ebbbe6f9..1956a28b 100644
--- a/src/buffer_stream.cpp
+++ b/src/buffer_stream.cpp
@@ -45,7 +45,8 @@ Buffer::Stream::~Stream(){
 }
 
 /// Calculate and return the current statistics in JSON format.
-std::string Buffer::Stream::getStats(){
+std::string & Buffer::Stream::getStats(){
+  static std::string ret;
   unsigned int now = time(0);
   unsigned int tot_up = 0, tot_down = 0, tot_count = 0;
   stats_mutex.lock();
@@ -64,7 +65,7 @@ std::string Buffer::Stream::getStats(){
   Storage["meta"] = Strm->metadata;
   if (Storage["meta"].isMember("audio")){Storage["meta"]["audio"].removeMember("init");}
   if (Storage["meta"].isMember("video")){Storage["meta"]["video"].removeMember("init");}
-  std::string ret = Storage.toString();
+  ret = Storage.toString();
   Storage["log"].null();
   stats_mutex.unlock();
   return ret;
diff --git a/src/buffer_stream.h b/src/buffer_stream.h
index a4d340e2..81784d39 100644
--- a/src/buffer_stream.h
+++ b/src/buffer_stream.h
@@ -14,7 +14,7 @@ namespace Buffer{
       /// Get a reference to this Stream object.
       static Stream * get();
       /// Get the current statistics in JSON format.
-      std::string getStats();
+      std::string & getStats();
       /// Get a new DTSC::Ring object for a user.
       DTSC::Ring * getRing();
       /// Drop a DTSC::Ring object.
diff --git a/src/conn_http.cpp b/src/conn_http.cpp
index 09af0847..edcec046 100644
--- a/src/conn_http.cpp
+++ b/src/conn_http.cpp
@@ -103,7 +103,7 @@ namespace Connector_HTTP{
   /// Handles internal requests.
   void Handle_Internal(HTTP::Parser & H, Socket::Connection * conn){
 
-    std::string url = H.url;
+    std::string url = H.getUrl();
 
     if (url == "/crossdomain.xml"){
       H.Clean();
@@ -114,7 +114,7 @@ namespace Connector_HTTP{
       return;
     }//crossdomain.xml
 
-    if ((url.length() > 9 && url.substr(0, 6) == "/info_" && url.substr(url.length() - 3, 3) == ".js") || (url.length() > 10 && url.substr(0, 7) == "/embed_" && url.substr(H.url.length() - 3, 3) == ".js")){
+    if ((url.length() > 9 && url.substr(0, 6) == "/info_" && url.substr(url.length() - 3, 3) == ".js") || (url.length() > 10 && url.substr(0, 7) == "/embed_" && url.substr(url.length() - 3, 3) == ".js")){
       std::string streamname;
       if (url.substr(0, 6) == "/info_"){
         streamname = url.substr(6, url.length() - 9);
@@ -293,24 +293,25 @@ namespace Connector_HTTP{
   /// - dynamic (request fed from http_dynamic connector)
   /// - progressive (request fed from http_progressive connector)
   std::string getHTTPType(HTTP::Parser & H){
-    if ((H.url.find("f4m") != std::string::npos) || ((H.url.find("Seg") != std::string::npos) && (H.url.find("Frag") != std::string::npos))){
-      std::string streamname = H.url.substr(1,H.url.find("/",1)-1);
+    std::string url = H.getUrl();
+    if ((url.find("f4m") != std::string::npos) || ((url.find("Seg") != std::string::npos) && (url.find("Frag") != std::string::npos))){
+      std::string streamname = url.substr(1,url.find("/",1)-1);
       Util::Stream::sanitizeName(streamname);
       H.SetVar("stream", streamname);
       return "dynamic";
     }
-    if (H.url.length() > 4){
-      std::string ext = H.url.substr(H.url.length() - 4, 4);
+    if (url.length() > 4){
+      std::string ext = url.substr(url.length() - 4, 4);
       if (ext == ".flv" || ext == ".mp3"){
-        std::string streamname = H.url.substr(1,H.url.length() - 5);
+        std::string streamname = url.substr(1,url.length() - 5);
         Util::Stream::sanitizeName(streamname);
         H.SetVar("stream", streamname);
         return "progressive";
       }
     }
-    if (H.url == "/crossdomain.xml"){return "internal";}
-    if (H.url.length() > 10 && H.url.substr(0, 7) == "/embed_" && H.url.substr(H.url.length() - 3, 3) == ".js"){return "internal";}
-    if (H.url.length() > 9 && H.url.substr(0, 6) == "/info_" && H.url.substr(H.url.length() - 3, 3) == ".js"){return "internal";}
+    if (url == "/crossdomain.xml"){return "internal";}
+    if (url.length() > 10 && url.substr(0, 7) == "/embed_" && url.substr(url.length() - 3, 3) == ".js"){return "internal";}
+    if (url.length() > 9 && url.substr(0, 6) == "/info_" && url.substr(url.length() - 3, 3) == ".js"){return "internal";}
     return "none";
   }
 
@@ -324,7 +325,7 @@ namespace Connector_HTTP{
         if (Client.Read(conn->Received())){
           std::string handler = getHTTPType(Client);
           #if DEBUG >= 4
-          std::cout << "Received request: " << Client.url << " => " << handler << " (" << Client.GetVar("stream") << ")" << std::endl;
+          std::cout << "Received request: " << Client.getUrl() << " => " << handler << " (" << Client.GetVar("stream") << ")" << std::endl;
           #endif
           if (handler == "none" || handler == "internal"){
             if (handler == "internal"){
diff --git a/src/conn_http_dynamic.cpp b/src/conn_http_dynamic.cpp
index 85ce94fa..d74b6adc 100644
--- a/src/conn_http_dynamic.cpp
+++ b/src/conn_http_dynamic.cpp
@@ -113,6 +113,7 @@ namespace Connector_HTTP{
   /// Main function for Connector_HTTP_Dynamic
   int Connector_HTTP_Dynamic(Socket::Connection conn){
     std::string FlashBuf;
+    int flashbuf_nonempty = 0;
     FLV::Tag tmp;//temporary tag, for init data
 
     std::queue<std::string> Flash_FragBuffer;//Fragment buffer
@@ -139,11 +140,27 @@ namespace Connector_HTTP{
       if (conn.spool()){
         if (HTTP_R.Read(conn.Received())){
           #if DEBUG >= 4
-          std::cout << "Received request: " << HTTP_R.url << std::endl;
+          std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
           #endif
           conn.setHost(HTTP_R.GetHeader("X-Origin"));
           if (HTTP_R.url.find("f4m") == std::string::npos){
             streamname = HTTP_R.url.substr(1,HTTP_R.url.find("/",1)-1);
+            if (!ss){
+              ss = Util::Stream::getStream(streamname);
+              if (!ss.connected()){
+                #if DEBUG >= 1
+                fprintf(stderr, "Could not connect to server!\n");
+                #endif
+                ss.close();
+                HTTP_S.Clean();
+                HTTP_S.SetBody("No such stream is available on the system. Please try again.\n");
+                conn.Send(HTTP_S.BuildResponse("404", "Not found"));
+                ready4data = false;
+                continue;
+              }
+              ss.setBlocking(false);
+              inited = true;
+            }
             Quality = HTTP_R.url.substr( HTTP_R.url.find("/",1)+1 );
             Quality = Quality.substr(0, Quality.find("Seg"));
             temp = HTTP_R.url.find("Seg") + 3;
@@ -153,12 +170,27 @@ namespace Connector_HTTP{
             #if DEBUG >= 4
             printf( "Quality: %s, Seg %d Frag %d\n", Quality.c_str(), Segment, ReqFragment);
             #endif
-            ss.Send("f " + JSON::Value((long long int)ReqFragment) + "\no \n");
+            std::stringstream sstream;
+            sstream << "f " << ReqFragment << "\no \n";
+            ss.Send(sstream.str().c_str());
             ss.flush();
             Flash_RequestPending++;
           }else{
             streamname = HTTP_R.url.substr(1,HTTP_R.url.find("/",1)-1);
-            pending_manifest = true;
+            if (!Strm.metadata.isNull()){
+              HTTP_S.Clean();
+              HTTP_S.SetHeader("Content-Type","text/xml");
+              HTTP_S.SetHeader("Cache-Control","no-cache");
+              std::string manifest = BuildManifest(streamname, Strm.metadata);
+              HTTP_S.SetBody(manifest);
+              conn.Send(HTTP_S.BuildResponse("200", "OK"));
+              #if DEBUG >= 3
+              printf("Sent manifest\n");
+              #endif
+              pending_manifest = false;
+            }else{
+              pending_manifest = true;
+            }
           }
           ready4data = true;
           HTTP_R.Clean(); //clean for any possible next requests
@@ -183,6 +215,7 @@ namespace Connector_HTTP{
             ready4data = false;
             continue;
           }
+          ss.setBlocking(false);
           #if DEBUG >= 3
           fprintf(stderr, "Everything connected, starting to send video data...\n");
           #endif
@@ -202,7 +235,8 @@ namespace Connector_HTTP{
         unsigned int now = time(0);
         if (now != lastStats){
           lastStats = now;
-          ss.Send("S "+conn.getStats("HTTP_Dynamic"));
+          ss.Send("S ");
+          ss.Send(conn.getStats("HTTP_Dynamic").c_str());
         }
         if (ss.spool() || ss.Received() != ""){
           if (Strm.parsePacket(ss.Received())){
@@ -216,7 +250,9 @@ namespace Connector_HTTP{
               }
               Strm.metadata["lasttime"] = Strm.getPacket(0)["time"];
             }
-            tag.DTSCLoader(Strm);
+            if (Strm.lastType() == DTSC::VIDEO || Strm.lastType() == DTSC::AUDIO){
+              tag.DTSCLoader(Strm);
+            }
             if (pending_manifest){
               HTTP_S.Clean();
               HTTP_S.SetHeader("Content-Type","text/xml");
@@ -229,17 +265,18 @@ namespace Connector_HTTP{
               #endif
               pending_manifest = false;
             }
-            if (Strm.getPacket(0).isMember("keyframe")){
-              if (FlashBuf != ""){
+            if (Strm.getPacket(0).isMember("keyframe") || Strm.getPacket(0)["datatype"].asString() == "pause_marker"){
+              if (flashbuf_nonempty){
                 Flash_FragBuffer.push(FlashBuf);
                 while (Flash_FragBuffer.size() > 2){
                   Flash_FragBuffer.pop();
                 }
                 #if DEBUG >= 4
-                fprintf(stderr, "Received a fragment. Now %i in buffer.\n", (int)Flash_FragBuffer.size());
+                fprintf(stderr, "Received a %s fragment of %i packets. Now %i in buffer.\n", Strm.getPacket(0)["datatype"].asString().c_str(), flashbuf_nonempty, (int)Flash_FragBuffer.size());
                 #endif
               }
               FlashBuf.clear();
+              flashbuf_nonempty = 0;
               //fill buffer with init data, if needed.
               if (Strm.metadata.isMember("audio") && Strm.metadata["audio"].isMember("init")){
                 tmp.DTSCAudioInit(Strm);
@@ -250,14 +287,31 @@ namespace Connector_HTTP{
                 FlashBuf.append(tmp.data, tmp.len);
               }
             }
-            FlashBuf.append(tag.data, tag.len);
+            if (Strm.lastType() == DTSC::VIDEO || Strm.lastType() == DTSC::AUDIO){
+              ++flashbuf_nonempty;
+              FlashBuf.append(tag.data, tag.len);
+            }
+          }else{
+            if (pending_manifest && !Strm.metadata.isNull()){
+              HTTP_S.Clean();
+              HTTP_S.SetHeader("Content-Type","text/xml");
+              HTTP_S.SetHeader("Cache-Control","no-cache");
+              std::string manifest = BuildManifest(streamname, Strm.metadata);
+              HTTP_S.SetBody(manifest);
+              conn.Send(HTTP_S.BuildResponse("200", "OK"));
+              #if DEBUG >= 3
+              printf("Sent manifest\n");
+              #endif
+              pending_manifest = false;
+            }
           }
         }
         if (!ss.connected()){break;}
       }
     }
     conn.close();
-    ss.Send("S "+conn.getStats("HTTP_Dynamic"));
+    ss.Send("S ");
+    ss.Send(conn.getStats("HTTP_Dynamic").c_str());
     ss.flush();
     ss.close();
     #if DEBUG >= 1
diff --git a/src/conn_http_progressive.cpp b/src/conn_http_progressive.cpp
index 81f7e955..c626a471 100644
--- a/src/conn_http_progressive.cpp
+++ b/src/conn_http_progressive.cpp
@@ -35,7 +35,7 @@ namespace Connector_HTTP{
     FLV::Tag tag;///< Temporary tag buffer.
 
     unsigned int lastStats = 0;
-    unsigned int seek_pos = 0;//seek position in milliseconds
+    unsigned int seek_pos = 0;//seek position in ms
     conn.setBlocking(false);//do not block on conn.spool() when no data is available
 
     while (conn.connected()){
@@ -43,14 +43,14 @@ namespace Connector_HTTP{
       if (conn.spool()){
         if (HTTP_R.Read(conn.Received())){
           #if DEBUG >= 4
-          std::cout << "Received request: " << HTTP_R.url << std::endl;
+          std::cout << "Received request: " << HTTP_R.getUrl() << std::endl;
           #endif
           conn.setHost(HTTP_R.GetHeader("X-Origin"));
           //we assume the URL is the stream name with a 3 letter extension
-          streamname = HTTP_R.url;
+          streamname = HTTP_R.getUrl().substr(1);
           size_t extDot = streamname.rfind('.');
           if (extDot != std::string::npos){streamname.resize(extDot);};//strip the extension
-          seek_pos = 1000 * atof(HTTP_R.GetVar("start").c_str());//seconds to ms
+          seek_pos = atoi(HTTP_R.GetVar("start").c_str()) * 1000;//seconds to ms
           ready4data = true;
           HTTP_R.Clean(); //clean for any possible next requests
         }else{
@@ -65,7 +65,7 @@ namespace Connector_HTTP{
           ss = Util::Stream::getStream(streamname);
           if (!ss.connected()){
             #if DEBUG >= 1
-            fprintf(stderr, "Could not connect to server!\n");
+            fprintf(stderr, "Could not connect to server for %s!\n", streamname.c_str());
             #endif
             ss.close();
             HTTP_S.Clean();
@@ -77,7 +77,7 @@ namespace Connector_HTTP{
           if (seek_pos){
             std::stringstream cmd;
             cmd << "s " << seek_pos << "\n";
-            ss.Send(cmd.str());
+            ss.Send(cmd.str().c_str());
           }
           #if DEBUG >= 3
           fprintf(stderr, "Everything connected, starting to send video data...\n");
@@ -89,7 +89,8 @@ namespace Connector_HTTP{
         unsigned int now = time(0);
         if (now != lastStats){
           lastStats = now;
-          ss.Send("S "+conn.getStats("HTTP_Progressive"));
+          ss.Send("S ");
+          ss.Send(conn.getStats("HTTP_Progressive").c_str());
         }
         if (ss.spool() || ss.Received() != ""){
           if (Strm.parsePacket(ss.Received())){
@@ -100,34 +101,35 @@ namespace Connector_HTTP{
               //HTTP_S.SetHeader("Transfer-Encoding", "chunked");
               HTTP_S.protocol = "HTTP/1.0";
               conn.Send(HTTP_S.BuildResponse("200", "OK"));//no SetBody = unknown length - this is intentional, we will stream the entire file
-              conn.Send(std::string(FLV::Header, 13));//write FLV header
+              conn.Send(FLV::Header, 13);//write FLV header
               static FLV::Tag tmp;
               //write metadata
               tmp.DTSCMetaInit(Strm);
-              conn.Send(std::string(tmp.data, tmp.len));
+              conn.Send(tmp.data, tmp.len);
               //write video init data, if needed
               if (Strm.metadata.isMember("video") && Strm.metadata["video"].isMember("init")){
                 tmp.DTSCVideoInit(Strm);
-                conn.Send(std::string(tmp.data, tmp.len));
+                conn.Send(tmp.data, tmp.len);
               }
               //write audio init data, if needed
               if (Strm.metadata.isMember("audio") && Strm.metadata["audio"].isMember("init")){
                 tmp.DTSCAudioInit(Strm);
-                conn.Send(std::string(tmp.data, tmp.len));
+                conn.Send(tmp.data, tmp.len);
               }
               progressive_has_sent_header = true;
               #if DEBUG >= 1
               fprintf(stderr, "Sent progressive FLV header\n");
               #endif
             }
-            conn.Send(std::string(tag.data, tag.len));//write the tag contents
+            conn.Send(tag.data, tag.len);//write the tag contents
           }
         }
         if (!ss.connected()){break;}
       }
     }
     conn.close();
-    ss.Send("S "+conn.getStats("HTTP_Dynamic"));
+    ss.Send("S ");
+    ss.Send(conn.getStats("HTTP_Dynamic").c_str());
     ss.flush();
     ss.close();
     #if DEBUG >= 1
diff --git a/src/conn_raw.cpp b/src/conn_raw.cpp
index 860af85c..f231ab8d 100644
--- a/src/conn_raw.cpp
+++ b/src/conn_raw.cpp
@@ -36,12 +36,14 @@ int main(int argc, char  ** argv) {
       lastStats = now;
       std::stringstream st;
       st << "S localhost RAW " << (time(0) - started) << " " << S.dataDown() << " " << S.dataUp() << "\n";
-      S.Send(st.str());
+      std::string tmp = st.str();
+      S.Send(tmp);
     }
   }
   std::stringstream st;
   st << "S localhost RAW " << (time(0) - started) << " " << S.dataDown() << " " << S.dataUp() << "\n";
-  S.Send(st.str());
+  std::string tmp = st.str();
+  S.Send(tmp);
   S.flush();
   S.close();
   return 0;
diff --git a/src/conn_rtmp.cpp b/src/conn_rtmp.cpp
index e68c1a5b..4c363a87 100644
--- a/src/conn_rtmp.cpp
+++ b/src/conn_rtmp.cpp
@@ -32,6 +32,9 @@ namespace Connector_RTMP{
   int play_streamid = -1;
   int play_msgtype = -1;
 
+  //generic state keeping
+  bool stream_inited = false;///true if init data for audio/video was sent
+
   Socket::Connection Socket; ///< Socket connected to user
   Socket::Connection SS; ///< Socket connected to server
   std::string streamname; ///< Stream that will be opened
@@ -48,7 +51,6 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
   Socket.setBlocking(false);
   FLV::Tag tag, init_tag;
   DTSC::Stream Strm;
-  bool stream_inited = false;//true if init data for audio/video was sent
 
   while (Socket.Received().size() < 1537 && Socket.connected()){Socket.spool(); usleep(5000);}
   RTMPStream::handshake_in = Socket.Received().substr(0, 1537);
@@ -101,7 +103,8 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
         unsigned int now = time(0);
         if (now != lastStats){
           lastStats = now;
-          SS.Send("S "+Socket.getStats("RTMP"));
+          SS.Send("S ");
+          SS.Send(Socket.getStats("RTMP").c_str());
         }
       }
       if (SS.spool()){
@@ -167,7 +170,8 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
     }
   }
   Socket.close();
-  SS.Send("S "+Socket.getStats("RTMP"));
+  SS.Send("S ");
+  SS.Send(Socket.getStats("RTMP").c_str());
   SS.flush();
   SS.close();
   #if DEBUG >= 1
@@ -278,7 +282,7 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){
               if (counter > 8){
                 sending = true;
                 SS.Send(meta_out.toNetPacked());
-                SS.Send(prebuffer.str());//write buffer
+                SS.Send(prebuffer.str().c_str());//write buffer
                 prebuffer.str("");//clear buffer
                 SS.Send(pack_out.toNetPacked());
               }else{
@@ -451,7 +455,9 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
         Socket.close();//disconnect user
         return;
       }
-      SS.Send("P "+Socket.getHost()+'\n');
+      SS.Send("P ");
+      SS.Send(Socket.getHost().c_str());
+      SS.Send("\n");
       nostats = true;
       #if DEBUG >= 4
       fprintf(stderr, "Connected to buffer, starting to send data...\n");
@@ -503,7 +509,9 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
     play_msgtype = messagetype;
     play_streamid = stream_id;
     stream_inited = false;
-    SS.Send("seek " + JSON::Value((long long int)amfdata.getContentP(3)->NumValue()).asString() + "\n");
+    SS.Send("s ");
+    SS.Send(JSON::Value((long long int)amfdata.getContentP(3)->NumValue()).asString().c_str());
+    SS.Send("\n");
     return;
   }//seek
   
diff --git a/src/player.cpp b/src/player.cpp
index 8e36d434..bbd19192 100644
--- a/src/player.cpp
+++ b/src/player.cpp
@@ -1,6 +1,10 @@
 /// \file player.cpp
 /// Holds all code for the MistPlayer application used for VoD streams.
 
+#if DEBUG >= 4
+#include <iostream>//for std::cerr
+#endif
+
 #include <stdio.h> //for fileno
 #include <sys/time.h>
 #include <mist/dtsc.h>
@@ -23,14 +27,17 @@ int main(int argc, char** argv){
   int playing = 0;
 
   DTSC::File source = DTSC::File(conf.getString("filename"));
-  Socket::Connection in_out = Socket::Connection(fileno(stdin), fileno(stdout));
+  Socket::Connection in_out = Socket::Connection(fileno(stdout), fileno(stdin));
   std::string meta_str = source.getHeader();
-
+  JSON::Value pausemark;
+  pausemark["datatype"] = "pause_marker";
+  pausemark["time"] = (long long int)0;
+  
   //send the header
   {
     in_out.Send("DTSC");
     unsigned int size = htonl(meta_str.size());
-    in_out.Send(std::string((char*)&size, (size_t)4));
+    in_out.Send((char*)&size, 4);
     in_out.Send(meta_str);
   }
 
@@ -40,60 +47,104 @@ int main(int argc, char** argv){
   long long now, timeDiff = 0, lastTime = 0;
 
   while (in_out.connected()){
-    if (in_out.spool() && in_out.Received().find('\n') != std::string::npos){
-      std::string cmd = in_out.Received().substr(0, in_out.Received().find('\n'));
-      in_out.Received().erase(0, in_out.Received().find('\n')+1);
-      if (cmd != ""){
-        switch (cmd[0]){
-          case 'P':{ //Push
-            in_out.close();//pushing to VoD makes no sense
-          } break;
-          case 'S':{ //Stats
-            /// \todo Parse stats command properly.
-            /* Stats(cmd.substr(2)); */
-          } break;
-          case 's':{ //second-seek
-            int second = JSON::Value(cmd.substr(2)).asInt();
-            double keyms = meta["video"]["keyms"].asInt();
-            if (keyms <= 0){keyms = 2000;}
-            source.seek_frame(second / (keyms / 1000.0));
-          } break;
-          case 'f':{ //frame-seek
-            source.seek_frame(JSON::Value(cmd.substr(2)).asInt());
-          } break;
-          case 'p':{ //play
-            playing = -1;
-          } break;
-          case 'o':{ //once-play
-            if (playing < 0){playing = 0;}
-            ++playing;
-          } break;
-          case 'q':{ //quit-playing
-            playing = 0;
-          } break;
+    if (in_out.spool()){
+      while (in_out.Received().find('\n') != std::string::npos){
+        std::string cmd = in_out.Received().substr(0, in_out.Received().find('\n'));
+        in_out.Received().erase(0, in_out.Received().find('\n')+1);
+        if (cmd != ""){
+          switch (cmd[0]){
+            case 'P':{ //Push
+              #if DEBUG >= 4
+              std::cerr << "Received push - ignoring (" << cmd << ")" << std::endl;
+              #endif
+              in_out.close();//pushing to VoD makes no sense
+            } break;
+            case 'S':{ //Stats
+              #if DEBUG >= 4
+              //std::cerr << "Received stats - ignoring (" << cmd << ")" << std::endl;
+              #endif
+              /// \todo Parse stats command properly.
+              /* Stats(cmd.substr(2)); */
+            } break;
+            case 's':{ //second-seek
+              #if DEBUG >= 4
+              std::cerr << "Received ms-seek (" << cmd << ")" << std::endl;
+              #endif
+              int ms = JSON::Value(cmd.substr(2)).asInt();
+              bool ret = source.seek_time(ms);
+              #if DEBUG >= 4
+              std::cerr << "Second-seek completed (time " << ms << "ms) " << ret << std::endl;
+              #endif
+            } break;
+            case 'f':{ //frame-seek
+              #if DEBUG >= 4
+              std::cerr << "Received frame-seek (" << cmd << ")" << std::endl;
+              #endif
+              bool ret = source.seek_frame(JSON::Value(cmd.substr(2)).asInt());
+              #if DEBUG >= 4
+              std::cerr << "Frame-seek completed " << ret << std::endl;
+              #endif
+            } break;
+            case 'p':{ //play
+              #if DEBUG >= 4
+              std::cerr << "Received play" << std::endl;
+              #endif
+              playing = -1;
+              in_out.setBlocking(false);
+            } break;
+            case 'o':{ //once-play
+              #if DEBUG >= 4
+              std::cerr << "Received once-play" << std::endl;
+              #endif
+              if (playing <= 0){playing = 1;}
+              ++playing;
+              in_out.setBlocking(false);
+            } break;
+            case 'q':{ //quit-playing
+              #if DEBUG >= 4
+              std::cerr << "Received quit-playing" << std::endl;
+              #endif
+              playing = 0;
+              in_out.setBlocking(true);
+            } break;
+          }
         }
       }
     }
     if (playing != 0){
       now = getNowMS();
-      if (now - timeDiff >= lastTime || lastTime - (now - timeDiff) > 15000) {
-        std::string packet = source.getPacket();
-        last_pack = JSON::fromDTMI(packet);
-        lastTime = last_pack["time"].asInt();
-        if ((now - timeDiff - lastTime) > 15000 || (now - timeDiff - lastTime < -15000)){
+      if (playing > 0 || now - timeDiff >= lastTime || lastTime - (now - timeDiff) > 15000) {
+        source.seekNext();
+        lastTime = source.getJSON()["time"].asInt();
+        if ((now - timeDiff - lastTime) > 5000 || (now - timeDiff - lastTime < -5000)){
           timeDiff = now - lastTime;
         }
-        //insert proper header for this type of data
-        in_out.Send("DTPD");
-        //insert the packet length
-        unsigned int size = htonl(packet.size());
-        in_out.Send(std::string((char*)&size, (size_t)4));
-        in_out.Send(packet);
+        if (source.getJSON().isMember("keyframe")){
+          if (playing > 0){--playing;}
+          if (playing == 0){
+            #if DEBUG >= 4
+            std::cerr << "Sending pause_marker" << std::endl;
+            #endif
+            pausemark["time"] = (long long int)now;
+            pausemark.toPacked();
+            in_out.Send(pausemark.toNetPacked());
+            in_out.flush();
+            in_out.setBlocking(true);
+          }
+        }
+        if (playing != 0){
+          //insert proper header for this type of data
+          in_out.Send("DTPD");
+          //insert the packet length
+          unsigned int size = htonl(source.getPacket().size());
+          in_out.Send((char*)&size, 4);
+          in_out.Send(source.getPacket());
+        }
       } else {
-        usleep(std::min(14999LL, lastTime - (now - timeDiff)) * 1000);
+        usleep(std::min(10000LL, lastTime - (now - timeDiff)) * 1000);
       }
-      if (playing > 0){--playing;}
     }
+    usleep(10000);//sleep 10ms
   }
   return 0;
 }