diff --git a/Buffer/main.cpp b/Buffer/main.cpp index 6476a778..14cc1cc3 100644 --- a/Buffer/main.cpp +++ b/Buffer/main.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include "../util/dtsc.h" //DTSC support #include "../util/socket.h" //Socket lib #include "../util/json/json.h" @@ -18,6 +19,14 @@ /// Holds all code unique to the Buffer. namespace Buffer{ + /// Gets the current system time in milliseconds. + unsigned int getNowMS(){ + timeval t; + gettimeofday(&t, 0); + return t.tv_sec + t.tv_usec/1000; + }//getNowMS + + Json::Value Storage = Json::Value(Json::objectValue); ///< Global storage of data. ///A simple signal handler that ignores all signals. @@ -185,6 +194,9 @@ namespace Buffer{ char charBuffer[1024*10]; unsigned int charCount; unsigned int stattimer = 0; + unsigned int lastPacketTime = 0;//time in MS last packet was parsed + unsigned int currPacketTime = 0;//time of the last parsed packet (current packet) + unsigned int prevPacketTime = 0;//time of the previously parsed packet (current packet - 1) Socket::Connection incoming; Socket::Connection std_input(fileno(stdin)); Socket::Connection StatsSocket = Socket::Connection("/tmp/ddv_statistics", true); @@ -224,10 +236,17 @@ namespace Buffer{ } //invalidate the current buffer if ( (!ip_waiting && std_input.canRead()) || (ip_waiting && ip_input.connected()) ){ - std::cin.read(charBuffer, 1024*10); - charCount = std::cin.gcount(); - inBuffer.append(charBuffer, charCount); - Strm->parsePacket(inBuffer); + //slow down packet receiving to real-time + if ((getNowMS() - lastPacketTime > currPacketTime - prevPacketTime) || (currPacketTime <= prevPacketTime)){ + std::cin.read(charBuffer, 1024*10); + charCount = std::cin.gcount(); + inBuffer.append(charBuffer, charCount); + if (Strm->parsePacket(inBuffer)){ + lastPacketTime = getNowMS(); + prevPacketTime = currPacketTime; + currPacketTime = Strm->getTime(); + } + } } //check for new connections, accept them if there are any diff --git a/util/dtsc.cpp b/util/dtsc.cpp index 85e1ce47..a5eb05e3 100644 --- a/util/dtsc.cpp +++ b/util/dtsc.cpp @@ -23,6 +23,12 @@ DTSC::Stream::Stream(unsigned int rbuffers){ buffercount = rbuffers; } +/// Returns the time in milliseconds of the last received packet. +/// This is _not_ the time this packet was received, only the stored time. +unsigned int DTSC::Stream::getTime(){ + return buffers.front().getContentP("time")->NumValue(); +} + /// Attempts to parse a packet from the given std::string buffer. /// Returns true if successful, removing the parsed part from the buffer string. /// Returns false if invalid or not enough data is in the buffer. diff --git a/util/dtsc.h b/util/dtsc.h index 3d690d53..f721e6c4 100644 --- a/util/dtsc.h +++ b/util/dtsc.h @@ -129,6 +129,7 @@ namespace DTSC{ std::string & outPacket(unsigned int num); std::string & outHeader(); Ring * getRing(); + unsigned int getTime(); void dropRing(Ring * ptr); private: std::deque buffers;