diff --git a/src/buffer/buffer.cpp b/src/buffer/buffer.cpp index 821fb1be..c847d412 100644 --- a/src/buffer/buffer.cpp +++ b/src/buffer/buffer.cpp @@ -34,7 +34,6 @@ namespace Buffer { Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true); while (buffer_running){ Util::sleep(1000); //sleep one second - Stream::get()->cleanUsers(); if ( !StatsSocket.connected()){ StatsSocket = Socket::Connection("/tmp/mist/statistics", true); } @@ -52,6 +51,7 @@ namespace Buffer { ///\param v_usr The user that is connected. void handleUser(void * v_usr){ user * usr = (user*)v_usr; + thisStream->addUser(usr); #if DEBUG >= 5 std::cerr << "Thread launched for user " << usr->MyStr << ", socket number " << usr->S.getSocket() << std::endl; #endif @@ -152,6 +152,7 @@ namespace Buffer { } } usr->Disconnect("Socket closed."); + thisStream->removeUser(usr); } ///\brief A function running a thread to handle input data through stdin. @@ -277,9 +278,8 @@ namespace Buffer { //starts a thread for every accepted connection incoming = SS.accept(true); if (incoming.connected()){ - user * usr_ptr = new user(incoming); - thisStream->addUser(usr_ptr); - usr_ptr->Thread = new tthread::thread(handleUser, (void *)usr_ptr); + tthread::thread thisUser(handleUser, (void *)new user(incoming)); + thisUser.detach(); }else{ Util::sleep(50);//sleep 50ms } diff --git a/src/buffer/buffer_stream.cpp b/src/buffer/buffer_stream.cpp index 3419c711..52f941e1 100644 --- a/src/buffer/buffer_stream.cpp +++ b/src/buffer/buffer_stream.cpp @@ -33,9 +33,11 @@ namespace Buffer { ///\brief Do cleanup on delete. Stream::~Stream(){ tthread::lock_guard guard(stats_mutex); - for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ - if (( * *usersIt).S.connected()){ - ( * *usersIt).S.close(); + if (users.size() > 0){ + for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ + if (( * *usersIt).S.connected()){ + ( * *usersIt).S.close(); + } } } moreData.notify_all(); @@ -161,33 +163,6 @@ namespace Buffer { Storage["log"][username]["start"] = Util::epoch() - stats.conntime; } - ///\brief Clean up broken connections - void Stream::cleanUsers(){ - bool repeat = false; - tthread::lock_guard guard(stats_mutex); - do{ - repeat = false; - if (users.size() > 0){ - for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ - if (( * *usersIt).Thread == 0 && !( * *usersIt).S.connected()){ - delete *usersIt; - users.erase(usersIt); - repeat = true; - break; - }else{ - if ( !( * *usersIt).S.connected()){ - if (( * *usersIt).Thread->joinable()){ - ( * *usersIt).Thread->join(); - delete ( * *usersIt).Thread; - ( * *usersIt).Thread = 0; - } - } - } - } - } - }while (repeat); - } - ///\brief Ask to obtain a write lock. /// /// Blocks until writing is safe. @@ -255,9 +230,16 @@ namespace Buffer { ///\param newUser The user to be added. void Stream::addUser(user * newUser){ tthread::lock_guard guard(stats_mutex); - users.push_back(newUser); + users.insert(newUser); } + ///\brief Add a user to the userlist. + ///\param newUser The user to be added. + void Stream::removeUser(user * oldUser){ + tthread::lock_guard guard(stats_mutex); + users.erase(oldUser); + } + ///\brief Blocks the thread until new data is available. void Stream::waitForData(){ tthread::lock_guard guard(stats_mutex); diff --git a/src/buffer/buffer_stream.h b/src/buffer/buffer_stream.h index 07286277..bd086f3d 100644 --- a/src/buffer/buffer_stream.h +++ b/src/buffer/buffer_stream.h @@ -35,8 +35,6 @@ namespace Buffer { void saveStats(std::string username, Stats & stats); /// Stores final statistics. void clearStats(std::string username, Stats & stats, std::string reason); - /// Cleans up broken connections - void cleanUsers(); /// Blocks until writing is safe. void getWriteLock(); /// Drops a previously gotten write lock. @@ -51,6 +49,8 @@ namespace Buffer { void setName(std::string n); /// Add a user to the userlist. void addUser(user * newUser); + /// Delete a user from the userlist. + void removeUser(user * oldUser); /// Blocks the thread until new data is available. void waitForData(); /// Cleanup function @@ -67,8 +67,8 @@ namespace Buffer { std::string waiting_ip; ///< IP address for media push. Socket::Connection ip_input; ///< Connection used for media push. tthread::mutex stats_mutex; ///< Mutex for stats/users modifications. - std::vector users; ///< All connected users. - std::vector::iterator usersIt; ///< Iterator for all connected users. + std::set users; ///< All connected users. + std::set::iterator usersIt; ///< Iterator for all connected users. std::string name; ///< Name for this buffer. tthread::condition_variable moreData; ///< Triggered when more data becomes available. }; diff --git a/src/buffer/buffer_user.cpp b/src/buffer/buffer_user.cpp index 57011556..add7bb64 100644 --- a/src/buffer/buffer_user.cpp +++ b/src/buffer/buffer_user.cpp @@ -23,7 +23,6 @@ namespace Buffer { curr_down = 0; currsend = 0; myRing = 0; - Thread = 0; gotproperaudio = false; lastpointer = 0; } //constructor diff --git a/src/buffer/buffer_user.h b/src/buffer/buffer_user.h index 81901368..7e252a42 100644 --- a/src/buffer/buffer_user.h +++ b/src/buffer/buffer_user.h @@ -26,7 +26,6 @@ namespace Buffer { ///and its connection status. class user{ public: - tthread::thread * Thread; ///< Holds the thread dealing with this user. DTSC::Ring * myRing; ///< Ring of the buffer for this user. int MyNum; ///< User ID of this user. std::string MyStr; ///< User ID of this user as a string.