Buffer simplifications, preparations and additions.

This commit is contained in:
Thulinma 2013-12-03 10:06:45 +01:00
parent 045fe1cd70
commit 4ad866cc61
6 changed files with 104 additions and 127 deletions

View file

@ -47,7 +47,7 @@ bin_PROGRAMS+=MistDTSC2MP4
bin_PROGRAMS+=MistDTSC2SRT bin_PROGRAMS+=MistDTSC2SRT
#buffer folder (MistBuffer, MistPlayer) #buffer folder (MistBuffer, MistPlayer)
MistBuffer_SOURCES=buffer/buffer.cpp buffer/buffer_user.h buffer/buffer_user.cpp buffer/buffer_stream.h buffer/buffer_stream.cpp tinythread.cpp tinythread.h ../VERSION MistBuffer_SOURCES=buffer/buffer.cpp buffer/buffer_stream.h buffer/buffer_stream.cpp tinythread.cpp tinythread.h ../VERSION
MistBuffer_LDADD=$(MIST_LIBS) -lpthread MistBuffer_LDADD=$(MIST_LIBS) -lpthread
MistPlayer_SOURCES=buffer/player.cpp MistPlayer_SOURCES=buffer/player.cpp

View file

@ -60,9 +60,7 @@ namespace Buffer {
#endif #endif
conn.setBlocking(true); conn.setBlocking(true);
while (buffer_running && conn.connected()){ while (buffer_running && conn.connected()){
if (conn.spool()){ thisStream->parsePacket(conn);
thisStream->parsePacket(conn.Received());
}
} }
if (buffer_running){ if (buffer_running){
thisStream->endStream(); thisStream->endStream();

View file

@ -3,6 +3,7 @@
#include "buffer_stream.h" #include "buffer_stream.h"
#include <mist/timing.h> #include <mist/timing.h>
#include <stdlib.h>
namespace Buffer { namespace Buffer {
/// Stores the singleton reference. /// Stores the singleton reference.
@ -59,17 +60,14 @@ namespace Buffer {
Storage["totals"]["count"] = tot_count; Storage["totals"]["count"] = tot_count;
Storage["totals"]["now"] = now; Storage["totals"]["now"] = now;
Storage["buffer"] = name; Storage["buffer"] = name;
Storage["meta"] = metadata; std::map<int,DTSC::Track>::iterator it;
for (it = metadata.tracks.begin(); it != metadata.tracks.end(); ++it){
if(Storage["meta"].isMember("tracks") && Storage["meta"]["tracks"].size() > 0){ std::cout << it->second.getIdentifier() << ": " << it->second.firstms << "-" << it->second.lastms << " (" << it->second.keys.size() << ")" << std::endl;
for(JSON::ObjIter it = Storage["meta"]["tracks"].ObjBegin(); it != Storage["meta"]["tracks"].ObjEnd(); it++){
it->second.removeMember("keys");
it->second.removeMember("frags");
}
//delete empty trackname if present - these are never interesting
Storage["meta"]["tracks"].removeMember("");
} }
Storage["meta"] = metadata.toJSON();
ret = Storage.toString(); ret = Storage.toString();
Storage["log"].null(); Storage["log"].null();
return ret; return ret;
@ -137,6 +135,7 @@ namespace Buffer {
Storage["log"][username]["host"] = stats.host; Storage["log"][username]["host"] = stats.host;
Storage["log"][username]["start"] = Util::epoch() - stats.conntime; Storage["log"][username]["start"] = Util::epoch() - stats.conntime;
} }
/// The deletion callback override that will disconnect users /// The deletion callback override that will disconnect users
/// whom are currently receiving a tag that is being deleted. /// whom are currently receiving a tag that is being deleted.
void Stream::deletionCallback(DTSC::livePos deleting){ void Stream::deletionCallback(DTSC::livePos deleting){
@ -173,12 +172,13 @@ namespace Buffer {
} }
/// parsePacket override that will lock the rw_mutex during parsing. /// parsePacket override that will lock the rw_mutex during parsing.
bool Stream::parsePacket(Socket::Buffer & buffer){ bool Stream::parsePacket(Socket::Connection & c){
bool ret = false; bool ret = false;
if (!c.spool()){
return ret;
}
rw_mutex.lock(); rw_mutex.lock();
while (DTSC::Stream::parsePacket(buffer)){ while (DTSC::Stream::parsePacket(c.Received())){
//TODO: Update metadata with call erik will write
//metadata.netPrepare();
ret = true; ret = true;
} }
rw_mutex.unlock(); rw_mutex.unlock();
@ -218,4 +218,58 @@ namespace Buffer {
moreData.wait(stats_mutex); moreData.wait(stats_mutex);
} }
///Creates a new user from a newly connected socket.
///Also prints "User connected" text to stdout.
///\param fd A connection to the user.
user::user(Socket::Connection fd, long long ID){
sID = JSON::Value(ID).asString();
S = fd;
curr_up = 0;
curr_down = 0;
myRing = 0;
} //constructor
///Disconnects the current user. Doesn't do anything if already disconnected.
///Prints "Disconnected user" to stdout if disconnect took place.
///\param reason The reason for disconnecting the user.
void user::Disconnect(std::string reason){
S.close();
Stream::get()->clearStats(sID, lastStats, reason);
} //Disconnect
///Default stats constructor.
///Should not be used.
Stats::Stats(){
up = 0;
down = 0;
conntime = 0;
}
///Stats constructor reading a string.
///Reads a stats string and parses it to the internal representation.
///\param s The string of stats.
Stats::Stats(std::string s){
size_t f = s.find(' ');
if (f != std::string::npos){
host = s.substr(0, f);
s.erase(0, f + 1);
}
f = s.find(' ');
if (f != std::string::npos){
connector = s.substr(0, f);
s.erase(0, f + 1);
}
f = s.find(' ');
if (f != std::string::npos){
conntime = atoi(s.substr(0, f).c_str());
s.erase(0, f + 1);
}
f = s.find(' ');
if (f != std::string::npos){
up = atoi(s.substr(0, f).c_str());
s.erase(0, f + 1);
down = atoi(s.c_str());
}
}
} }

View file

@ -7,9 +7,41 @@
#include <mist/json.h> #include <mist/json.h>
#include <mist/socket.h> #include <mist/socket.h>
#include "tinythread.h" #include "tinythread.h"
#include "buffer_user.h"
namespace Buffer { namespace Buffer {
/// Converts a stats line to up, down, host, connector and conntime values.
class Stats{
public:
unsigned int up;///<The amount of bytes sent upstream.
unsigned int down;///<The amount of bytes received downstream.
std::string host;///<The connected host.
std::string connector;///<The connector the user is connected with.
unsigned int conntime;///<The amount of time the user is connected.
Stats(std::string s);
Stats();
};
///\brief Keeps track of connected users.
///
///Keeps track of which buffer the user currently uses,
///and its connection status.
class user{
public:
DTSC::Ring * myRing; ///< Ring of the buffer for this user.
unsigned int playUntil; ///< Time until where is being played or zero if undefined.
Stats lastStats; ///< Holds last known stats for this connection.
Stats tmpStats; ///< Holds temporary stats for this connection.
std::string sID; ///< Holds the connection ID.
unsigned int curr_up; ///< Holds the current estimated transfer speed up.
unsigned int curr_down; ///< Holds the current estimated transfer speed down.
Socket::Connection S; ///< Connection to user
/// Creates a new user from a newly connected socket.
user(Socket::Connection fd, long long int ID);
/// Disconnects the current user. Doesn't do anything if already disconnected.
void Disconnect(std::string reason);
};
/// Keeps track of a single streams inputs and outputs, taking care of thread safety and all other related issues. /// Keeps track of a single streams inputs and outputs, taking care of thread safety and all other related issues.
class Stream : public DTSC::Stream{ class Stream : public DTSC::Stream{
public: public:
@ -43,12 +75,10 @@ namespace Buffer {
~Stream(); ~Stream();
/// TODO: WRITEME /// TODO: WRITEME
bool parsePacket(std::string & buffer); bool parsePacket(std::string & buffer);
bool parsePacket(Socket::Buffer & buffer); bool parsePacket(Socket::Connection & c);
DTSC::livePos getNext(DTSC::livePos & pos, std::set<int> & allowedTracks); DTSC::livePos getNext(DTSC::livePos & pos, std::set<int> & allowedTracks);
private: private:
void deletionCallback(DTSC::livePos deleting); void deletionCallback(DTSC::livePos deleting);
volatile int readers; ///< Current count of active readers;
volatile int writers; ///< Current count of waiting/active writers.
tthread::mutex rw_mutex; ///< Mutex for read/write locking. tthread::mutex rw_mutex; ///< Mutex for read/write locking.
tthread::condition_variable rw_change; ///< Triggered when reader/writer count changes. tthread::condition_variable rw_change; ///< Triggered when reader/writer count changes.
static Stream * ref; static Stream * ref;

View file

@ -1,63 +0,0 @@
/// \file buffer_user.cpp
/// Contains code for buffer users.
#include "buffer_user.h"
#include "buffer_stream.h"
#include <sstream>
#include <stdlib.h>
namespace Buffer {
///Creates a new user from a newly connected socket.
///Also prints "User connected" text to stdout.
///\param fd A connection to the user.
user::user(Socket::Connection fd, long long ID){
sID = JSON::Value(ID).asString();
S = fd;
curr_up = 0;
curr_down = 0;
myRing = 0;
} //constructor
///Disconnects the current user. Doesn't do anything if already disconnected.
///Prints "Disconnected user" to stdout if disconnect took place.
///\param reason The reason for disconnecting the user.
void user::Disconnect(std::string reason){
S.close();
Stream::get()->clearStats(sID, lastStats, reason);
} //Disconnect
///Default stats constructor.
///Should not be used.
Stats::Stats(){
up = 0;
down = 0;
conntime = 0;
}
///Stats constructor reading a string.
///Reads a stats string and parses it to the internal representation.
///\param s The string of stats.
Stats::Stats(std::string s){
size_t f = s.find(' ');
if (f != std::string::npos){
host = s.substr(0, f);
s.erase(0, f + 1);
}
f = s.find(' ');
if (f != std::string::npos){
connector = s.substr(0, f);
s.erase(0, f + 1);
}
f = s.find(' ');
if (f != std::string::npos){
conntime = atoi(s.substr(0, f).c_str());
s.erase(0, f + 1);
}
f = s.find(' ');
if (f != std::string::npos){
up = atoi(s.substr(0, f).c_str());
s.erase(0, f + 1);
down = atoi(s.c_str());
}
}
}

View file

@ -1,42 +0,0 @@
/// \file buffer_user.h
/// Contains definitions for buffer users.
#pragma once
#include <string>
#include <mist/dtsc.h>
#include <mist/socket.h>
#include "tinythread.h"
namespace Buffer {
/// Converts a stats line to up, down, host, connector and conntime values.
class Stats{
public:
unsigned int up;///<The amount of bytes sent upstream.
unsigned int down;///<The amount of bytes received downstream.
std::string host;///<The connected host.
std::string connector;///<The connector the user is connected with.
unsigned int conntime;///<The amount of time the user is connected.
Stats();
Stats(std::string s);
};
///\brief Keeps track of connected users.
///
///Keeps track of which buffer the user currently uses,
///and its connection status.
class user{
public:
DTSC::Ring * myRing; ///< Ring of the buffer for this user.
unsigned int playUntil; ///< Time until where is being played or zero if undefined.
Stats lastStats; ///< Holds last known stats for this connection.
Stats tmpStats; ///< Holds temporary stats for this connection.
std::string sID; ///< Holds the connection ID.
unsigned int curr_up; ///< Holds the current estimated transfer speed up.
unsigned int curr_down; ///< Holds the current estimated transfer speed down.
Socket::Connection S; ///< Connection to user
/// Creates a new user from a newly connected socket.
user(Socket::Connection fd, long long int ID);
/// Disconnects the current user. Doesn't do anything if already disconnected.
void Disconnect(std::string reason);
};
}