diff --git a/src/buffer.cpp b/src/buffer.cpp index cdc22e5f..26461ceb 100644 --- a/src/buffer.cpp +++ b/src/buffer.cpp @@ -58,69 +58,73 @@ namespace Buffer { #endif usr->myRing = thisStream->getRing(); - if (thisStream->getHeader().size() > 0){ + if (thisStream->getStream()->metadata && thisStream->getHeader().size() > 0){ usr->S.SendNow(thisStream->getHeader()); } while (usr->S.connected()){ usleep(5000); //sleep 5ms usr->Send(); - if (usr->S.spool() && usr->S.Received().size()){ - //delete anything that doesn't end with a newline - if ( !usr->S.Received().get().empty() && *(usr->S.Received().get().rbegin()) != '\n'){ - usr->S.Received().get().clear(); - continue; - } - usr->S.Received().get().resize(usr->S.Received().get().size() - 1); - if ( !usr->S.Received().get().empty()){ - switch (usr->S.Received().get()[0]){ - case 'P': { //Push - std::cout << "Push attempt from IP " << usr->S.Received().get().substr(2) << std::endl; - if (thisStream->checkWaitingIP(usr->S.Received().get().substr(2))){ - if (thisStream->setInput(usr->S)){ - std::cout << "Push accepted!" << std::endl; - usr->S = Socket::Connection( -1); - return; + if (usr->S.spool()){ + while (usr->S.Received().size()){ + //delete anything that doesn't end with a newline + if ( !usr->S.Received().get().empty() && *(usr->S.Received().get().rbegin()) != '\n'){ + usr->S.Received().get().clear(); + continue; + } + usr->S.Received().get().resize(usr->S.Received().get().size() - 1); + if ( !usr->S.Received().get().empty()){ + switch (usr->S.Received().get()[0]){ + case 'P': { //Push + std::cout << "Push attempt from IP " << usr->S.Received().get().substr(2) << std::endl; + if (thisStream->checkWaitingIP(usr->S.Received().get().substr(2))){ + usr->S.Received().get().clear(); + if (thisStream->setInput(usr->S)){ + std::cout << "Push accepted!" << std::endl; + usr->S = Socket::Connection( -1); + return; + }else{ + usr->Disconnect("Push denied - push already in progress!"); + } }else{ - usr->Disconnect("Push denied - push already in progress!"); + usr->Disconnect("Push denied - invalid IP address!"); } - }else{ - usr->Disconnect("Push denied - invalid IP address!"); } - } - break; - case 'S': { //Stats - usr->tmpStats = Stats(usr->S.Received().get().substr(2)); - unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime; - if (secs < 1){ - secs = 1; + break; + case 'S': { //Stats + usr->tmpStats = Stats(usr->S.Received().get().substr(2)); + unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime; + if (secs < 1){ + secs = 1; + } + usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs; + usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs; + usr->lastStats = usr->tmpStats; + thisStream->saveStats(usr->MyStr, usr->tmpStats); } - usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs; - usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs; - usr->lastStats = usr->tmpStats; - thisStream->saveStats(usr->MyStr, usr->tmpStats); + break; + case 's': { //second-seek + //ignored for now + } + break; + case 'f': { //frame-seek + //ignored for now + } + break; + case 'p': { //play + //ignored for now + } + break; + case 'o': { //once-play + //ignored for now + } + break; + case 'q': { //quit-playing + //ignored for now + } + break; } - break; - case 's': { //second-seek - //ignored for now - } - break; - case 'f': { //frame-seek - //ignored for now - } - break; - case 'p': { //play - //ignored for now - } - break; - case 'o': { //once-play - //ignored for now - } - break; - case 'q': { //quit-playing - //ignored for now - } - break; + usr->S.Received().get().clear(); } } } @@ -163,7 +167,6 @@ namespace Buffer { } } buffer_running = false; - SS.close(); } /// Loop reading DTSC data from an IP push address. @@ -175,14 +178,18 @@ namespace Buffer { while (buffer_running){ if (thisStream->getIPInput().connected()){ if (thisStream->getIPInput().spool()){ - thisStream->getWriteLock(); - if (thisStream->getStream()->parsePacket(thisStream->getIPInput().Received())){ - //thisStream->getStream()->outPacket(0); - thisStream->dropWriteLock(true); - }else{ - thisStream->dropWriteLock(false); - usleep(1000); //1ms wait - } + bool packed_parsed = false; + do { + thisStream->getWriteLock(); + if (thisStream->getStream()->parsePacket(thisStream->getIPInput().Received())){ + thisStream->dropWriteLock(true); + packed_parsed = true; + }else{ + thisStream->dropWriteLock(false); + packed_parsed = false; + usleep(1000); //1ms wait + } + } while(packed_parsed); }else{ usleep(1000); //1ms wait } @@ -190,7 +197,6 @@ namespace Buffer { usleep(1000000); //1s wait } } - SS.close(); } /// Starts a loop, waiting for connections to send data to. diff --git a/src/buffer_stream.cpp b/src/buffer_stream.cpp index ef88a2f8..0d154cf8 100644 --- a/src/buffer_stream.cpp +++ b/src/buffer_stream.cpp @@ -30,18 +30,14 @@ Buffer::Stream::Stream(){ /// Do cleanup on delete. Buffer::Stream::~Stream(){ - while (users.size() > 0){ - stats_mutex.lock(); - for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ - if (( * *usersIt).S.connected()){ - ( * *usersIt).S.close(); - printf("Closing user %s\n", ( * *usersIt).MyStr.c_str()); - } + tthread::lock_guard guard(stats_mutex); + for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ + if (( * *usersIt).S.connected()){ + ( * *usersIt).S.close(); + printf("Closing user %s\n", ( * *usersIt).MyStr.c_str()); } - stats_mutex.unlock(); - moreData.notify_all(); - cleanUsers(); } + moreData.notify_all(); delete Strm; } @@ -50,7 +46,7 @@ std::string & Buffer::Stream::getStats(){ static std::string ret; long long int now = Util::epoch(); unsigned int tot_up = 0, tot_down = 0, tot_count = 0; - stats_mutex.lock(); + tthread::lock_guard guard(stats_mutex); if (users.size() > 0){ for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ tot_down += ( * *usersIt).curr_down; @@ -72,7 +68,6 @@ std::string & Buffer::Stream::getStats(){ } ret = Storage.toString(); Storage["log"].null(); - stats_mutex.unlock(); return ret; } @@ -123,19 +118,18 @@ Socket::Connection & Buffer::Stream::getIPInput(){ /// Stores intermediate statistics. void Buffer::Stream::saveStats(std::string username, Stats & stats){ - stats_mutex.lock(); + tthread::lock_guard guard(stats_mutex); Storage["curr"][username]["connector"] = stats.connector; Storage["curr"][username]["up"] = stats.up; Storage["curr"][username]["down"] = stats.down; Storage["curr"][username]["conntime"] = stats.conntime; Storage["curr"][username]["host"] = stats.host; Storage["curr"][username]["start"] = Util::epoch() - stats.conntime; - stats_mutex.unlock(); } /// Stores final statistics. void Buffer::Stream::clearStats(std::string username, Stats & stats, std::string reason){ - stats_mutex.lock(); + tthread::lock_guard guard(stats_mutex); if (Storage["curr"].isMember(username)){ Storage["curr"].removeMember(username); #if DEBUG >= 4 @@ -149,13 +143,12 @@ void Buffer::Stream::clearStats(std::string username, Stats & stats, std::string Storage["log"][username]["conntime"] = stats.conntime; Storage["log"][username]["host"] = stats.host; Storage["log"][username]["start"] = Util::epoch() - stats.conntime; - stats_mutex.unlock(); } /// Cleans up broken connections void Buffer::Stream::cleanUsers(){ bool repeat = false; - stats_mutex.lock(); + tthread::lock_guard guard(stats_mutex); do{ repeat = false; if (users.size() > 0){ @@ -177,7 +170,6 @@ void Buffer::Stream::cleanUsers(){ } } }while (repeat); - stats_mutex.unlock(); } /// Blocks until writing is safe. @@ -231,14 +223,12 @@ void Buffer::Stream::setName(std::string n){ /// Add a user to the userlist. void Buffer::Stream::addUser(user * new_user){ - stats_mutex.lock(); + tthread::lock_guard guard(stats_mutex); users.push_back(new_user); - stats_mutex.unlock(); } /// Blocks the thread until new data is available. void Buffer::Stream::waitForData(){ - stats_mutex.lock(); + tthread::lock_guard guard(stats_mutex); moreData.wait(stats_mutex); - stats_mutex.unlock(); } diff --git a/src/buffer_user.cpp b/src/buffer_user.cpp index e67a04dc..5cb42baf 100644 --- a/src/buffer_user.cpp +++ b/src/buffer_user.cpp @@ -42,7 +42,7 @@ void Buffer::user::Disconnect(std::string reason){ /// Has a side effect of dropping the connection if send will never complete. bool Buffer::user::doSend(const char * ptr, int len){ if ( !len){ - return false; + return true; } //do not do empty sends int r = S.iwrite(ptr + currsend, len - currsend); if (r <= 0){