Namespace refactor and documentation in the separate buffer files.

This commit is contained in:
Erik Zandvliet 2013-03-28 15:42:46 +01:00
parent 9b41a07c2f
commit 1e2f1602f8
3 changed files with 376 additions and 332 deletions

View file

@ -4,237 +4,263 @@
#include "buffer_stream.h" #include "buffer_stream.h"
#include <mist/timing.h> #include <mist/timing.h>
/// Stores the globally equal reference. namespace Buffer {
Buffer::Stream * Buffer::Stream::ref = 0; ///\brief Stores the singleton reference.
Stream * Stream::ref = 0;
/// Returns a globally equal reference to this class. ///\brief Returns a reference to the singleton instance of this class.
Buffer::Stream * Buffer::Stream::get(){ ///\return A reference to the class.
static tthread::mutex creator; Stream * Stream::get(){
if (ref == 0){ static tthread::mutex creator;
//prevent creating two at the same time
creator.lock();
if (ref == 0){ if (ref == 0){
ref = new Stream(); //prevent creating two at the same time
creator.lock();
if (ref == 0){
ref = new Stream();
}
creator.unlock();
} }
creator.unlock(); return ref;
} }
return ref;
}
/// Creates a new DTSC::Stream object, private function so only one instance can exist. ///\brief Creates a new DTSC::Stream object, private function so only one instance can exist.
Buffer::Stream::Stream(){ Stream::Stream(){
Strm = new DTSC::Stream(5); Strm = new DTSC::Stream(5);
readers = 0; readers = 0;
writers = 0; writers = 0;
}
/// Do cleanup on delete.
Buffer::Stream::~Stream(){
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
if (( * *usersIt).S.connected()){
( * *usersIt).S.close();
}
} }
moreData.notify_all();
delete Strm;
}
/// Calculate and return the current statistics in JSON format. ///\brief Do cleanup on delete.
std::string & Buffer::Stream::getStats(){ Stream::~Stream(){
static std::string ret; tthread::lock_guard<tthread::mutex> guard(stats_mutex);
long long int now = Util::epoch();
unsigned int tot_up = 0, tot_down = 0, tot_count = 0;
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
if (users.size() > 0){
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
tot_down += ( * *usersIt).curr_down; if (( * *usersIt).S.connected()){
tot_up += ( * *usersIt).curr_up; ( * *usersIt).S.close();
tot_count++; }
} }
moreData.notify_all();
delete Strm;
} }
Storage["totals"]["down"] = tot_down;
Storage["totals"]["up"] = tot_up;
Storage["totals"]["count"] = tot_count;
Storage["totals"]["now"] = now;
Storage["buffer"] = name;
Storage["meta"] = Strm->metadata;
if (Storage["meta"].isMember("audio")){
Storage["meta"]["audio"].removeMember("init");
}
if (Storage["meta"].isMember("video")){
Storage["meta"]["video"].removeMember("init");
}
ret = Storage.toString();
Storage["log"].null();
return ret;
}
/// Get a new DTSC::Ring object for a user. ///\brief Calculate and return the current statistics.
DTSC::Ring * Buffer::Stream::getRing(){ ///\return The current statistics in JSON format.
return Strm->getRing(); std::string & Stream::getStats(){
} static std::string ret;
long long int now = Util::epoch();
/// Drop a DTSC::Ring object. unsigned int tot_up = 0, tot_down = 0, tot_count = 0;
void Buffer::Stream::dropRing(DTSC::Ring * ring){ tthread::lock_guard<tthread::mutex> guard(stats_mutex);
Strm->dropRing(ring);
}
/// Get the (constant) header data of this stream.
std::string & Buffer::Stream::getHeader(){
return Strm->outHeader();
}
/// Set the IP address to accept push data from.
void Buffer::Stream::setWaitingIP(std::string ip){
waiting_ip = ip;
}
/// Check if this is the IP address to accept push data from.
bool Buffer::Stream::checkWaitingIP(std::string ip){
if (ip == waiting_ip || ip == "::ffff:" + waiting_ip){
return true;
}else{
std::cout << ip << " != (::ffff:)" << waiting_ip << std::endl;
return false;
}
}
/// Sets the current socket for push data.
bool Buffer::Stream::setInput(Socket::Connection S){
if (ip_input.connected()){
return false;
}else{
ip_input = S;
return true;
}
}
/// Gets the current socket for push data.
Socket::Connection & Buffer::Stream::getIPInput(){
return ip_input;
}
/// Stores intermediate statistics.
void Buffer::Stream::saveStats(std::string username, Stats & stats){
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
Storage["curr"][username]["connector"] = stats.connector;
Storage["curr"][username]["up"] = stats.up;
Storage["curr"][username]["down"] = stats.down;
Storage["curr"][username]["conntime"] = stats.conntime;
Storage["curr"][username]["host"] = stats.host;
Storage["curr"][username]["start"] = Util::epoch() - stats.conntime;
}
/// Stores final statistics.
void Buffer::Stream::clearStats(std::string username, Stats & stats, std::string reason){
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
if (Storage["curr"].isMember(username)){
Storage["curr"].removeMember(username);
#if DEBUG >= 4
std::cout << "Disconnected user " << username << ": " << reason << ". " << stats.connector << " transferred " << stats.up << " up and "
<< stats.down << " down in " << stats.conntime << " seconds to " << stats.host << std::endl;
#endif
}
Storage["log"][username]["connector"] = stats.connector;
Storage["log"][username]["up"] = stats.up;
Storage["log"][username]["down"] = stats.down;
Storage["log"][username]["conntime"] = stats.conntime;
Storage["log"][username]["host"] = stats.host;
Storage["log"][username]["start"] = Util::epoch() - stats.conntime;
}
/// Cleans up broken connections
void Buffer::Stream::cleanUsers(){
bool repeat = false;
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
do{
repeat = false;
if (users.size() > 0){ if (users.size() > 0){
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
if (( * *usersIt).Thread == 0 && !( * *usersIt).S.connected()){ tot_down += ( * *usersIt).curr_down;
delete *usersIt; tot_up += ( * *usersIt).curr_up;
users.erase(usersIt); tot_count++;
repeat = true; }
break; }
}else{ Storage["totals"]["down"] = tot_down;
if ( !( * *usersIt).S.connected()){ Storage["totals"]["up"] = tot_up;
if (( * *usersIt).Thread->joinable()){ Storage["totals"]["count"] = tot_count;
( * *usersIt).Thread->join(); Storage["totals"]["now"] = now;
delete ( * *usersIt).Thread; Storage["buffer"] = name;
( * *usersIt).Thread = 0; Storage["meta"] = Strm->metadata;
if (Storage["meta"].isMember("audio")){
Storage["meta"]["audio"].removeMember("init");
}
if (Storage["meta"].isMember("video")){
Storage["meta"]["video"].removeMember("init");
}
ret = Storage.toString();
Storage["log"].null();
return ret;
}
///\brief Get a new DTSC::Ring object for a user.
///\return A new DTSC::Ring object.
DTSC::Ring * Stream::getRing(){
return Strm->getRing();
}
///\brief Drop a DTSC::Ring object.
///\param ring The DTSC::Ring to be invalidated.
void Stream::dropRing(DTSC::Ring * ring){
Strm->dropRing(ring);
}
///\brief Get the (constant) header data of this stream.
///\return A reference to the header data of the stream.
std::string & Stream::getHeader(){
return Strm->outHeader();
}
///\brief Set the IP address to accept push data from.
///\param ip The new IP to accept push data from.
void Stream::setWaitingIP(std::string ip){
waiting_ip = ip;
}
///\brief Check if this is the IP address to accept push data from.
///\param ip The IP address to check.
///\return True if it is the correct address, false otherwise.
bool Stream::checkWaitingIP(std::string ip){
if (ip == waiting_ip || ip == "::ffff:" + waiting_ip){
return true;
}else{
std::cout << ip << " != (::ffff:)" << waiting_ip << std::endl;
return false;
}
}
///\brief Sets the current socket for push data.
///\param S The new socket for accepting push data.
///\return True if succesful, false otherwise.
bool Stream::setInput(Socket::Connection S){
if (ip_input.connected()){
return false;
}else{
ip_input = S;
return true;
}
}
///\brief Gets the current socket for push data.
///\return A reference to the push socket.
Socket::Connection & Stream::getIPInput(){
return ip_input;
}
///\brief Stores intermediate statistics.
///\param username The name of the user.
///\param stats The final statistics to store.
void Stream::saveStats(std::string username, Stats & stats){
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
Storage["curr"][username]["connector"] = stats.connector;
Storage["curr"][username]["up"] = stats.up;
Storage["curr"][username]["down"] = stats.down;
Storage["curr"][username]["conntime"] = stats.conntime;
Storage["curr"][username]["host"] = stats.host;
Storage["curr"][username]["start"] = Util::epoch() - stats.conntime;
}
///\brief Stores final statistics.
///\param username The name of the user.
///\param stats The final statistics to store.
///\param reason The reason for disconnecting.
void Stream::clearStats(std::string username, Stats & stats, std::string reason){
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
if (Storage["curr"].isMember(username)){
Storage["curr"].removeMember(username);
#if DEBUG >= 4
std::cout << "Disconnected user " << username << ": " << reason << ". " << stats.connector << " transferred " << stats.up << " up and "
<< stats.down << " down in " << stats.conntime << " seconds to " << stats.host << std::endl;
#endif
}
Storage["log"][username]["connector"] = stats.connector;
Storage["log"][username]["up"] = stats.up;
Storage["log"][username]["down"] = stats.down;
Storage["log"][username]["conntime"] = stats.conntime;
Storage["log"][username]["host"] = stats.host;
Storage["log"][username]["start"] = Util::epoch() - stats.conntime;
}
///\brief Clean up broken connections
void Stream::cleanUsers(){
bool repeat = false;
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
do{
repeat = false;
if (users.size() > 0){
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
if (( * *usersIt).Thread == 0 && !( * *usersIt).S.connected()){
delete *usersIt;
users.erase(usersIt);
repeat = true;
break;
}else{
if ( !( * *usersIt).S.connected()){
if (( * *usersIt).Thread->joinable()){
( * *usersIt).Thread->join();
delete ( * *usersIt).Thread;
( * *usersIt).Thread = 0;
}
} }
} }
} }
} }
} }while (repeat);
}while (repeat);
}
/// Blocks until writing is safe.
void Buffer::Stream::getWriteLock(){
rw_mutex.lock();
writers++;
while (writers != 1 && readers != 0){
rw_change.wait(rw_mutex);
} }
rw_mutex.unlock();
}
/// Drops a previously gotten write lock. ///\brief Ask to obtain a write lock.
void Buffer::Stream::dropWriteLock(bool newpackets_available){ ///
if (newpackets_available){ /// Blocks until writing is safe.
if (Strm->getPacket(0).isMember("keyframe")){ void Stream::getWriteLock(){
stats_mutex.lock(); rw_mutex.lock();
Strm->updateHeaders(); writers++;
stats_mutex.unlock(); while (writers != 1 && readers != 0){
rw_change.wait(rw_mutex);
}
rw_mutex.unlock();
}
///\brief Drops a previously obtained write lock.
///\param newPacketsAvailable Whether new packets are available to update the index.
void Stream::dropWriteLock(bool newPacketsAvailable){
if (newPacketsAvailable){
if (Strm->getPacket(0).isMember("keyframe")){
stats_mutex.lock();
Strm->updateHeaders();
stats_mutex.unlock();
}
}
rw_mutex.lock();
writers--;
rw_mutex.unlock();
rw_change.notify_all();
if (newPacketsAvailable){
moreData.notify_all();
} }
} }
rw_mutex.lock();
writers--; ///\brief Ask to obtain a read lock.
rw_mutex.unlock(); ///
rw_change.notify_all(); ///Blocks until reading is safe.
if (newpackets_available){ void Stream::getReadLock(){
moreData.notify_all(); rw_mutex.lock();
while (writers > 0){
rw_change.wait(rw_mutex);
}
readers++;
rw_mutex.unlock();
}
///\brief Drops a previously obtained read lock.
void Stream::dropReadLock(){
rw_mutex.lock();
readers--;
rw_mutex.unlock();
rw_change.notify_all();
}
///\brief Retrieves a reference to the DTSC::Stream
///\return A reference to the used DTSC::Stream
DTSC::Stream * Stream::getStream(){
return Strm;
}
///\brief Sets the buffer name.
///\param n The new name of the buffer.
void Stream::setName(std::string n){
name = n;
}
///\brief Add a user to the userlist.
///\param newUser The user to be added.
void Stream::addUser(user * newUser){
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
users.push_back(newUser);
}
///\brief Blocks the thread until new data is available.
void Stream::waitForData(){
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
moreData.wait(stats_mutex);
} }
} }
/// Blocks until reading is safe.
void Buffer::Stream::getReadLock(){
rw_mutex.lock();
while (writers > 0){
rw_change.wait(rw_mutex);
}
readers++;
rw_mutex.unlock();
}
/// Drops a previously gotten read lock.
void Buffer::Stream::dropReadLock(){
rw_mutex.lock();
readers--;
rw_mutex.unlock();
rw_change.notify_all();
}
/// Retrieves a reference to the DTSC::Stream
DTSC::Stream * Buffer::Stream::getStream(){
return Strm;
}
/// Sets the buffer name.
void Buffer::Stream::setName(std::string n){
name = n;
}
/// Add a user to the userlist.
void Buffer::Stream::addUser(user * new_user){
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
users.push_back(new_user);
}
/// Blocks the thread until new data is available.
void Buffer::Stream::waitForData(){
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
moreData.wait(stats_mutex);
}

View file

@ -50,7 +50,7 @@ namespace Buffer {
/// Sets the buffer name. /// Sets the buffer name.
void setName(std::string n); void setName(std::string n);
/// Add a user to the userlist. /// Add a user to the userlist.
void addUser(user * new_user); void addUser(user * newUser);
/// Blocks the thread until new data is available. /// Blocks the thread until new data is available.
void waitForData(); void waitForData();
/// Cleanup function /// Cleanup function

View file

@ -1,73 +1,116 @@
/// \file buffer_user.cpp /// \file buffer_user.cpp
/// Contains code for buffer users. /// Contains code for buffer users.
#include "buffer_user.h" #include "buffer_user.h"
#include "buffer_stream.h" #include "buffer_stream.h"
#include <sstream> #include <sstream>
#include <stdlib.h> //for atoi and friends #include <stdlib.h>
int Buffer::user::UserCount = 0;
/// Creates a new user from a newly connected socket. namespace Buffer {
/// Also prints "User connected" text to stdout. int user::UserCount = 0;
Buffer::user::user(Socket::Connection fd){
S = fd;
MyNum = UserCount++;
std::stringstream st;
st << MyNum;
MyStr = st.str();
curr_up = 0;
curr_down = 0;
currsend = 0;
myRing = 0;
Thread = 0;
gotproperaudio = false;
lastpointer = 0;
} //constructor
/// Drops held DTSC::Ring class, if one is held. ///\brief Creates a new user from a newly connected socket.
Buffer::user::~user(){ ///
Stream::get()->dropRing(myRing); ///Also prints "User connected" text to stdout.
} //destructor ///\param fd A connection to the user.
user::user(Socket::Connection fd){
S = fd;
MyNum = UserCount++;
std::stringstream st;
st << MyNum;
MyStr = st.str();
curr_up = 0;
curr_down = 0;
currsend = 0;
myRing = 0;
Thread = 0;
gotproperaudio = false;
lastpointer = 0;
} //constructor
/// Disconnects the current user. Doesn't do anything if already disconnected. ///\brief Drops held DTSC::Ring class, if one is held.
/// Prints "Disconnected user" to stdout if disconnect took place. user::~user(){
void Buffer::user::Disconnect(std::string reason){ Stream::get()->dropRing(myRing);
if (S.connected()){ } //destructor
S.close();
}
Stream::get()->clearStats(MyStr, lastStats, reason);
} //Disconnect
/// Tries to send the current buffer, returns true if success, false otherwise. ///\brief Disconnects the current user. Doesn't do anything if already disconnected.
/// Has a side effect of dropping the connection if send will never complete. ///
bool Buffer::user::doSend(const char * ptr, int len){ ///Prints "Disconnected user" to stdout if disconnect took place.
if ( !len){ ///\param reason The reason for disconnecting the user.
return true; void user::Disconnect(std::string reason){
} //do not do empty sends if (S.connected()){
int r = S.iwrite(ptr + currsend, len - currsend); S.close();
if (r <= 0){ }
if (errno == EWOULDBLOCK){ Stream::get()->clearStats(MyStr, lastStats, reason);
} //Disconnect
///\brief Tries to send data to the user.
///
///Has a side effect of dropping the connection if send will never complete.
///\param ptr A pointer to the data that is to be sent.
///\param len The amount of bytes to be sent from this pointer.
///\return True if len bytes are sent, false otherwise.
bool user::doSend(const char * ptr, int len){
if ( !len){
return true;
} //do not do empty sends
int r = S.iwrite(ptr + currsend, len - currsend);
if (r <= 0){
if (errno == EWOULDBLOCK){
return false;
}
Disconnect(S.getError());
return false; return false;
} }
Disconnect(S.getError()); currsend += r;
return false; return (currsend == len);
} } //doSend
currsend += r;
return (currsend == len);
} //doSend
/// Try to send data to this user. Disconnects if any problems occur. ///\brief Try to send the current buffer.
bool Buffer::user::Send(){ ///
if ( !myRing){ ///\return True if the send was succesful, false otherwise.
return false; bool user::Send(){
} //no ring! if ( !myRing){
if ( !S.connected()){ return false;
return false; } //no ring!
} //cancel if not connected if ( !S.connected()){
if (myRing->waiting){ return false;
Stream::get()->waitForData(); } //cancel if not connected
if ( !myRing->waiting){ if (myRing->waiting){
Stream::get()->getReadLock(); Stream::get()->waitForData();
if ( !myRing->waiting){
Stream::get()->getReadLock();
if (Stream::get()->getStream()->getPacket(myRing->b).isMember("keyframe") && myRing->playCount > 0){
myRing->playCount--;
if ( !myRing->playCount){
JSON::Value pausemark;
pausemark["datatype"] = "pause_marker";
pausemark["time"] = Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt();
pausemark.toPacked();
S.SendNow(pausemark.toNetPacked());
}
}
Stream::get()->dropReadLock();
}
return false;
} //still waiting for next buffer?
if (myRing->starved){
//if corrupt data, warn and get new DTSC::Ring
std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl;
Stream::get()->dropRing(myRing);
myRing = Stream::get()->getRing();
return false;
}
//try to complete a send
Stream::get()->getReadLock();
if (doSend(Stream::get()->getStream()->outPacket(myRing->b).c_str(), Stream::get()->getStream()->outPacket(myRing->b).length())){
//switch to next buffer
currsend = 0;
if (myRing->b <= 0){
myRing->waiting = true;
return false;
} //no next buffer? go in waiting mode.
myRing->b--;
if (Stream::get()->getStream()->getPacket(myRing->b).isMember("keyframe") && myRing->playCount > 0){ if (Stream::get()->getStream()->getPacket(myRing->b).isMember("keyframe") && myRing->playCount > 0){
myRing->playCount--; myRing->playCount--;
if ( !myRing->playCount){ if ( !myRing->playCount){
@ -79,71 +122,46 @@ bool Buffer::user::Send(){
} }
} }
Stream::get()->dropReadLock(); Stream::get()->dropReadLock();
}
return false;
} //still waiting for next buffer?
if (myRing->starved){
//if corrupt data, warn and get new DTSC::Ring
std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl;
Stream::get()->dropRing(myRing);
myRing = Stream::get()->getRing();
return false;
}
//try to complete a send
Stream::get()->getReadLock();
if (doSend(Stream::get()->getStream()->outPacket(myRing->b).c_str(), Stream::get()->getStream()->outPacket(myRing->b).length())){
//switch to next buffer
currsend = 0;
if (myRing->b <= 0){
myRing->waiting = true;
return false; return false;
} //no next buffer? go in waiting mode. } //completed a send
myRing->b--;
if (Stream::get()->getStream()->getPacket(myRing->b).isMember("keyframe") && myRing->playCount > 0){
myRing->playCount--;
if ( !myRing->playCount){
JSON::Value pausemark;
pausemark["datatype"] = "pause_marker";
pausemark["time"] = Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt();
pausemark.toPacked();
S.SendNow(pausemark.toNetPacked());
}
}
Stream::get()->dropReadLock(); Stream::get()->dropReadLock();
return false; return true;
} //completed a send } //send
Stream::get()->dropReadLock();
return true;
} //send
/// Default constructor - should not be in use. ///\brief Default stats constructor.
Buffer::Stats::Stats(){ ///
up = 0; ///Should not be used.
down = 0; Stats::Stats(){
conntime = 0; up = 0;
} down = 0;
conntime = 0;
}
/// Reads a stats string and parses it to the internal representation. ///\brief Stats constructor reading a string.
Buffer::Stats::Stats(std::string s){ ///
size_t f = s.find(' '); ///Reads a stats string and parses it to the internal representation.
if (f != std::string::npos){ ///\param s The string of stats.
host = s.substr(0, f); Stats::Stats(std::string s){
s.erase(0, f + 1); size_t f = s.find(' ');
} if (f != std::string::npos){
f = s.find(' '); host = s.substr(0, f);
if (f != std::string::npos){ s.erase(0, f + 1);
connector = s.substr(0, f); }
s.erase(0, f + 1); f = s.find(' ');
} if (f != std::string::npos){
f = s.find(' '); connector = s.substr(0, f);
if (f != std::string::npos){ s.erase(0, f + 1);
conntime = atoi(s.substr(0, f).c_str()); }
s.erase(0, f + 1); f = s.find(' ');
} if (f != std::string::npos){
f = s.find(' '); conntime = atoi(s.substr(0, f).c_str());
if (f != std::string::npos){ s.erase(0, f + 1);
up = atoi(s.substr(0, f).c_str()); }
s.erase(0, f + 1); f = s.find(' ');
down = atoi(s.c_str()); if (f != std::string::npos){
up = atoi(s.substr(0, f).c_str());
s.erase(0, f + 1);
down = atoi(s.c_str());
}
} }
} }