From 77af63ebe44a350c3c9f323834a29fd7ff65e199 Mon Sep 17 00:00:00 2001
From: Thulinma <jaron@vietors.com>
Date: Fri, 30 Mar 2012 13:20:08 +0200
Subject: [PATCH] Added DTSC rate limiter. Closes #12

---
 Buffer/main.cpp | 27 +++++++++++++++++++++++----
 util/dtsc.cpp   |  6 ++++++
 util/dtsc.h     |  1 +
 3 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/Buffer/main.cpp b/Buffer/main.cpp
index 6476a778..14cc1cc3 100644
--- a/Buffer/main.cpp
+++ b/Buffer/main.cpp
@@ -11,6 +11,7 @@
 #include <unistd.h>
 #include <signal.h>
 #include <sstream>
+#include <sys/time.h>
 #include "../util/dtsc.h" //DTSC support
 #include "../util/socket.h" //Socket lib
 #include "../util/json/json.h"
@@ -18,6 +19,14 @@
 /// Holds all code unique to the Buffer.
 namespace Buffer{
 
+  /// Gets the current system time in milliseconds.
+  unsigned int getNowMS(){
+    timeval t;
+    gettimeofday(&t, 0);
+    return t.tv_sec + t.tv_usec/1000;
+  }//getNowMS
+
+
   Json::Value Storage = Json::Value(Json::objectValue); ///< Global storage of data.
   
   ///A simple signal handler that ignores all signals.
@@ -185,6 +194,9 @@ namespace Buffer{
     char charBuffer[1024*10];
     unsigned int charCount;
     unsigned int stattimer = 0;
+    unsigned int lastPacketTime = 0;//time in MS last packet was parsed
+    unsigned int currPacketTime = 0;//time of the last parsed packet (current packet)
+    unsigned int prevPacketTime = 0;//time of the previously parsed packet (current packet - 1)
     Socket::Connection incoming;
     Socket::Connection std_input(fileno(stdin));
     Socket::Connection StatsSocket = Socket::Connection("/tmp/ddv_statistics", true);
@@ -224,10 +236,17 @@ namespace Buffer{
       }
       //invalidate the current buffer
       if ( (!ip_waiting && std_input.canRead()) || (ip_waiting && ip_input.connected()) ){
-        std::cin.read(charBuffer, 1024*10);
-        charCount = std::cin.gcount();
-        inBuffer.append(charBuffer, charCount);
-        Strm->parsePacket(inBuffer);
+        //slow down packet receiving to real-time
+        if ((getNowMS() - lastPacketTime > currPacketTime - prevPacketTime) || (currPacketTime <= prevPacketTime)){
+          std::cin.read(charBuffer, 1024*10);
+          charCount = std::cin.gcount();
+          inBuffer.append(charBuffer, charCount);
+          if (Strm->parsePacket(inBuffer)){
+            lastPacketTime = getNowMS();
+            prevPacketTime = currPacketTime;
+            currPacketTime = Strm->getTime();
+          }
+        }
       }
 
       //check for new connections, accept them if there are any
diff --git a/util/dtsc.cpp b/util/dtsc.cpp
index 85e1ce47..a5eb05e3 100644
--- a/util/dtsc.cpp
+++ b/util/dtsc.cpp
@@ -23,6 +23,12 @@ DTSC::Stream::Stream(unsigned int rbuffers){
   buffercount = rbuffers;
 }
 
+/// Returns the time in milliseconds of the last received packet.
+/// This is _not_ the time this packet was received, only the stored time.
+unsigned int DTSC::Stream::getTime(){
+  return buffers.front().getContentP("time")->NumValue();
+}
+
 /// Attempts to parse a packet from the given std::string buffer.
 /// Returns true if successful, removing the parsed part from the buffer string.
 /// Returns false if invalid or not enough data is in the buffer.
diff --git a/util/dtsc.h b/util/dtsc.h
index 3d690d53..f721e6c4 100644
--- a/util/dtsc.h
+++ b/util/dtsc.h
@@ -129,6 +129,7 @@ namespace DTSC{
       std::string & outPacket(unsigned int num);
       std::string & outHeader();
       Ring * getRing();
+      unsigned int getTime();
       void dropRing(Ring * ptr);
   private:
       std::deque<DTSC::DTMI> buffers;