Made user thread handling in buffer more sane and more bulletproof.

This commit is contained in:
Thulinma 2013-04-10 23:07:08 +02:00
parent 1a5e0be093
commit 719ac8de7b
5 changed files with 21 additions and 41 deletions

View file

@ -34,7 +34,6 @@ namespace Buffer {
Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true); Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
while (buffer_running){ while (buffer_running){
Util::sleep(1000); //sleep one second Util::sleep(1000); //sleep one second
Stream::get()->cleanUsers();
if ( !StatsSocket.connected()){ if ( !StatsSocket.connected()){
StatsSocket = Socket::Connection("/tmp/mist/statistics", true); StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
} }
@ -52,6 +51,7 @@ namespace Buffer {
///\param v_usr The user that is connected. ///\param v_usr The user that is connected.
void handleUser(void * v_usr){ void handleUser(void * v_usr){
user * usr = (user*)v_usr; user * usr = (user*)v_usr;
thisStream->addUser(usr);
#if DEBUG >= 5 #if DEBUG >= 5
std::cerr << "Thread launched for user " << usr->MyStr << ", socket number " << usr->S.getSocket() << std::endl; std::cerr << "Thread launched for user " << usr->MyStr << ", socket number " << usr->S.getSocket() << std::endl;
#endif #endif
@ -152,6 +152,7 @@ namespace Buffer {
} }
} }
usr->Disconnect("Socket closed."); usr->Disconnect("Socket closed.");
thisStream->removeUser(usr);
} }
///\brief A function running a thread to handle input data through stdin. ///\brief A function running a thread to handle input data through stdin.
@ -277,9 +278,8 @@ namespace Buffer {
//starts a thread for every accepted connection //starts a thread for every accepted connection
incoming = SS.accept(true); incoming = SS.accept(true);
if (incoming.connected()){ if (incoming.connected()){
user * usr_ptr = new user(incoming); tthread::thread thisUser(handleUser, (void *)new user(incoming));
thisStream->addUser(usr_ptr); thisUser.detach();
usr_ptr->Thread = new tthread::thread(handleUser, (void *)usr_ptr);
}else{ }else{
Util::sleep(50);//sleep 50ms Util::sleep(50);//sleep 50ms
} }

View file

@ -33,9 +33,11 @@ namespace Buffer {
///\brief Do cleanup on delete. ///\brief Do cleanup on delete.
Stream::~Stream(){ Stream::~Stream(){
tthread::lock_guard<tthread::mutex> guard(stats_mutex); tthread::lock_guard<tthread::mutex> guard(stats_mutex);
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ if (users.size() > 0){
if (( * *usersIt).S.connected()){ for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
( * *usersIt).S.close(); if (( * *usersIt).S.connected()){
( * *usersIt).S.close();
}
} }
} }
moreData.notify_all(); moreData.notify_all();
@ -161,33 +163,6 @@ namespace Buffer {
Storage["log"][username]["start"] = Util::epoch() - stats.conntime; Storage["log"][username]["start"] = Util::epoch() - stats.conntime;
} }
///\brief Clean up broken connections
void Stream::cleanUsers(){
bool repeat = false;
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
do{
repeat = false;
if (users.size() > 0){
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
if (( * *usersIt).Thread == 0 && !( * *usersIt).S.connected()){
delete *usersIt;
users.erase(usersIt);
repeat = true;
break;
}else{
if ( !( * *usersIt).S.connected()){
if (( * *usersIt).Thread->joinable()){
( * *usersIt).Thread->join();
delete ( * *usersIt).Thread;
( * *usersIt).Thread = 0;
}
}
}
}
}
}while (repeat);
}
///\brief Ask to obtain a write lock. ///\brief Ask to obtain a write lock.
/// ///
/// Blocks until writing is safe. /// Blocks until writing is safe.
@ -255,7 +230,14 @@ namespace Buffer {
///\param newUser The user to be added. ///\param newUser The user to be added.
void Stream::addUser(user * newUser){ void Stream::addUser(user * newUser){
tthread::lock_guard<tthread::mutex> guard(stats_mutex); tthread::lock_guard<tthread::mutex> guard(stats_mutex);
users.push_back(newUser); users.insert(newUser);
}
///\brief Add a user to the userlist.
///\param newUser The user to be added.
void Stream::removeUser(user * oldUser){
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
users.erase(oldUser);
} }
///\brief Blocks the thread until new data is available. ///\brief Blocks the thread until new data is available.

View file

@ -35,8 +35,6 @@ namespace Buffer {
void saveStats(std::string username, Stats & stats); void saveStats(std::string username, Stats & stats);
/// Stores final statistics. /// Stores final statistics.
void clearStats(std::string username, Stats & stats, std::string reason); void clearStats(std::string username, Stats & stats, std::string reason);
/// Cleans up broken connections
void cleanUsers();
/// Blocks until writing is safe. /// Blocks until writing is safe.
void getWriteLock(); void getWriteLock();
/// Drops a previously gotten write lock. /// Drops a previously gotten write lock.
@ -51,6 +49,8 @@ namespace Buffer {
void setName(std::string n); void setName(std::string n);
/// Add a user to the userlist. /// Add a user to the userlist.
void addUser(user * newUser); void addUser(user * newUser);
/// Delete a user from the userlist.
void removeUser(user * oldUser);
/// Blocks the thread until new data is available. /// Blocks the thread until new data is available.
void waitForData(); void waitForData();
/// Cleanup function /// Cleanup function
@ -67,8 +67,8 @@ namespace Buffer {
std::string waiting_ip; ///< IP address for media push. std::string waiting_ip; ///< IP address for media push.
Socket::Connection ip_input; ///< Connection used for media push. Socket::Connection ip_input; ///< Connection used for media push.
tthread::mutex stats_mutex; ///< Mutex for stats/users modifications. tthread::mutex stats_mutex; ///< Mutex for stats/users modifications.
std::vector<user*> users; ///< All connected users. std::set<user*> users; ///< All connected users.
std::vector<user*>::iterator usersIt; ///< Iterator for all connected users. std::set<user*>::iterator usersIt; ///< Iterator for all connected users.
std::string name; ///< Name for this buffer. std::string name; ///< Name for this buffer.
tthread::condition_variable moreData; ///< Triggered when more data becomes available. tthread::condition_variable moreData; ///< Triggered when more data becomes available.
}; };

View file

@ -23,7 +23,6 @@ namespace Buffer {
curr_down = 0; curr_down = 0;
currsend = 0; currsend = 0;
myRing = 0; myRing = 0;
Thread = 0;
gotproperaudio = false; gotproperaudio = false;
lastpointer = 0; lastpointer = 0;
} //constructor } //constructor

View file

@ -26,7 +26,6 @@ namespace Buffer {
///and its connection status. ///and its connection status.
class user{ class user{
public: public:
tthread::thread * Thread; ///< Holds the thread dealing with this user.
DTSC::Ring * myRing; ///< Ring of the buffer for this user. DTSC::Ring * myRing; ///< Ring of the buffer for this user.
int MyNum; ///< User ID of this user. int MyNum; ///< User ID of this user.
std::string MyStr; ///< User ID of this user as a string. std::string MyStr; ///< User ID of this user as a string.