diff --git a/src/Makefile.am b/src/Makefile.am index f61b23ce..73fd1e97 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -47,7 +47,7 @@ bin_PROGRAMS+=MistDTSC2MP4 bin_PROGRAMS+=MistDTSC2SRT #buffer folder (MistBuffer, MistPlayer) -MistBuffer_SOURCES=buffer/buffer.cpp buffer/buffer_user.h buffer/buffer_user.cpp buffer/buffer_stream.h buffer/buffer_stream.cpp tinythread.cpp tinythread.h ../VERSION +MistBuffer_SOURCES=buffer/buffer.cpp buffer/buffer_stream.h buffer/buffer_stream.cpp tinythread.cpp tinythread.h ../VERSION MistBuffer_LDADD=$(MIST_LIBS) -lpthread MistPlayer_SOURCES=buffer/player.cpp diff --git a/src/buffer/buffer.cpp b/src/buffer/buffer.cpp index d13e6e40..3ecb39f2 100644 --- a/src/buffer/buffer.cpp +++ b/src/buffer/buffer.cpp @@ -60,9 +60,7 @@ namespace Buffer { #endif conn.setBlocking(true); while (buffer_running && conn.connected()){ - if (conn.spool()){ - thisStream->parsePacket(conn.Received()); - } + thisStream->parsePacket(conn); } if (buffer_running){ thisStream->endStream(); diff --git a/src/buffer/buffer_stream.cpp b/src/buffer/buffer_stream.cpp index 87e5d778..d426963f 100644 --- a/src/buffer/buffer_stream.cpp +++ b/src/buffer/buffer_stream.cpp @@ -3,6 +3,7 @@ #include "buffer_stream.h" #include +#include namespace Buffer { /// Stores the singleton reference. @@ -59,17 +60,14 @@ namespace Buffer { Storage["totals"]["count"] = tot_count; Storage["totals"]["now"] = now; Storage["buffer"] = name; - - Storage["meta"] = metadata; - - if(Storage["meta"].isMember("tracks") && Storage["meta"]["tracks"].size() > 0){ - for(JSON::ObjIter it = Storage["meta"]["tracks"].ObjBegin(); it != Storage["meta"]["tracks"].ObjEnd(); it++){ - it->second.removeMember("keys"); - it->second.removeMember("frags"); - } - //delete empty trackname if present - these are never interesting - Storage["meta"]["tracks"].removeMember(""); + + std::map::iterator it; + for (it = metadata.tracks.begin(); it != metadata.tracks.end(); ++it){ + std::cout << it->second.getIdentifier() << ": " << it->second.firstms << "-" << it->second.lastms << " (" << it->second.keys.size() << ")" << std::endl; } + + Storage["meta"] = metadata.toJSON(); + ret = Storage.toString(); Storage["log"].null(); return ret; @@ -137,6 +135,7 @@ namespace Buffer { Storage["log"][username]["host"] = stats.host; Storage["log"][username]["start"] = Util::epoch() - stats.conntime; } + /// The deletion callback override that will disconnect users /// whom are currently receiving a tag that is being deleted. void Stream::deletionCallback(DTSC::livePos deleting){ @@ -173,12 +172,13 @@ namespace Buffer { } /// parsePacket override that will lock the rw_mutex during parsing. - bool Stream::parsePacket(Socket::Buffer & buffer){ + bool Stream::parsePacket(Socket::Connection & c){ bool ret = false; + if (!c.spool()){ + return ret; + } rw_mutex.lock(); - while (DTSC::Stream::parsePacket(buffer)){ - //TODO: Update metadata with call erik will write - //metadata.netPrepare(); + while (DTSC::Stream::parsePacket(c.Received())){ ret = true; } rw_mutex.unlock(); @@ -218,4 +218,58 @@ namespace Buffer { moreData.wait(stats_mutex); } + ///Creates a new user from a newly connected socket. + ///Also prints "User connected" text to stdout. + ///\param fd A connection to the user. + user::user(Socket::Connection fd, long long ID){ + sID = JSON::Value(ID).asString(); + S = fd; + curr_up = 0; + curr_down = 0; + myRing = 0; + } //constructor + + ///Disconnects the current user. Doesn't do anything if already disconnected. + ///Prints "Disconnected user" to stdout if disconnect took place. + ///\param reason The reason for disconnecting the user. + void user::Disconnect(std::string reason){ + S.close(); + Stream::get()->clearStats(sID, lastStats, reason); + } //Disconnect + + ///Default stats constructor. + ///Should not be used. + Stats::Stats(){ + up = 0; + down = 0; + conntime = 0; + } + + ///Stats constructor reading a string. + ///Reads a stats string and parses it to the internal representation. + ///\param s The string of stats. + Stats::Stats(std::string s){ + size_t f = s.find(' '); + if (f != std::string::npos){ + host = s.substr(0, f); + s.erase(0, f + 1); + } + f = s.find(' '); + if (f != std::string::npos){ + connector = s.substr(0, f); + s.erase(0, f + 1); + } + f = s.find(' '); + if (f != std::string::npos){ + conntime = atoi(s.substr(0, f).c_str()); + s.erase(0, f + 1); + } + f = s.find(' '); + if (f != std::string::npos){ + up = atoi(s.substr(0, f).c_str()); + s.erase(0, f + 1); + down = atoi(s.c_str()); + } + } + } diff --git a/src/buffer/buffer_stream.h b/src/buffer/buffer_stream.h index 22d3c14e..d523b631 100644 --- a/src/buffer/buffer_stream.h +++ b/src/buffer/buffer_stream.h @@ -7,9 +7,41 @@ #include #include #include "tinythread.h" -#include "buffer_user.h" namespace Buffer { + + /// Converts a stats line to up, down, host, connector and conntime values. + class Stats{ + public: + unsigned int up;/// & allowedTracks); private: void deletionCallback(DTSC::livePos deleting); - volatile int readers; ///< Current count of active readers; - volatile int writers; ///< Current count of waiting/active writers. tthread::mutex rw_mutex; ///< Mutex for read/write locking. tthread::condition_variable rw_change; ///< Triggered when reader/writer count changes. static Stream * ref; diff --git a/src/buffer/buffer_user.cpp b/src/buffer/buffer_user.cpp deleted file mode 100644 index cf98119a..00000000 --- a/src/buffer/buffer_user.cpp +++ /dev/null @@ -1,63 +0,0 @@ -/// \file buffer_user.cpp -/// Contains code for buffer users. -#include "buffer_user.h" -#include "buffer_stream.h" - -#include -#include - -namespace Buffer { - ///Creates a new user from a newly connected socket. - ///Also prints "User connected" text to stdout. - ///\param fd A connection to the user. - user::user(Socket::Connection fd, long long ID){ - sID = JSON::Value(ID).asString(); - S = fd; - curr_up = 0; - curr_down = 0; - myRing = 0; - } //constructor - - ///Disconnects the current user. Doesn't do anything if already disconnected. - ///Prints "Disconnected user" to stdout if disconnect took place. - ///\param reason The reason for disconnecting the user. - void user::Disconnect(std::string reason){ - S.close(); - Stream::get()->clearStats(sID, lastStats, reason); - } //Disconnect - - ///Default stats constructor. - ///Should not be used. - Stats::Stats(){ - up = 0; - down = 0; - conntime = 0; - } - - ///Stats constructor reading a string. - ///Reads a stats string and parses it to the internal representation. - ///\param s The string of stats. - Stats::Stats(std::string s){ - size_t f = s.find(' '); - if (f != std::string::npos){ - host = s.substr(0, f); - s.erase(0, f + 1); - } - f = s.find(' '); - if (f != std::string::npos){ - connector = s.substr(0, f); - s.erase(0, f + 1); - } - f = s.find(' '); - if (f != std::string::npos){ - conntime = atoi(s.substr(0, f).c_str()); - s.erase(0, f + 1); - } - f = s.find(' '); - if (f != std::string::npos){ - up = atoi(s.substr(0, f).c_str()); - s.erase(0, f + 1); - down = atoi(s.c_str()); - } - } -} diff --git a/src/buffer/buffer_user.h b/src/buffer/buffer_user.h deleted file mode 100644 index 97f51499..00000000 --- a/src/buffer/buffer_user.h +++ /dev/null @@ -1,42 +0,0 @@ -/// \file buffer_user.h -/// Contains definitions for buffer users. - -#pragma once -#include -#include -#include -#include "tinythread.h" - -namespace Buffer { - /// Converts a stats line to up, down, host, connector and conntime values. - class Stats{ - public: - unsigned int up;///