From 1a5e0be093e3921b058021ba68b1ca0b728b298b Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 10 Apr 2013 22:19:25 +0200 Subject: [PATCH] Fixed Buffer not shutting down properly in some cases. --- src/buffer/buffer.cpp | 47 +++++++++++++++++-------------------------- 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/src/buffer/buffer.cpp b/src/buffer/buffer.cpp index 97a087f3..821fb1be 100644 --- a/src/buffer/buffer.cpp +++ b/src/buffer/buffer.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include "buffer_stream.h" #include @@ -23,14 +24,6 @@ namespace Buffer { Stream * thisStream = 0; Socket::Server SS; ///< The server socket. - ///\brief Gets the current system time in milliseconds. - ///\return The current timestamp in milliseconds. - long long int getNowMS(){ - timeval t; - gettimeofday( &t, 0); - return t.tv_sec * 1000 + t.tv_usec / 1000; - } //getNowMS - ///\brief A function running in a thread to send all statistics. ///\param empty A null pointer. void handleStats(void * empty){ @@ -40,7 +33,7 @@ namespace Buffer { std::string double_newline = "\n\n"; Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true); while (buffer_running){ - usleep(1000000); //sleep one second + Util::sleep(1000); //sleep one second Stream::get()->cleanUsers(); if ( !StatsSocket.connected()){ StatsSocket = Socket::Connection("/tmp/mist/statistics", true); @@ -71,7 +64,7 @@ namespace Buffer { Stream::get()->dropReadLock(); while (usr->S.connected()){ - usleep(5000); //sleep 5ms + Util::sleep(5); //sleep 5ms if ( !usr->myRing->playCount || !usr->Send()){ if (usr->myRing->updated){ Stream::get()->getReadLock(); @@ -178,7 +171,7 @@ namespace Buffer { while (std::cin.good() && buffer_running){ //slow down packet receiving to real-time - now = getNowMS(); + now = Util::getMS(); if (((now - timeDiff) >= lastPacket) || (lastPacket - (now - timeDiff) > 15000)){ thisStream->getWriteLock(); if (thisStream->getStream()->parsePacket(inBuffer)){ @@ -195,7 +188,7 @@ namespace Buffer { inBuffer.append(charBuffer, charCount); } }else{ - usleep(std::min(14999LL, lastPacket - (now - timeDiff)) * 1000); + Util::sleep(std::min(15LL, lastPacket - (now - timeDiff))); } } buffer_running = false; @@ -220,14 +213,14 @@ namespace Buffer { }else{ thisStream->dropWriteLock(false); packed_parsed = false; - usleep(1000); //1ms wait + Util::sleep(1); //1ms wait } }while (packed_parsed); }else{ - usleep(1000); //1ms wait + Util::sleep(1); //1ms wait } }else{ - usleep(1000000); //1s wait + Util::sleep(1000); //1s wait } } } @@ -257,6 +250,7 @@ namespace Buffer { perror("Could not create stream socket"); return 1; } + SS.setBlocking(false); conf.activate(); thisStream = Stream::get(); thisStream->setName(name); @@ -264,17 +258,18 @@ namespace Buffer { Socket::Connection incoming; Socket::Connection std_input(fileno(stdin)); - tthread::thread * StatsThread = 0; if (conf.getBool("reportstats")){ - StatsThread = new tthread::thread(handleStats, 0); + tthread::thread StatsThread(handleStats, 0); + StatsThread.detach(); } - tthread::thread * StdinThread = 0; std::string await_ip = conf.getString("awaiting_ip"); if (await_ip == ""){ - StdinThread = new tthread::thread(handleStdin, 0); + tthread::thread StdinThread(handleStdin, 0); + StdinThread.detach(); }else{ thisStream->setWaitingIP(await_ip); - StdinThread = new tthread::thread(handlePushin, 0); + tthread::thread StdinThread(handlePushin, 0); + StdinThread.detach(); } while (buffer_running && SS.connected() && conf.is_active){ @@ -285,6 +280,8 @@ namespace Buffer { user * usr_ptr = new user(incoming); thisStream->addUser(usr_ptr); usr_ptr->Thread = new tthread::thread(handleUser, (void *)usr_ptr); + }else{ + Util::sleep(50);//sleep 50ms } } //main loop @@ -292,22 +289,14 @@ namespace Buffer { buffer_running = false; std::cout << "Buffer shutting down" << std::endl; SS.close(); - if (StatsThread){ - StatsThread->join(); - delete StatsThread; - } if (thisStream->getIPInput().connected()){ thisStream->getIPInput().close(); } - StdinThread->join(); - delete StdinThread; delete thisStream; return 0; } -} -; -//Buffer namespace +} //Buffer namespace ///\brief Entry point for Buffer, simply calls Buffer::Start(). int main(int argc, char ** argv){