Fixed several MistBuffer threading and timing issues.
This commit is contained in:
parent
99eb0ca6a4
commit
60a78c6a47
2 changed files with 9 additions and 3 deletions
|
@ -16,6 +16,7 @@
|
||||||
#include <mist/timing.h>
|
#include <mist/timing.h>
|
||||||
#include "buffer_stream.h"
|
#include "buffer_stream.h"
|
||||||
#include <mist/stream.h>
|
#include <mist/stream.h>
|
||||||
|
#include <mist/defines.h>
|
||||||
|
|
||||||
/// Holds all code unique to the Buffer.
|
/// Holds all code unique to the Buffer.
|
||||||
namespace Buffer {
|
namespace Buffer {
|
||||||
|
@ -72,7 +73,7 @@ namespace Buffer {
|
||||||
}
|
}
|
||||||
long long int wait_time = Util::getMS();
|
long long int wait_time = Util::getMS();
|
||||||
while (Util::getMS() - wait_time < thisStream->metadata.bufferWindow){
|
while (Util::getMS() - wait_time < thisStream->metadata.bufferWindow){
|
||||||
Util::sleep(thisStream->metadata.bufferWindow);
|
Util::sleep(thisStream->metadata.bufferWindow - (Util::getMS() - wait_time));
|
||||||
}
|
}
|
||||||
thisStream->removeSocket(sockNo);
|
thisStream->removeSocket(sockNo);
|
||||||
}
|
}
|
||||||
|
@ -315,6 +316,7 @@ namespace Buffer {
|
||||||
}
|
}
|
||||||
|
|
||||||
unsigned int userId = 0;
|
unsigned int userId = 0;
|
||||||
|
SS.setBlocking(true);
|
||||||
while (buffer_running && SS.connected() && conf.is_active){
|
while (buffer_running && SS.connected() && conf.is_active){
|
||||||
//check for new connections, accept them if there are any
|
//check for new connections, accept them if there are any
|
||||||
//starts a thread for every accepted connection
|
//starts a thread for every accepted connection
|
||||||
|
@ -322,8 +324,6 @@ namespace Buffer {
|
||||||
if (incoming.connected()){
|
if (incoming.connected()){
|
||||||
tthread::thread thisUser(handleUser, (void *)new user(incoming, ++userId));
|
tthread::thread thisUser(handleUser, (void *)new user(incoming, ++userId));
|
||||||
thisUser.detach();
|
thisUser.detach();
|
||||||
}else{
|
|
||||||
Util::sleep(50);//sleep 50ms
|
|
||||||
}
|
}
|
||||||
} //main loop
|
} //main loop
|
||||||
|
|
||||||
|
|
|
@ -71,7 +71,9 @@ namespace Buffer {
|
||||||
Storage["totals"]["now"] = now;
|
Storage["totals"]["now"] = now;
|
||||||
Storage["buffer"] = name;
|
Storage["buffer"] = name;
|
||||||
|
|
||||||
|
rw_mutex.lock();
|
||||||
Storage["meta"] = metadata.toJSON();
|
Storage["meta"] = metadata.toJSON();
|
||||||
|
rw_mutex.unlock();
|
||||||
if (Storage["meta"].isMember("tracks")){
|
if (Storage["meta"].isMember("tracks")){
|
||||||
for (JSON::ObjIter oIt = Storage["meta"]["tracks"].ObjBegin(); oIt != Storage["meta"]["tracks"].ObjEnd(); ++oIt){
|
for (JSON::ObjIter oIt = Storage["meta"]["tracks"].ObjBegin(); oIt != Storage["meta"]["tracks"].ObjEnd(); ++oIt){
|
||||||
oIt->second.removeMember("fragments");
|
oIt->second.removeMember("fragments");
|
||||||
|
@ -190,7 +192,9 @@ namespace Buffer {
|
||||||
|
|
||||||
/// Removes a track and all related buffers from the stream.
|
/// Removes a track and all related buffers from the stream.
|
||||||
void Stream::removeTrack(int trackId){
|
void Stream::removeTrack(int trackId){
|
||||||
|
rw_mutex.lock();
|
||||||
metadata.tracks.erase(trackId);
|
metadata.tracks.erase(trackId);
|
||||||
|
rw_mutex.unlock();
|
||||||
std::set<DTSC::livePos> toDelete;
|
std::set<DTSC::livePos> toDelete;
|
||||||
for (std::map<DTSC::livePos, JSON::Value >::iterator it = buffers.begin(); it != buffers.end(); it++){
|
for (std::map<DTSC::livePos, JSON::Value >::iterator it = buffers.begin(); it != buffers.end(); it++){
|
||||||
if (it->first.trackID == (unsigned long long int)trackId){
|
if (it->first.trackID == (unsigned long long int)trackId){
|
||||||
|
@ -209,12 +213,14 @@ namespace Buffer {
|
||||||
void Stream::removeSocket(int sockNo){
|
void Stream::removeSocket(int sockNo){
|
||||||
std::set<int> toDelete;
|
std::set<int> toDelete;
|
||||||
std::map<int,DTSC::Track>::iterator it;
|
std::map<int,DTSC::Track>::iterator it;
|
||||||
|
rw_mutex.lock();
|
||||||
for (it = metadata.tracks.begin(); it != metadata.tracks.end(); ++it){
|
for (it = metadata.tracks.begin(); it != metadata.tracks.end(); ++it){
|
||||||
if ((it->first & (sockNo << 16)) == (sockNo << 16)){
|
if ((it->first & (sockNo << 16)) == (sockNo << 16)){
|
||||||
toDelete.insert(it->first);
|
toDelete.insert(it->first);
|
||||||
Log("BUFF", "Stream "+name+" lost input for track: "+ it->second.getIdentifier());
|
Log("BUFF", "Stream "+name+" lost input for track: "+ it->second.getIdentifier());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
rw_mutex.unlock();
|
||||||
while (toDelete.size()){
|
while (toDelete.size()){
|
||||||
removeTrack(*toDelete.begin());
|
removeTrack(*toDelete.begin());
|
||||||
toDelete.erase(toDelete.begin());
|
toDelete.erase(toDelete.begin());
|
||||||
|
|
Loading…
Add table
Reference in a new issue