Fixed Buffer not shutting down properly in some cases.

This commit is contained in:
Thulinma 2013-04-10 22:19:25 +02:00
parent e242965266
commit 1a5e0be093

View file

@ -13,6 +13,7 @@
#include <sstream> #include <sstream>
#include <sys/time.h> #include <sys/time.h>
#include <mist/config.h> #include <mist/config.h>
#include <mist/timing.h>
#include "buffer_stream.h" #include "buffer_stream.h"
#include <mist/stream.h> #include <mist/stream.h>
@ -23,14 +24,6 @@ namespace Buffer {
Stream * thisStream = 0; Stream * thisStream = 0;
Socket::Server SS; ///< The server socket. Socket::Server SS; ///< The server socket.
///\brief Gets the current system time in milliseconds.
///\return The current timestamp in milliseconds.
long long int getNowMS(){
timeval t;
gettimeofday( &t, 0);
return t.tv_sec * 1000 + t.tv_usec / 1000;
} //getNowMS
///\brief A function running in a thread to send all statistics. ///\brief A function running in a thread to send all statistics.
///\param empty A null pointer. ///\param empty A null pointer.
void handleStats(void * empty){ void handleStats(void * empty){
@ -40,7 +33,7 @@ namespace Buffer {
std::string double_newline = "\n\n"; std::string double_newline = "\n\n";
Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true); Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
while (buffer_running){ while (buffer_running){
usleep(1000000); //sleep one second Util::sleep(1000); //sleep one second
Stream::get()->cleanUsers(); Stream::get()->cleanUsers();
if ( !StatsSocket.connected()){ if ( !StatsSocket.connected()){
StatsSocket = Socket::Connection("/tmp/mist/statistics", true); StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
@ -71,7 +64,7 @@ namespace Buffer {
Stream::get()->dropReadLock(); Stream::get()->dropReadLock();
while (usr->S.connected()){ while (usr->S.connected()){
usleep(5000); //sleep 5ms Util::sleep(5); //sleep 5ms
if ( !usr->myRing->playCount || !usr->Send()){ if ( !usr->myRing->playCount || !usr->Send()){
if (usr->myRing->updated){ if (usr->myRing->updated){
Stream::get()->getReadLock(); Stream::get()->getReadLock();
@ -178,7 +171,7 @@ namespace Buffer {
while (std::cin.good() && buffer_running){ while (std::cin.good() && buffer_running){
//slow down packet receiving to real-time //slow down packet receiving to real-time
now = getNowMS(); now = Util::getMS();
if (((now - timeDiff) >= lastPacket) || (lastPacket - (now - timeDiff) > 15000)){ if (((now - timeDiff) >= lastPacket) || (lastPacket - (now - timeDiff) > 15000)){
thisStream->getWriteLock(); thisStream->getWriteLock();
if (thisStream->getStream()->parsePacket(inBuffer)){ if (thisStream->getStream()->parsePacket(inBuffer)){
@ -195,7 +188,7 @@ namespace Buffer {
inBuffer.append(charBuffer, charCount); inBuffer.append(charBuffer, charCount);
} }
}else{ }else{
usleep(std::min(14999LL, lastPacket - (now - timeDiff)) * 1000); Util::sleep(std::min(15LL, lastPacket - (now - timeDiff)));
} }
} }
buffer_running = false; buffer_running = false;
@ -220,14 +213,14 @@ namespace Buffer {
}else{ }else{
thisStream->dropWriteLock(false); thisStream->dropWriteLock(false);
packed_parsed = false; packed_parsed = false;
usleep(1000); //1ms wait Util::sleep(1); //1ms wait
} }
}while (packed_parsed); }while (packed_parsed);
}else{ }else{
usleep(1000); //1ms wait Util::sleep(1); //1ms wait
} }
}else{ }else{
usleep(1000000); //1s wait Util::sleep(1000); //1s wait
} }
} }
} }
@ -257,6 +250,7 @@ namespace Buffer {
perror("Could not create stream socket"); perror("Could not create stream socket");
return 1; return 1;
} }
SS.setBlocking(false);
conf.activate(); conf.activate();
thisStream = Stream::get(); thisStream = Stream::get();
thisStream->setName(name); thisStream->setName(name);
@ -264,17 +258,18 @@ namespace Buffer {
Socket::Connection incoming; Socket::Connection incoming;
Socket::Connection std_input(fileno(stdin)); Socket::Connection std_input(fileno(stdin));
tthread::thread * StatsThread = 0;
if (conf.getBool("reportstats")){ if (conf.getBool("reportstats")){
StatsThread = new tthread::thread(handleStats, 0); tthread::thread StatsThread(handleStats, 0);
StatsThread.detach();
} }
tthread::thread * StdinThread = 0;
std::string await_ip = conf.getString("awaiting_ip"); std::string await_ip = conf.getString("awaiting_ip");
if (await_ip == ""){ if (await_ip == ""){
StdinThread = new tthread::thread(handleStdin, 0); tthread::thread StdinThread(handleStdin, 0);
StdinThread.detach();
}else{ }else{
thisStream->setWaitingIP(await_ip); thisStream->setWaitingIP(await_ip);
StdinThread = new tthread::thread(handlePushin, 0); tthread::thread StdinThread(handlePushin, 0);
StdinThread.detach();
} }
while (buffer_running && SS.connected() && conf.is_active){ while (buffer_running && SS.connected() && conf.is_active){
@ -285,6 +280,8 @@ namespace Buffer {
user * usr_ptr = new user(incoming); user * usr_ptr = new user(incoming);
thisStream->addUser(usr_ptr); thisStream->addUser(usr_ptr);
usr_ptr->Thread = new tthread::thread(handleUser, (void *)usr_ptr); usr_ptr->Thread = new tthread::thread(handleUser, (void *)usr_ptr);
}else{
Util::sleep(50);//sleep 50ms
} }
} //main loop } //main loop
@ -292,22 +289,14 @@ namespace Buffer {
buffer_running = false; buffer_running = false;
std::cout << "Buffer shutting down" << std::endl; std::cout << "Buffer shutting down" << std::endl;
SS.close(); SS.close();
if (StatsThread){
StatsThread->join();
delete StatsThread;
}
if (thisStream->getIPInput().connected()){ if (thisStream->getIPInput().connected()){
thisStream->getIPInput().close(); thisStream->getIPInput().close();
} }
StdinThread->join();
delete StdinThread;
delete thisStream; delete thisStream;
return 0; return 0;
} }
} } //Buffer namespace
;
//Buffer namespace
///\brief Entry point for Buffer, simply calls Buffer::Start(). ///\brief Entry point for Buffer, simply calls Buffer::Start().
int main(int argc, char ** argv){ int main(int argc, char ** argv){