diff --git a/src/buffer/buffer.cpp b/src/buffer/buffer.cpp index 999c6d68..b37936af 100644 --- a/src/buffer/buffer.cpp +++ b/src/buffer/buffer.cpp @@ -107,14 +107,16 @@ namespace Buffer { break; } case 't': { - newSelect.clear(); - std::string tmp = usr->S.Received().get().substr(2); - while (tmp != ""){ - newSelect.insert(atoi(tmp.substr(0,tmp.find(' ')).c_str())); - if (tmp.find(' ') != std::string::npos){ - tmp.erase(0,tmp.find(' ')+1); - }else{ - tmp = ""; + if (usr->S.Received().get().size() >= 3){ + newSelect.clear(); + std::string tmp = usr->S.Received().get().substr(2); + while (tmp != ""){ + newSelect.insert(atoi(tmp.substr(0,tmp.find(' ')).c_str())); + if (tmp.find(' ') != std::string::npos){ + tmp.erase(0,tmp.find(' ')+1); + }else{ + tmp = ""; + } } } break; @@ -153,7 +155,9 @@ namespace Buffer { } } } - Util::sleep(5); //sleep 5ms + if (usr->myRing->waiting){ + Util::sleep(300); //sleep 5ms + } } } usr->Disconnect("Socket closed."); @@ -221,12 +225,12 @@ namespace Buffer { thisStream->dropWriteLock(true); }else{ thisStream->dropWriteLock(false); - Util::sleep(10); //10ms wait + Util::sleep(25); //10ms wait break; } } }else{ - Util::sleep(10); //10ms wait + Util::sleep(1000); //10ms wait } }else{ if (connected){ diff --git a/src/buffer/buffer_stream.cpp b/src/buffer/buffer_stream.cpp index ed8c8454..45d7e46c 100644 --- a/src/buffer/buffer_stream.cpp +++ b/src/buffer/buffer_stream.cpp @@ -32,7 +32,7 @@ namespace Buffer { ///\brief Do cleanup on delete. Stream::~Stream(){ - tthread::lock_guard guard(stats_mutex); + tthread::lock_guard guard(stats_mutex); if (users.size() > 0){ for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ if (( * *usersIt).S.connected()){ @@ -50,7 +50,7 @@ namespace Buffer { static std::string ret; long long int now = Util::epoch(); unsigned int tot_up = 0, tot_down = 0, tot_count = 0; - tthread::lock_guard guard(stats_mutex); + tthread::lock_guard guard(stats_mutex); if (users.size() > 0){ for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ tot_down += ( * *usersIt).curr_down; @@ -134,7 +134,7 @@ namespace Buffer { ///\param username The name of the user. ///\param stats The final statistics to store. void Stream::saveStats(std::string username, Stats & stats){ - tthread::lock_guard guard(stats_mutex); + tthread::lock_guard guard(stats_mutex); Storage["curr"][username]["connector"] = stats.connector; Storage["curr"][username]["up"] = stats.up; Storage["curr"][username]["down"] = stats.down; @@ -148,7 +148,7 @@ namespace Buffer { ///\param stats The final statistics to store. ///\param reason The reason for disconnecting. void Stream::clearStats(std::string username, Stats & stats, std::string reason){ - tthread::lock_guard guard(stats_mutex); + tthread::lock_guard guard(stats_mutex); if (Storage["curr"].isMember(username)){ Storage["curr"].removeMember(username); #if DEBUG >= 4 @@ -223,20 +223,20 @@ namespace Buffer { ///\brief Add a user to the userlist. ///\param newUser The user to be added. void Stream::addUser(user * newUser){ - tthread::lock_guard guard(stats_mutex); + tthread::lock_guard guard(stats_mutex); users.insert(newUser); } ///\brief Removes a user to the userlist. ///\param newUser The user to be removed. void Stream::removeUser(user * oldUser){ - tthread::lock_guard guard(stats_mutex); + tthread::lock_guard guard(stats_mutex); users.erase(oldUser); } ///\brief Disconnects all users. void Stream::disconnectUsers(){ - tthread::lock_guard guard(stats_mutex); + tthread::lock_guard guard(stats_mutex); for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ (*usersIt)->Disconnect("Stream reset"); } @@ -244,7 +244,7 @@ namespace Buffer { ///\brief Blocks the thread until new data is available. void Stream::waitForData(){ - tthread::lock_guard guard(stats_mutex); + tthread::lock_guard guard(stats_mutex); moreData.wait(stats_mutex); } } diff --git a/src/buffer/buffer_stream.h b/src/buffer/buffer_stream.h index c3a0ddf3..490a1624 100644 --- a/src/buffer/buffer_stream.h +++ b/src/buffer/buffer_stream.h @@ -68,7 +68,7 @@ namespace Buffer { DTSC::Stream * Strm; std::string waiting_ip; ///< IP address for media push. Socket::Connection ip_input; ///< Connection used for media push. - tthread::mutex stats_mutex; ///< Mutex for stats/users modifications. + tthread::recursive_mutex stats_mutex; ///< Mutex for stats/users modifications. std::set users; ///< All connected users. std::set::iterator usersIt; ///< Iterator for all connected users. std::string name; ///< Name for this buffer. diff --git a/src/buffer/buffer_user.cpp b/src/buffer/buffer_user.cpp index 13d4a660..92bf239c 100644 --- a/src/buffer/buffer_user.cpp +++ b/src/buffer/buffer_user.cpp @@ -110,6 +110,7 @@ namespace Buffer { if (Stream::get()->getStream()->isNewest(myRing->b)){ //no next buffer? go in waiting mode. myRing->waiting = true; + Stream::get()->dropReadLock(); return false; } myRing->b = Stream::get()->getStream()->getNext(myRing->b, allowedTracks); @@ -128,6 +129,7 @@ namespace Buffer { return false; } //completed a send Stream::get()->dropReadLock(); + Util::sleep(300); return true; } //send