From 1e2f1602f837b8e5dde06918939d05faa47412f5 Mon Sep 17 00:00:00 2001 From: Erik Zandvliet Date: Thu, 28 Mar 2013 15:42:46 +0100 Subject: [PATCH] Namespace refactor and documentation in the separate buffer files. --- src/buffer/buffer_stream.cpp | 448 ++++++++++++++++++----------------- src/buffer/buffer_stream.h | 2 +- src/buffer/buffer_user.cpp | 258 ++++++++++---------- 3 files changed, 376 insertions(+), 332 deletions(-) diff --git a/src/buffer/buffer_stream.cpp b/src/buffer/buffer_stream.cpp index ccbec971..3419c711 100644 --- a/src/buffer/buffer_stream.cpp +++ b/src/buffer/buffer_stream.cpp @@ -4,237 +4,263 @@ #include "buffer_stream.h" #include -/// Stores the globally equal reference. -Buffer::Stream * Buffer::Stream::ref = 0; +namespace Buffer { + ///\brief Stores the singleton reference. + Stream * Stream::ref = 0; -/// Returns a globally equal reference to this class. -Buffer::Stream * Buffer::Stream::get(){ - static tthread::mutex creator; - if (ref == 0){ - //prevent creating two at the same time - creator.lock(); + ///\brief Returns a reference to the singleton instance of this class. + ///\return A reference to the class. + Stream * Stream::get(){ + static tthread::mutex creator; if (ref == 0){ - ref = new Stream(); + //prevent creating two at the same time + creator.lock(); + if (ref == 0){ + ref = new Stream(); + } + creator.unlock(); } - creator.unlock(); + return ref; } - return ref; -} -/// Creates a new DTSC::Stream object, private function so only one instance can exist. -Buffer::Stream::Stream(){ - Strm = new DTSC::Stream(5); - readers = 0; - writers = 0; -} - -/// Do cleanup on delete. -Buffer::Stream::~Stream(){ - tthread::lock_guard guard(stats_mutex); - for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ - if (( * *usersIt).S.connected()){ - ( * *usersIt).S.close(); - } + ///\brief Creates a new DTSC::Stream object, private function so only one instance can exist. + Stream::Stream(){ + Strm = new DTSC::Stream(5); + readers = 0; + writers = 0; } - moreData.notify_all(); - delete Strm; -} -/// Calculate and return the current statistics in JSON format. -std::string & Buffer::Stream::getStats(){ - static std::string ret; - long long int now = Util::epoch(); - unsigned int tot_up = 0, tot_down = 0, tot_count = 0; - tthread::lock_guard guard(stats_mutex); - if (users.size() > 0){ + ///\brief Do cleanup on delete. + Stream::~Stream(){ + tthread::lock_guard guard(stats_mutex); for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ - tot_down += ( * *usersIt).curr_down; - tot_up += ( * *usersIt).curr_up; - tot_count++; + if (( * *usersIt).S.connected()){ + ( * *usersIt).S.close(); + } } + moreData.notify_all(); + delete Strm; } - Storage["totals"]["down"] = tot_down; - Storage["totals"]["up"] = tot_up; - Storage["totals"]["count"] = tot_count; - Storage["totals"]["now"] = now; - Storage["buffer"] = name; - Storage["meta"] = Strm->metadata; - if (Storage["meta"].isMember("audio")){ - Storage["meta"]["audio"].removeMember("init"); - } - if (Storage["meta"].isMember("video")){ - Storage["meta"]["video"].removeMember("init"); - } - ret = Storage.toString(); - Storage["log"].null(); - return ret; -} -/// Get a new DTSC::Ring object for a user. -DTSC::Ring * Buffer::Stream::getRing(){ - return Strm->getRing(); -} - -/// Drop a DTSC::Ring object. -void Buffer::Stream::dropRing(DTSC::Ring * ring){ - Strm->dropRing(ring); -} - -/// Get the (constant) header data of this stream. -std::string & Buffer::Stream::getHeader(){ - return Strm->outHeader(); -} - -/// Set the IP address to accept push data from. -void Buffer::Stream::setWaitingIP(std::string ip){ - waiting_ip = ip; -} - -/// Check if this is the IP address to accept push data from. -bool Buffer::Stream::checkWaitingIP(std::string ip){ - if (ip == waiting_ip || ip == "::ffff:" + waiting_ip){ - return true; - }else{ - std::cout << ip << " != (::ffff:)" << waiting_ip << std::endl; - return false; - } -} - -/// Sets the current socket for push data. -bool Buffer::Stream::setInput(Socket::Connection S){ - if (ip_input.connected()){ - return false; - }else{ - ip_input = S; - return true; - } -} - -/// Gets the current socket for push data. -Socket::Connection & Buffer::Stream::getIPInput(){ - return ip_input; -} - -/// Stores intermediate statistics. -void Buffer::Stream::saveStats(std::string username, Stats & stats){ - tthread::lock_guard guard(stats_mutex); - Storage["curr"][username]["connector"] = stats.connector; - Storage["curr"][username]["up"] = stats.up; - Storage["curr"][username]["down"] = stats.down; - Storage["curr"][username]["conntime"] = stats.conntime; - Storage["curr"][username]["host"] = stats.host; - Storage["curr"][username]["start"] = Util::epoch() - stats.conntime; -} - -/// Stores final statistics. -void Buffer::Stream::clearStats(std::string username, Stats & stats, std::string reason){ - tthread::lock_guard guard(stats_mutex); - if (Storage["curr"].isMember(username)){ - Storage["curr"].removeMember(username); -#if DEBUG >= 4 - std::cout << "Disconnected user " << username << ": " << reason << ". " << stats.connector << " transferred " << stats.up << " up and " - << stats.down << " down in " << stats.conntime << " seconds to " << stats.host << std::endl; -#endif - } - Storage["log"][username]["connector"] = stats.connector; - Storage["log"][username]["up"] = stats.up; - Storage["log"][username]["down"] = stats.down; - Storage["log"][username]["conntime"] = stats.conntime; - Storage["log"][username]["host"] = stats.host; - Storage["log"][username]["start"] = Util::epoch() - stats.conntime; -} - -/// Cleans up broken connections -void Buffer::Stream::cleanUsers(){ - bool repeat = false; - tthread::lock_guard guard(stats_mutex); - do{ - repeat = false; + ///\brief Calculate and return the current statistics. + ///\return The current statistics in JSON format. + std::string & Stream::getStats(){ + static std::string ret; + long long int now = Util::epoch(); + unsigned int tot_up = 0, tot_down = 0, tot_count = 0; + tthread::lock_guard guard(stats_mutex); if (users.size() > 0){ for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ - if (( * *usersIt).Thread == 0 && !( * *usersIt).S.connected()){ - delete *usersIt; - users.erase(usersIt); - repeat = true; - break; - }else{ - if ( !( * *usersIt).S.connected()){ - if (( * *usersIt).Thread->joinable()){ - ( * *usersIt).Thread->join(); - delete ( * *usersIt).Thread; - ( * *usersIt).Thread = 0; + tot_down += ( * *usersIt).curr_down; + tot_up += ( * *usersIt).curr_up; + tot_count++; + } + } + Storage["totals"]["down"] = tot_down; + Storage["totals"]["up"] = tot_up; + Storage["totals"]["count"] = tot_count; + Storage["totals"]["now"] = now; + Storage["buffer"] = name; + Storage["meta"] = Strm->metadata; + if (Storage["meta"].isMember("audio")){ + Storage["meta"]["audio"].removeMember("init"); + } + if (Storage["meta"].isMember("video")){ + Storage["meta"]["video"].removeMember("init"); + } + ret = Storage.toString(); + Storage["log"].null(); + return ret; + } + + ///\brief Get a new DTSC::Ring object for a user. + ///\return A new DTSC::Ring object. + DTSC::Ring * Stream::getRing(){ + return Strm->getRing(); + } + + ///\brief Drop a DTSC::Ring object. + ///\param ring The DTSC::Ring to be invalidated. + void Stream::dropRing(DTSC::Ring * ring){ + Strm->dropRing(ring); + } + + ///\brief Get the (constant) header data of this stream. + ///\return A reference to the header data of the stream. + std::string & Stream::getHeader(){ + return Strm->outHeader(); + } + + ///\brief Set the IP address to accept push data from. + ///\param ip The new IP to accept push data from. + void Stream::setWaitingIP(std::string ip){ + waiting_ip = ip; + } + + ///\brief Check if this is the IP address to accept push data from. + ///\param ip The IP address to check. + ///\return True if it is the correct address, false otherwise. + bool Stream::checkWaitingIP(std::string ip){ + if (ip == waiting_ip || ip == "::ffff:" + waiting_ip){ + return true; + }else{ + std::cout << ip << " != (::ffff:)" << waiting_ip << std::endl; + return false; + } + } + + ///\brief Sets the current socket for push data. + ///\param S The new socket for accepting push data. + ///\return True if succesful, false otherwise. + bool Stream::setInput(Socket::Connection S){ + if (ip_input.connected()){ + return false; + }else{ + ip_input = S; + return true; + } + } + + ///\brief Gets the current socket for push data. + ///\return A reference to the push socket. + Socket::Connection & Stream::getIPInput(){ + return ip_input; + } + + ///\brief Stores intermediate statistics. + ///\param username The name of the user. + ///\param stats The final statistics to store. + void Stream::saveStats(std::string username, Stats & stats){ + tthread::lock_guard guard(stats_mutex); + Storage["curr"][username]["connector"] = stats.connector; + Storage["curr"][username]["up"] = stats.up; + Storage["curr"][username]["down"] = stats.down; + Storage["curr"][username]["conntime"] = stats.conntime; + Storage["curr"][username]["host"] = stats.host; + Storage["curr"][username]["start"] = Util::epoch() - stats.conntime; + } + + ///\brief Stores final statistics. + ///\param username The name of the user. + ///\param stats The final statistics to store. + ///\param reason The reason for disconnecting. + void Stream::clearStats(std::string username, Stats & stats, std::string reason){ + tthread::lock_guard guard(stats_mutex); + if (Storage["curr"].isMember(username)){ + Storage["curr"].removeMember(username); + #if DEBUG >= 4 + std::cout << "Disconnected user " << username << ": " << reason << ". " << stats.connector << " transferred " << stats.up << " up and " + << stats.down << " down in " << stats.conntime << " seconds to " << stats.host << std::endl; + #endif + } + Storage["log"][username]["connector"] = stats.connector; + Storage["log"][username]["up"] = stats.up; + Storage["log"][username]["down"] = stats.down; + Storage["log"][username]["conntime"] = stats.conntime; + Storage["log"][username]["host"] = stats.host; + Storage["log"][username]["start"] = Util::epoch() - stats.conntime; + } + + ///\brief Clean up broken connections + void Stream::cleanUsers(){ + bool repeat = false; + tthread::lock_guard guard(stats_mutex); + do{ + repeat = false; + if (users.size() > 0){ + for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ + if (( * *usersIt).Thread == 0 && !( * *usersIt).S.connected()){ + delete *usersIt; + users.erase(usersIt); + repeat = true; + break; + }else{ + if ( !( * *usersIt).S.connected()){ + if (( * *usersIt).Thread->joinable()){ + ( * *usersIt).Thread->join(); + delete ( * *usersIt).Thread; + ( * *usersIt).Thread = 0; + } } } } } - } - }while (repeat); -} - -/// Blocks until writing is safe. -void Buffer::Stream::getWriteLock(){ - rw_mutex.lock(); - writers++; - while (writers != 1 && readers != 0){ - rw_change.wait(rw_mutex); + }while (repeat); } - rw_mutex.unlock(); -} -/// Drops a previously gotten write lock. -void Buffer::Stream::dropWriteLock(bool newpackets_available){ - if (newpackets_available){ - if (Strm->getPacket(0).isMember("keyframe")){ - stats_mutex.lock(); - Strm->updateHeaders(); - stats_mutex.unlock(); + ///\brief Ask to obtain a write lock. + /// + /// Blocks until writing is safe. + void Stream::getWriteLock(){ + rw_mutex.lock(); + writers++; + while (writers != 1 && readers != 0){ + rw_change.wait(rw_mutex); + } + rw_mutex.unlock(); + } + + ///\brief Drops a previously obtained write lock. + ///\param newPacketsAvailable Whether new packets are available to update the index. + void Stream::dropWriteLock(bool newPacketsAvailable){ + if (newPacketsAvailable){ + if (Strm->getPacket(0).isMember("keyframe")){ + stats_mutex.lock(); + Strm->updateHeaders(); + stats_mutex.unlock(); + } + } + rw_mutex.lock(); + writers--; + rw_mutex.unlock(); + rw_change.notify_all(); + if (newPacketsAvailable){ + moreData.notify_all(); } } - rw_mutex.lock(); - writers--; - rw_mutex.unlock(); - rw_change.notify_all(); - if (newpackets_available){ - moreData.notify_all(); + + ///\brief Ask to obtain a read lock. + /// + ///Blocks until reading is safe. + void Stream::getReadLock(){ + rw_mutex.lock(); + while (writers > 0){ + rw_change.wait(rw_mutex); + } + readers++; + rw_mutex.unlock(); + } + + ///\brief Drops a previously obtained read lock. + void Stream::dropReadLock(){ + rw_mutex.lock(); + readers--; + rw_mutex.unlock(); + rw_change.notify_all(); + } + + ///\brief Retrieves a reference to the DTSC::Stream + ///\return A reference to the used DTSC::Stream + DTSC::Stream * Stream::getStream(){ + return Strm; + } + + ///\brief Sets the buffer name. + ///\param n The new name of the buffer. + void Stream::setName(std::string n){ + name = n; + } + + ///\brief Add a user to the userlist. + ///\param newUser The user to be added. + void Stream::addUser(user * newUser){ + tthread::lock_guard guard(stats_mutex); + users.push_back(newUser); + } + + ///\brief Blocks the thread until new data is available. + void Stream::waitForData(){ + tthread::lock_guard guard(stats_mutex); + moreData.wait(stats_mutex); } } - -/// Blocks until reading is safe. -void Buffer::Stream::getReadLock(){ - rw_mutex.lock(); - while (writers > 0){ - rw_change.wait(rw_mutex); - } - readers++; - rw_mutex.unlock(); -} - -/// Drops a previously gotten read lock. -void Buffer::Stream::dropReadLock(){ - rw_mutex.lock(); - readers--; - rw_mutex.unlock(); - rw_change.notify_all(); -} - -/// Retrieves a reference to the DTSC::Stream -DTSC::Stream * Buffer::Stream::getStream(){ - return Strm; -} - -/// Sets the buffer name. -void Buffer::Stream::setName(std::string n){ - name = n; -} - -/// Add a user to the userlist. -void Buffer::Stream::addUser(user * new_user){ - tthread::lock_guard guard(stats_mutex); - users.push_back(new_user); -} - -/// Blocks the thread until new data is available. -void Buffer::Stream::waitForData(){ - tthread::lock_guard guard(stats_mutex); - moreData.wait(stats_mutex); -} diff --git a/src/buffer/buffer_stream.h b/src/buffer/buffer_stream.h index e0f58133..07286277 100644 --- a/src/buffer/buffer_stream.h +++ b/src/buffer/buffer_stream.h @@ -50,7 +50,7 @@ namespace Buffer { /// Sets the buffer name. void setName(std::string n); /// Add a user to the userlist. - void addUser(user * new_user); + void addUser(user * newUser); /// Blocks the thread until new data is available. void waitForData(); /// Cleanup function diff --git a/src/buffer/buffer_user.cpp b/src/buffer/buffer_user.cpp index 5e723da8..57011556 100644 --- a/src/buffer/buffer_user.cpp +++ b/src/buffer/buffer_user.cpp @@ -1,73 +1,116 @@ /// \file buffer_user.cpp /// Contains code for buffer users. - #include "buffer_user.h" #include "buffer_stream.h" + #include -#include //for atoi and friends -int Buffer::user::UserCount = 0; +#include -/// Creates a new user from a newly connected socket. -/// Also prints "User connected" text to stdout. -Buffer::user::user(Socket::Connection fd){ - S = fd; - MyNum = UserCount++; - std::stringstream st; - st << MyNum; - MyStr = st.str(); - curr_up = 0; - curr_down = 0; - currsend = 0; - myRing = 0; - Thread = 0; - gotproperaudio = false; - lastpointer = 0; -} //constructor +namespace Buffer { + int user::UserCount = 0; -/// Drops held DTSC::Ring class, if one is held. -Buffer::user::~user(){ - Stream::get()->dropRing(myRing); -} //destructor + ///\brief Creates a new user from a newly connected socket. + /// + ///Also prints "User connected" text to stdout. + ///\param fd A connection to the user. + user::user(Socket::Connection fd){ + S = fd; + MyNum = UserCount++; + std::stringstream st; + st << MyNum; + MyStr = st.str(); + curr_up = 0; + curr_down = 0; + currsend = 0; + myRing = 0; + Thread = 0; + gotproperaudio = false; + lastpointer = 0; + } //constructor -/// Disconnects the current user. Doesn't do anything if already disconnected. -/// Prints "Disconnected user" to stdout if disconnect took place. -void Buffer::user::Disconnect(std::string reason){ - if (S.connected()){ - S.close(); - } - Stream::get()->clearStats(MyStr, lastStats, reason); -} //Disconnect + ///\brief Drops held DTSC::Ring class, if one is held. + user::~user(){ + Stream::get()->dropRing(myRing); + } //destructor -/// Tries to send the current buffer, returns true if success, false otherwise. -/// Has a side effect of dropping the connection if send will never complete. -bool Buffer::user::doSend(const char * ptr, int len){ - if ( !len){ - return true; - } //do not do empty sends - int r = S.iwrite(ptr + currsend, len - currsend); - if (r <= 0){ - if (errno == EWOULDBLOCK){ + ///\brief Disconnects the current user. Doesn't do anything if already disconnected. + /// + ///Prints "Disconnected user" to stdout if disconnect took place. + ///\param reason The reason for disconnecting the user. + void user::Disconnect(std::string reason){ + if (S.connected()){ + S.close(); + } + Stream::get()->clearStats(MyStr, lastStats, reason); + } //Disconnect + + ///\brief Tries to send data to the user. + /// + ///Has a side effect of dropping the connection if send will never complete. + ///\param ptr A pointer to the data that is to be sent. + ///\param len The amount of bytes to be sent from this pointer. + ///\return True if len bytes are sent, false otherwise. + bool user::doSend(const char * ptr, int len){ + if ( !len){ + return true; + } //do not do empty sends + int r = S.iwrite(ptr + currsend, len - currsend); + if (r <= 0){ + if (errno == EWOULDBLOCK){ + return false; + } + Disconnect(S.getError()); return false; } - Disconnect(S.getError()); - return false; - } - currsend += r; - return (currsend == len); -} //doSend + currsend += r; + return (currsend == len); + } //doSend -/// Try to send data to this user. Disconnects if any problems occur. -bool Buffer::user::Send(){ - if ( !myRing){ - return false; - } //no ring! - if ( !S.connected()){ - return false; - } //cancel if not connected - if (myRing->waiting){ - Stream::get()->waitForData(); - if ( !myRing->waiting){ - Stream::get()->getReadLock(); + ///\brief Try to send the current buffer. + /// + ///\return True if the send was succesful, false otherwise. + bool user::Send(){ + if ( !myRing){ + return false; + } //no ring! + if ( !S.connected()){ + return false; + } //cancel if not connected + if (myRing->waiting){ + Stream::get()->waitForData(); + if ( !myRing->waiting){ + Stream::get()->getReadLock(); + if (Stream::get()->getStream()->getPacket(myRing->b).isMember("keyframe") && myRing->playCount > 0){ + myRing->playCount--; + if ( !myRing->playCount){ + JSON::Value pausemark; + pausemark["datatype"] = "pause_marker"; + pausemark["time"] = Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt(); + pausemark.toPacked(); + S.SendNow(pausemark.toNetPacked()); + } + } + Stream::get()->dropReadLock(); + } + return false; + } //still waiting for next buffer? + if (myRing->starved){ + //if corrupt data, warn and get new DTSC::Ring + std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl; + Stream::get()->dropRing(myRing); + myRing = Stream::get()->getRing(); + return false; + } + //try to complete a send + Stream::get()->getReadLock(); + if (doSend(Stream::get()->getStream()->outPacket(myRing->b).c_str(), Stream::get()->getStream()->outPacket(myRing->b).length())){ + //switch to next buffer + currsend = 0; + if (myRing->b <= 0){ + myRing->waiting = true; + return false; + } //no next buffer? go in waiting mode. + myRing->b--; if (Stream::get()->getStream()->getPacket(myRing->b).isMember("keyframe") && myRing->playCount > 0){ myRing->playCount--; if ( !myRing->playCount){ @@ -79,71 +122,46 @@ bool Buffer::user::Send(){ } } Stream::get()->dropReadLock(); - } - return false; - } //still waiting for next buffer? - if (myRing->starved){ - //if corrupt data, warn and get new DTSC::Ring - std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl; - Stream::get()->dropRing(myRing); - myRing = Stream::get()->getRing(); - return false; - } - //try to complete a send - Stream::get()->getReadLock(); - if (doSend(Stream::get()->getStream()->outPacket(myRing->b).c_str(), Stream::get()->getStream()->outPacket(myRing->b).length())){ - //switch to next buffer - currsend = 0; - if (myRing->b <= 0){ - myRing->waiting = true; return false; - } //no next buffer? go in waiting mode. - myRing->b--; - if (Stream::get()->getStream()->getPacket(myRing->b).isMember("keyframe") && myRing->playCount > 0){ - myRing->playCount--; - if ( !myRing->playCount){ - JSON::Value pausemark; - pausemark["datatype"] = "pause_marker"; - pausemark["time"] = Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt(); - pausemark.toPacked(); - S.SendNow(pausemark.toNetPacked()); - } - } + } //completed a send Stream::get()->dropReadLock(); - return false; - } //completed a send - Stream::get()->dropReadLock(); - return true; -} //send + return true; + } //send -/// Default constructor - should not be in use. -Buffer::Stats::Stats(){ - up = 0; - down = 0; - conntime = 0; -} + ///\brief Default stats constructor. + /// + ///Should not be used. + Stats::Stats(){ + up = 0; + down = 0; + conntime = 0; + } -/// Reads a stats string and parses it to the internal representation. -Buffer::Stats::Stats(std::string s){ - size_t f = s.find(' '); - if (f != std::string::npos){ - host = s.substr(0, f); - s.erase(0, f + 1); - } - f = s.find(' '); - if (f != std::string::npos){ - connector = s.substr(0, f); - s.erase(0, f + 1); - } - f = s.find(' '); - if (f != std::string::npos){ - conntime = atoi(s.substr(0, f).c_str()); - s.erase(0, f + 1); - } - f = s.find(' '); - if (f != std::string::npos){ - up = atoi(s.substr(0, f).c_str()); - s.erase(0, f + 1); - down = atoi(s.c_str()); + ///\brief Stats constructor reading a string. + /// + ///Reads a stats string and parses it to the internal representation. + ///\param s The string of stats. + Stats::Stats(std::string s){ + size_t f = s.find(' '); + if (f != std::string::npos){ + host = s.substr(0, f); + s.erase(0, f + 1); + } + f = s.find(' '); + if (f != std::string::npos){ + connector = s.substr(0, f); + s.erase(0, f + 1); + } + f = s.find(' '); + if (f != std::string::npos){ + conntime = atoi(s.substr(0, f).c_str()); + s.erase(0, f + 1); + } + f = s.find(' '); + if (f != std::string::npos){ + up = atoi(s.substr(0, f).c_str()); + s.erase(0, f + 1); + down = atoi(s.c_str()); + } } }