From 60a78c6a47fc6f562b9bfd58ebabc41d5d9556e1 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 29 Jan 2014 13:48:18 +0100 Subject: [PATCH] Fixed several MistBuffer threading and timing issues. --- src/buffer/buffer.cpp | 6 +++--- src/buffer/buffer_stream.cpp | 6 ++++++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/buffer/buffer.cpp b/src/buffer/buffer.cpp index cda0ca66..bf90d11e 100644 --- a/src/buffer/buffer.cpp +++ b/src/buffer/buffer.cpp @@ -16,6 +16,7 @@ #include #include "buffer_stream.h" #include +#include /// Holds all code unique to the Buffer. namespace Buffer { @@ -72,7 +73,7 @@ namespace Buffer { } long long int wait_time = Util::getMS(); while (Util::getMS() - wait_time < thisStream->metadata.bufferWindow){ - Util::sleep(thisStream->metadata.bufferWindow); + Util::sleep(thisStream->metadata.bufferWindow - (Util::getMS() - wait_time)); } thisStream->removeSocket(sockNo); } @@ -315,6 +316,7 @@ namespace Buffer { } unsigned int userId = 0; + SS.setBlocking(true); while (buffer_running && SS.connected() && conf.is_active){ //check for new connections, accept them if there are any //starts a thread for every accepted connection @@ -322,8 +324,6 @@ namespace Buffer { if (incoming.connected()){ tthread::thread thisUser(handleUser, (void *)new user(incoming, ++userId)); thisUser.detach(); - }else{ - Util::sleep(50);//sleep 50ms } } //main loop diff --git a/src/buffer/buffer_stream.cpp b/src/buffer/buffer_stream.cpp index c69167e2..e297d926 100644 --- a/src/buffer/buffer_stream.cpp +++ b/src/buffer/buffer_stream.cpp @@ -71,7 +71,9 @@ namespace Buffer { Storage["totals"]["now"] = now; Storage["buffer"] = name; + rw_mutex.lock(); Storage["meta"] = metadata.toJSON(); + rw_mutex.unlock(); if (Storage["meta"].isMember("tracks")){ for (JSON::ObjIter oIt = Storage["meta"]["tracks"].ObjBegin(); oIt != Storage["meta"]["tracks"].ObjEnd(); ++oIt){ oIt->second.removeMember("fragments"); @@ -190,7 +192,9 @@ namespace Buffer { /// Removes a track and all related buffers from the stream. void Stream::removeTrack(int trackId){ + rw_mutex.lock(); metadata.tracks.erase(trackId); + rw_mutex.unlock(); std::set toDelete; for (std::map::iterator it = buffers.begin(); it != buffers.end(); it++){ if (it->first.trackID == (unsigned long long int)trackId){ @@ -209,12 +213,14 @@ namespace Buffer { void Stream::removeSocket(int sockNo){ std::set toDelete; std::map::iterator it; + rw_mutex.lock(); for (it = metadata.tracks.begin(); it != metadata.tracks.end(); ++it){ if ((it->first & (sockNo << 16)) == (sockNo << 16)){ toDelete.insert(it->first); Log("BUFF", "Stream "+name+" lost input for track: "+ it->second.getIdentifier()); } } + rw_mutex.unlock(); while (toDelete.size()){ removeTrack(*toDelete.begin()); toDelete.erase(toDelete.begin());