Buffer cpu fixes.

This commit is contained in:
Erik Zandvliet 2013-07-22 14:39:12 +02:00
parent acec4f9253
commit a7c4cc4118
4 changed files with 26 additions and 20 deletions

View file

@ -107,14 +107,16 @@ namespace Buffer {
break; break;
} }
case 't': { case 't': {
newSelect.clear(); if (usr->S.Received().get().size() >= 3){
std::string tmp = usr->S.Received().get().substr(2); newSelect.clear();
while (tmp != ""){ std::string tmp = usr->S.Received().get().substr(2);
newSelect.insert(atoi(tmp.substr(0,tmp.find(' ')).c_str())); while (tmp != ""){
if (tmp.find(' ') != std::string::npos){ newSelect.insert(atoi(tmp.substr(0,tmp.find(' ')).c_str()));
tmp.erase(0,tmp.find(' ')+1); if (tmp.find(' ') != std::string::npos){
}else{ tmp.erase(0,tmp.find(' ')+1);
tmp = ""; }else{
tmp = "";
}
} }
} }
break; break;
@ -153,7 +155,9 @@ namespace Buffer {
} }
} }
} }
Util::sleep(5); //sleep 5ms if (usr->myRing->waiting){
Util::sleep(300); //sleep 5ms
}
} }
} }
usr->Disconnect("Socket closed."); usr->Disconnect("Socket closed.");
@ -221,12 +225,12 @@ namespace Buffer {
thisStream->dropWriteLock(true); thisStream->dropWriteLock(true);
}else{ }else{
thisStream->dropWriteLock(false); thisStream->dropWriteLock(false);
Util::sleep(10); //10ms wait Util::sleep(25); //10ms wait
break; break;
} }
} }
}else{ }else{
Util::sleep(10); //10ms wait Util::sleep(1000); //10ms wait
} }
}else{ }else{
if (connected){ if (connected){

View file

@ -32,7 +32,7 @@ namespace Buffer {
///\brief Do cleanup on delete. ///\brief Do cleanup on delete.
Stream::~Stream(){ Stream::~Stream(){
tthread::lock_guard<tthread::mutex> guard(stats_mutex); tthread::lock_guard<tthread::recursive_mutex> guard(stats_mutex);
if (users.size() > 0){ if (users.size() > 0){
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
if (( * *usersIt).S.connected()){ if (( * *usersIt).S.connected()){
@ -50,7 +50,7 @@ namespace Buffer {
static std::string ret; static std::string ret;
long long int now = Util::epoch(); long long int now = Util::epoch();
unsigned int tot_up = 0, tot_down = 0, tot_count = 0; unsigned int tot_up = 0, tot_down = 0, tot_count = 0;
tthread::lock_guard<tthread::mutex> guard(stats_mutex); tthread::lock_guard<tthread::recursive_mutex> guard(stats_mutex);
if (users.size() > 0){ if (users.size() > 0){
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
tot_down += ( * *usersIt).curr_down; tot_down += ( * *usersIt).curr_down;
@ -134,7 +134,7 @@ namespace Buffer {
///\param username The name of the user. ///\param username The name of the user.
///\param stats The final statistics to store. ///\param stats The final statistics to store.
void Stream::saveStats(std::string username, Stats & stats){ void Stream::saveStats(std::string username, Stats & stats){
tthread::lock_guard<tthread::mutex> guard(stats_mutex); tthread::lock_guard<tthread::recursive_mutex> guard(stats_mutex);
Storage["curr"][username]["connector"] = stats.connector; Storage["curr"][username]["connector"] = stats.connector;
Storage["curr"][username]["up"] = stats.up; Storage["curr"][username]["up"] = stats.up;
Storage["curr"][username]["down"] = stats.down; Storage["curr"][username]["down"] = stats.down;
@ -148,7 +148,7 @@ namespace Buffer {
///\param stats The final statistics to store. ///\param stats The final statistics to store.
///\param reason The reason for disconnecting. ///\param reason The reason for disconnecting.
void Stream::clearStats(std::string username, Stats & stats, std::string reason){ void Stream::clearStats(std::string username, Stats & stats, std::string reason){
tthread::lock_guard<tthread::mutex> guard(stats_mutex); tthread::lock_guard<tthread::recursive_mutex> guard(stats_mutex);
if (Storage["curr"].isMember(username)){ if (Storage["curr"].isMember(username)){
Storage["curr"].removeMember(username); Storage["curr"].removeMember(username);
#if DEBUG >= 4 #if DEBUG >= 4
@ -223,20 +223,20 @@ namespace Buffer {
///\brief Add a user to the userlist. ///\brief Add a user to the userlist.
///\param newUser The user to be added. ///\param newUser The user to be added.
void Stream::addUser(user * newUser){ void Stream::addUser(user * newUser){
tthread::lock_guard<tthread::mutex> guard(stats_mutex); tthread::lock_guard<tthread::recursive_mutex> guard(stats_mutex);
users.insert(newUser); users.insert(newUser);
} }
///\brief Removes a user to the userlist. ///\brief Removes a user to the userlist.
///\param newUser The user to be removed. ///\param newUser The user to be removed.
void Stream::removeUser(user * oldUser){ void Stream::removeUser(user * oldUser){
tthread::lock_guard<tthread::mutex> guard(stats_mutex); tthread::lock_guard<tthread::recursive_mutex> guard(stats_mutex);
users.erase(oldUser); users.erase(oldUser);
} }
///\brief Disconnects all users. ///\brief Disconnects all users.
void Stream::disconnectUsers(){ void Stream::disconnectUsers(){
tthread::lock_guard<tthread::mutex> guard(stats_mutex); tthread::lock_guard<tthread::recursive_mutex> guard(stats_mutex);
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
(*usersIt)->Disconnect("Stream reset"); (*usersIt)->Disconnect("Stream reset");
} }
@ -244,7 +244,7 @@ namespace Buffer {
///\brief Blocks the thread until new data is available. ///\brief Blocks the thread until new data is available.
void Stream::waitForData(){ void Stream::waitForData(){
tthread::lock_guard<tthread::mutex> guard(stats_mutex); tthread::lock_guard<tthread::recursive_mutex> guard(stats_mutex);
moreData.wait(stats_mutex); moreData.wait(stats_mutex);
} }
} }

View file

@ -68,7 +68,7 @@ namespace Buffer {
DTSC::Stream * Strm; DTSC::Stream * Strm;
std::string waiting_ip; ///< IP address for media push. std::string waiting_ip; ///< IP address for media push.
Socket::Connection ip_input; ///< Connection used for media push. Socket::Connection ip_input; ///< Connection used for media push.
tthread::mutex stats_mutex; ///< Mutex for stats/users modifications. tthread::recursive_mutex stats_mutex; ///< Mutex for stats/users modifications.
std::set<user*> users; ///< All connected users. std::set<user*> users; ///< All connected users.
std::set<user*>::iterator usersIt; ///< Iterator for all connected users. std::set<user*>::iterator usersIt; ///< Iterator for all connected users.
std::string name; ///< Name for this buffer. std::string name; ///< Name for this buffer.

View file

@ -110,6 +110,7 @@ namespace Buffer {
if (Stream::get()->getStream()->isNewest(myRing->b)){ if (Stream::get()->getStream()->isNewest(myRing->b)){
//no next buffer? go in waiting mode. //no next buffer? go in waiting mode.
myRing->waiting = true; myRing->waiting = true;
Stream::get()->dropReadLock();
return false; return false;
} }
myRing->b = Stream::get()->getStream()->getNext(myRing->b, allowedTracks); myRing->b = Stream::get()->getStream()->getNext(myRing->b, allowedTracks);
@ -128,6 +129,7 @@ namespace Buffer {
return false; return false;
} //completed a send } //completed a send
Stream::get()->dropReadLock(); Stream::get()->dropReadLock();
Util::sleep(300);
return true; return true;
} //send } //send