Updated buffer and controller for upcoming features.

This commit is contained in:
Thulinma 2013-11-18 16:48:59 +01:00
parent 1986b1517b
commit 0a3b34e9b6
7 changed files with 306 additions and 451 deletions

View file

@ -38,33 +38,121 @@ namespace Buffer {
StatsSocket = Socket::Connection(Util::getTmpFolder() + "statistics", true);
}
if (StatsSocket.connected()){
Stream::get()->getReadLock();
StatsSocket.Send(Stream::get()->getStats());
Stream::get()->dropReadLock();
StatsSocket.SendNow(Stream::get()->getStats());
StatsSocket.SendNow(double_newline);
if (StatsSocket.spool()){
//Got a response.
buffer_running = false;
}
}
}
StatsSocket.close();
}
///\brief A function to handle input data.
///\param conn A socket reference.
void handlePushIn(Socket::Connection & conn){
conn.setBlocking(true);
while (buffer_running && conn.connected()){
if (conn.spool()){
thisStream->parsePacket(conn.Received());
}
}
if (buffer_running){
thisStream->endStream();
}
}
///\brief A function running a thread to handle input data through stdin.
///Automatically slows down to realtime playback.
///\param empty A null pointer.
void handleStdin(void * empty){
if (empty != 0){
return;
}
long long int timeDiff = 0; //difference between local time and stream time
unsigned int lastPacket = 0; //last parsed packet timestamp
std::string inBuffer;
char charBuffer[1024 * 10];
unsigned int charCount;
long long int now;
while (std::cin.good() && buffer_running){
//slow down packet receiving to real-time
now = Util::getMS();
if (((now - timeDiff) >= lastPacket) || (lastPacket - (now - timeDiff) > 15000)){
if (thisStream->parsePacket(inBuffer)){
lastPacket = thisStream->getTime();
if ((now - timeDiff - lastPacket) > 15000 || (now - timeDiff - lastPacket < -15000)){
timeDiff = now - lastPacket;
}
}else{
std::cin.read(charBuffer, 1024 * 10);
charCount = std::cin.gcount();
inBuffer.append(charBuffer, charCount);
}
}else{
Util::sleep(std::min(15LL, lastPacket - (now - timeDiff)));
}
}
buffer_running = false;
}
///\brief A function running in a thread to handle a new user connection.
///\param v_usr The user that is connected.
void handleUser(void * v_usr){
std::set<int> newSelect;
std::set<int> allowedTracks;
user * usr = (user*)v_usr;
thisStream->addUser(usr);
#if DEBUG >= 5
std::cerr << "Thread launched for user " << usr->MyStr << ", socket number " << usr->S.getSocket() << std::endl;
#endif
Stream::get()->getReadLock();
usr->myRing = thisStream->getRing();
if (thisStream->getStream()->metadata && thisStream->getHeader().size() > 0){
usr->S.SendNow(thisStream->getHeader());
}
Stream::get()->dropReadLock();
thisStream->sendMeta(usr->S);
while (usr->S.connected()){
if ( !usr->myRing->playCount || !usr->Send(newSelect)){
if (usr->myRing->playCount){
if (usr->myRing->waiting){
Stream::get()->waitForData();
if ( !Stream::get()->isNewest(usr->myRing->b, allowedTracks)){
usr->myRing->waiting = false;
usr->myRing->b = Stream::get()->getNext(usr->myRing->b, allowedTracks);
if ((Stream::get()->getPacket(usr->myRing->b).isMember("keyframe") && (usr->myRing->playCount > 0)) || (usr->playUntil && usr->playUntil <= Stream::get()->getPacket(usr->myRing->b)["time"].asInt())){
usr->myRing->playCount--;
if (usr->myRing->playCount < 1 || usr->playUntil <= Stream::get()->getPacket(usr->myRing->b)["time"].asInt()){
usr->myRing->playCount = 0;
JSON::Value pausemark;
pausemark["datatype"] = "pause_marker";
pausemark["time"] = Stream::get()->getPacket(usr->myRing->b)["time"].asInt();
pausemark.toPacked();
usr->S.SendNow(pausemark.toNetPacked());
}
}
}
}else{
//complete a send
Stream::get()->getPacket(usr->myRing->b).sendTo(usr->S);
if ( !usr->S.connected()){break;}
//switch to next buffer
if (Stream::get()->isNewest(usr->myRing->b, allowedTracks)){
//no next buffer? go in waiting mode.
usr->myRing->waiting = true;
}else{
usr->myRing->b = Stream::get()->getNext(usr->myRing->b, allowedTracks);
if ((Stream::get()->getPacket(usr->myRing->b).isMember("keyframe") && (usr->myRing->playCount > 0)) || (usr->playUntil && usr->playUntil <= Stream::get()->getPacket(usr->myRing->b)["time"].asInt())){
usr->myRing->playCount--;
if (usr->myRing->playCount < 1 || usr->playUntil <= Stream::get()->getPacket(usr->myRing->b)["time"].asInt()){
usr->myRing->playCount = 0;
JSON::Value pausemark;
pausemark["datatype"] = "pause_marker";
pausemark["time"] = Stream::get()->getPacket(usr->myRing->b)["time"].asInt();
pausemark.toPacked();
usr->S.SendNow(pausemark.toNetPacked());
}
}
}
}
}
if (usr->S.spool()){
while (usr->S.Received().size()){
//delete anything that doesn't end with a newline
@ -79,13 +167,11 @@ namespace Buffer {
std::cout << "Push attempt from IP " << usr->S.Received().get().substr(2) << std::endl;
if (thisStream->checkWaitingIP(usr->S.Received().get().substr(2))){
usr->S.Received().get().clear();
if (thisStream->setInput(usr->S)){
std::cout << "Push accepted!" << std::endl;
Socket::Connection tmp = usr->S;
usr->S = Socket::Connection( -1);
return;
}else{
usr->Disconnect("Push denied - push already in progress!");
}
thisStream->removeUser(usr);
delete usr;
return handlePushIn(tmp);
}else{
usr->Disconnect("Push denied - invalid IP address!");
}
@ -100,18 +186,17 @@ namespace Buffer {
usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs;
usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs;
usr->lastStats = usr->tmpStats;
thisStream->saveStats(usr->MyStr, usr->tmpStats);
Stream::get()->getReadLock();
usr->S.SendNow(thisStream->getHeader());
Stream::get()->dropReadLock();
thisStream->saveStats(usr->sID, usr->tmpStats);
//TODO: Re-enable this
//thisStream->sendMeta(usr->S);
break;
}
case 't': {
if (usr->S.Received().get().size() >= 3){
newSelect.clear();
allowedTracks.clear();
std::string tmp = usr->S.Received().get().substr(2);
while (tmp != ""){
newSelect.insert(atoi(tmp.substr(0,tmp.find(' ')).c_str()));
allowedTracks.insert(atoi(tmp.substr(0,tmp.find(' ')).c_str()));
if (tmp.find(' ') != std::string::npos){
tmp.erase(0,tmp.find(' ')+1);
}else{
@ -125,7 +210,7 @@ namespace Buffer {
unsigned int ms = JSON::Value(usr->S.Received().get().substr(2)).asInt();
usr->myRing->waiting = false;
usr->myRing->starved = false;
usr->myRing->b = thisStream->getStream()->msSeek(ms, newSelect);
usr->myRing->b = thisStream->msSeek(ms, allowedTracks);
if (usr->myRing->playCount > 0){
usr->myRing->playCount = 0;
}
@ -159,90 +244,10 @@ namespace Buffer {
Util::sleep(300); //sleep 5ms
}
}
}
usr->Disconnect("Socket closed.");
thisStream->removeUser(usr);
}
///\brief A function running a thread to handle input data through stdin.
///
///Automatically slows down to realtime playback.
///\param empty A null pointer.
void handleStdin(void * empty){
if (empty != 0){
return;
}
long long int timeDiff = 0; //difference between local time and stream time
unsigned int lastPacket = 0; //last parsed packet timestamp
std::string inBuffer;
char charBuffer[1024 * 10];
unsigned int charCount;
long long int now;
while (std::cin.good() && buffer_running){
//slow down packet receiving to real-time
now = Util::getMS();
if (((now - timeDiff) >= lastPacket) || (lastPacket - (now - timeDiff) > 15000)){
thisStream->getWriteLock();
if (thisStream->getStream()->parsePacket(inBuffer)){
lastPacket = thisStream->getStream()->getTime();
if ((now - timeDiff - lastPacket) > 15000 || (now - timeDiff - lastPacket < -15000)){
timeDiff = now - lastPacket;
}
thisStream->dropWriteLock(true);
}else{
thisStream->dropWriteLock(false);
std::cin.read(charBuffer, 1024 * 10);
charCount = std::cin.gcount();
inBuffer.append(charBuffer, charCount);
}
}else{
Util::sleep(std::min(15LL, lastPacket - (now - timeDiff)));
}
}
buffer_running = false;
}
///\brief A function running a thread to handle input data through rtmp push.
///\param empty A null pointer.
void handlePushin(void * empty){
bool connected = false;
if (empty != 0){
return;
}
while (buffer_running){
if (thisStream->getIPInput().connected()){
if (!connected){
connected = true;
thisStream->getIPInput().setBlocking(true);
}
if (thisStream->getIPInput().spool()){
thisStream->getWriteLock();
bool newPackets = false;
while (thisStream->getStream()->parsePacket(thisStream->getIPInput().Received())){
if (thisStream->getStream()->metadata.isMember("reset")){
thisStream->disconnectUsers();
thisStream->getStream()->metadata.removeMember("reset");
thisStream->getStream()->metadata.netPrepare();
}
newPackets = true;
}
thisStream->dropWriteLock(newPackets);
}
}else{
if (connected){
connected = false;
thisStream->getWriteLock();
thisStream->getStream()->endStream();
thisStream->dropWriteLock(true);
}
Util::sleep(1000); //1s wait
}
}
}
///\brief Starts a loop, waiting for connections to send data to.
///\param argc The number of arguments to the program.
///\param argv The arguments to the program.
@ -272,7 +277,7 @@ namespace Buffer {
conf.activate();
thisStream = Stream::get();
thisStream->setName(name);
thisStream->getStream()->setBufferTime(conf.getInteger("time"));
thisStream->setBufferTime(conf.getInteger("time"));
Socket::Connection incoming;
Socket::Connection std_input(fileno(stdin));
@ -286,16 +291,15 @@ namespace Buffer {
StdinThread.detach();
}else{
thisStream->setWaitingIP(await_ip);
tthread::thread StdinThread(handlePushin, 0);
StdinThread.detach();
}
unsigned int userId = 0;
while (buffer_running && SS.connected() && conf.is_active){
//check for new connections, accept them if there are any
//starts a thread for every accepted connection
incoming = SS.accept(true);
if (incoming.connected()){
tthread::thread thisUser(handleUser, (void *)new user(incoming));
tthread::thread thisUser(handleUser, (void *)new user(incoming, userId++));
thisUser.detach();
}else{
Util::sleep(50);//sleep 50ms
@ -306,9 +310,6 @@ namespace Buffer {
buffer_running = false;
std::cout << "Buffer shutting down" << std::endl;
SS.close();
if (thisStream->getIPInput().connected()){
thisStream->getIPInput().close();
}
delete thisStream;
return 0;
}

View file

@ -5,11 +5,11 @@
#include <mist/timing.h>
namespace Buffer {
///\brief Stores the singleton reference.
/// Stores the singleton reference.
Stream * Stream::ref = 0;
///\brief Returns a reference to the singleton instance of this class.
///\return A reference to the class.
/// Returns a reference to the singleton instance of this class.
/// \return A reference to the class.
Stream * Stream::get(){
static tthread::mutex creator;
if (ref == 0){
@ -23,14 +23,10 @@ namespace Buffer {
return ref;
}
///\brief Creates a new DTSC::Stream object, private function so only one instance can exist.
Stream::Stream(){
Strm = new DTSC::Stream(5);
readers = 0;
writers = 0;
}
/// Creates a new DTSC::Stream object, private function so only one instance can exist.
Stream::Stream() : DTSC::Stream(5){}
///\brief Do cleanup on delete.
/// Do cleanup on delete.
Stream::~Stream(){
tthread::lock_guard<tthread::recursive_mutex> guard(stats_mutex);
if (users.size() > 0){
@ -41,11 +37,10 @@ namespace Buffer {
}
}
moreData.notify_all();
delete Strm;
}
///\brief Calculate and return the current statistics.
///\return The current statistics in JSON format.
/// Calculate and return the current statistics.
/// \return The current statistics in JSON format.
std::string & Stream::getStats(){
static std::string ret;
long long int now = Util::epoch();
@ -64,7 +59,7 @@ namespace Buffer {
Storage["totals"]["now"] = now;
Storage["buffer"] = name;
Storage["meta"] = Strm->metadata;
Storage["meta"] = metadata;
if(Storage["meta"].isMember("tracks") && Storage["meta"]["tracks"].size() > 0){
for(JSON::ObjIter it = Storage["meta"]["tracks"].ObjBegin(); it != Storage["meta"]["tracks"].ObjEnd(); it++){
@ -79,33 +74,15 @@ namespace Buffer {
return ret;
}
///\brief Get a new DTSC::Ring object for a user.
///\return A new DTSC::Ring object.
DTSC::Ring * Stream::getRing(){
return Strm->getRing();
}
///\brief Drop a DTSC::Ring object.
///\param ring The DTSC::Ring to be invalidated.
void Stream::dropRing(DTSC::Ring * ring){
Strm->dropRing(ring);
}
///\brief Get the (constant) header data of this stream.
///\return A reference to the header data of the stream.
std::string & Stream::getHeader(){
return Strm->outHeader();
}
///\brief Set the IP address to accept push data from.
///\param ip The new IP to accept push data from.
/// Set the IP address to accept push data from.
/// \param ip The new IP to accept push data from.
void Stream::setWaitingIP(std::string ip){
waiting_ip = ip;
}
///\brief Check if this is the IP address to accept push data from.
///\param ip The IP address to check.
///\return True if it is the correct address, false otherwise.
/// Check if this is the IP address to accept push data from.
/// \param ip The IP address to check.
/// \return True if it is the correct address, false otherwise.
bool Stream::checkWaitingIP(std::string ip){
if (ip == waiting_ip || ip == "::ffff:" + waiting_ip){
return true;
@ -115,27 +92,9 @@ namespace Buffer {
}
}
///\brief Sets the current socket for push data.
///\param S The new socket for accepting push data.
///\return True if succesful, false otherwise.
bool Stream::setInput(Socket::Connection S){
if (ip_input.connected()){
return false;
}else{
ip_input = S;
return true;
}
}
///\brief Gets the current socket for push data.
///\return A reference to the push socket.
Socket::Connection & Stream::getIPInput(){
return ip_input;
}
///\brief Stores intermediate statistics.
///\param username The name of the user.
///\param stats The final statistics to store.
/// Stores intermediate statistics.
/// \param username The name of the user.
/// \param stats The final statistics to store.
void Stream::saveStats(std::string username, Stats & stats){
tthread::lock_guard<tthread::recursive_mutex> guard(stats_mutex);
Storage["curr"][username]["connector"] = stats.connector;
@ -146,10 +105,10 @@ namespace Buffer {
Storage["curr"][username]["start"] = Util::epoch() - stats.conntime;
}
///\brief Stores final statistics.
///\param username The name of the user.
///\param stats The final statistics to store.
///\param reason The reason for disconnecting.
/// Stores final statistics.
/// \param username The name of the user.
/// \param stats The final statistics to store.
/// \param reason The reason for disconnecting.
void Stream::clearStats(std::string username, Stats & stats, std::string reason){
tthread::lock_guard<tthread::recursive_mutex> guard(stats_mutex);
if (Storage["curr"].isMember(username)){
@ -166,91 +125,92 @@ namespace Buffer {
Storage["log"][username]["host"] = stats.host;
Storage["log"][username]["start"] = Util::epoch() - stats.conntime;
}
///\brief Ask to obtain a write lock.
///
/// Blocks until writing is safe.
void Stream::getWriteLock(){
rw_mutex.lock();
writers++;
while (writers != 1 && readers != 0){
rw_change.wait(rw_mutex);
/// The deletion callback override that will disconnect users
/// whom are currently receiving a tag that is being deleted.
void Stream::deletionCallback(DTSC::livePos deleting){
tthread::lock_guard<tthread::recursive_mutex> guard(stats_mutex);
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
if ((*usersIt)->myRing->b == deleting){
(*usersIt)->Disconnect("Buffer underrun");
}
rw_mutex.unlock();
}
///\brief Drops a previously obtained write lock.
///\param newPacketsAvailable Whether new packets are available to update the index.
void Stream::dropWriteLock(bool newPacketsAvailable){
if (newPacketsAvailable){
Strm->metadata.netPrepare();
}
rw_mutex.lock();
writers--;
rw_mutex.unlock();
rw_change.notify_all();
if (newPacketsAvailable){
moreData.notify_all();
}
}
///\brief Ask to obtain a read lock.
///
///Blocks until reading is safe.
void Stream::getReadLock(){
rw_mutex.lock();
while (writers > 0){
rw_change.wait(rw_mutex);
}
readers++;
rw_mutex.unlock();
}
///\brief Drops a previously obtained read lock.
void Stream::dropReadLock(){
rw_mutex.lock();
readers--;
rw_mutex.unlock();
rw_change.notify_all();
}
///\brief Retrieves a reference to the DTSC::Stream
///\return A reference to the used DTSC::Stream
DTSC::Stream * Stream::getStream(){
return Strm;
}
///\brief Sets the buffer name.
///\param n The new name of the buffer.
/// Sets the buffer name.
/// \param n The new name of the buffer.
void Stream::setName(std::string n){
name = n;
}
///\brief Add a user to the userlist.
///\param newUser The user to be added.
/// parsePacket override that will lock the rw_mutex during parsing.
bool Stream::parsePacket(std::string & buffer){
rw_mutex.lock();
bool ret = DTSC::Stream::parsePacket(buffer);
rw_mutex.unlock();
if (ret){
rw_change.notify_all();
moreData.notify_all();
}
return ret;
}
/// getNext override that will lock the rw_mutex during checking.
DTSC::livePos Stream::getNext(DTSC::livePos & pos, std::set<int> & allowedTracks){
tthread::lock_guard<tthread::mutex> guard(rw_mutex);
return DTSC::Stream::getNext(pos, allowedTracks);
}
/// parsePacket override that will lock the rw_mutex during parsing.
bool Stream::parsePacket(Socket::Buffer & buffer){
bool ret = false;
rw_mutex.lock();
while (DTSC::Stream::parsePacket(buffer)){
//TODO: Update metadata with call erik will write
//metadata.netPrepare();
ret = true;
}
rw_mutex.unlock();
if (ret){
rw_change.notify_all();
moreData.notify_all();
}
return ret;
}
/// Metadata sender that locks the rw_mutex during sending.
void Stream::sendMeta(Socket::Connection & s){
if (metadata){
rw_mutex.lock();
metadata.sendTo(s);
rw_mutex.unlock();
}
}
/// Add a user to the userlist.
/// \param newUser The user to be added.
void Stream::addUser(user * newUser){
tthread::lock_guard<tthread::recursive_mutex> guard(stats_mutex);
users.insert(newUser);
}
///\brief Removes a user to the userlist.
///\param newUser The user to be removed.
/// Removes a user from the userlist.
/// \param newUser The user to be removed.
void Stream::removeUser(user * oldUser){
tthread::lock_guard<tthread::recursive_mutex> guard(stats_mutex);
users.erase(oldUser);
}
///\brief Disconnects all users.
void Stream::disconnectUsers(){
tthread::lock_guard<tthread::recursive_mutex> guard(stats_mutex);
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
(*usersIt)->Disconnect("Stream reset");
}
}
///\brief Blocks the thread until new data is available.
/// Blocks the thread until new data is available.
void Stream::waitForData(){
tthread::lock_guard<tthread::recursive_mutex> guard(stats_mutex);
moreData.wait(stats_mutex);
}
/// Cuts all data before the whereToCut point.
void Stream::cutBefore(int whereToCut){
while (buffers.size() > 0 && buffers.begin()->first.seekTime < buffercount){
cutOneBuffer();
}
}
}

View file

@ -11,18 +11,12 @@
namespace Buffer {
/// Keeps track of a single streams inputs and outputs, taking care of thread safety and all other related issues.
class Stream{
class Stream : public DTSC::Stream{
public:
/// Get a reference to this Stream object.
static Stream * get();
/// Get the current statistics in JSON format.
std::string & getStats();
/// Get a new DTSC::Ring object for a user.
DTSC::Ring * getRing();
/// Drop a DTSC::Ring object.
void dropRing(DTSC::Ring * ring);
/// Get the (constant) header data of this stream.
std::string & getHeader();
/// Set the IP address to accept push data from.
void setWaitingIP(std::string ip);
/// Check if this is the IP address to accept push data from.
@ -35,29 +29,25 @@ namespace Buffer {
void saveStats(std::string username, Stats & stats);
/// Stores final statistics.
void clearStats(std::string username, Stats & stats, std::string reason);
/// Blocks until writing is safe.
void getWriteLock();
/// Drops a previously gotten write lock.
void dropWriteLock(bool newpackets_available);
/// Blocks until reading is safe.
void getReadLock();
/// Drops a previously gotten read lock.
void dropReadLock();
/// Retrieves a reference to the DTSC::Stream
DTSC::Stream * getStream();
/// Sets the buffer name.
void setName(std::string n);
/// Add a user to the userlist.
void addUser(user * newUser);
/// Delete a user from the userlist.
void removeUser(user * oldUser);
/// Disconnects all users.
void disconnectUsers();
/// Blocks the thread until new data is available.
void waitForData();
/// Sends the metadata to a specific socket
void sendMeta(Socket::Connection & s);
/// Cleanup function
~Stream();
/// TODO: WRITEME
bool parsePacket(std::string & buffer);
bool parsePacket(Socket::Buffer & buffer);
DTSC::livePos getNext(DTSC::livePos & pos, std::set<int> & allowedTracks);
void cutBefore(int whereToCut);
private:
void deletionCallback(DTSC::livePos deleting);
volatile int readers; ///< Current count of active readers;
volatile int writers; ///< Current count of waiting/active writers.
tthread::mutex rw_mutex; ///< Mutex for read/write locking.
@ -65,7 +55,6 @@ namespace Buffer {
static Stream * ref;
Stream();
JSON::Value Storage; ///< Global storage of data.
DTSC::Stream * Strm;
std::string waiting_ip; ///< IP address for media push.
Socket::Connection ip_input; ///< Connection used for media push.
tthread::recursive_mutex stats_mutex; ///< Mutex for stats/users modifications.

View file

@ -7,135 +7,26 @@
#include <stdlib.h>
namespace Buffer {
int user::UserCount = 0;
///\brief Creates a new user from a newly connected socket.
///
///Creates a new user from a newly connected socket.
///Also prints "User connected" text to stdout.
///\param fd A connection to the user.
user::user(Socket::Connection fd){
user::user(Socket::Connection fd, long long ID){
sID = JSON::Value(ID).asStringRef();
S = fd;
MyNum = UserCount++;
std::stringstream st;
st << MyNum;
MyStr = st.str();
curr_up = 0;
curr_down = 0;
currsend = 0;
myRing = 0;
gotproperaudio = false;
lastpointer = 0;
} //constructor
///\brief Drops held DTSC::Ring class, if one is held.
user::~user(){
} //destructor
///\brief Disconnects the current user. Doesn't do anything if already disconnected.
///
///Disconnects the current user. Doesn't do anything if already disconnected.
///Prints "Disconnected user" to stdout if disconnect took place.
///\param reason The reason for disconnecting the user.
void user::Disconnect(std::string reason){
if (S.connected()){
S.close();
}
Stream::get()->clearStats(MyStr, lastStats, reason);
Stream::get()->clearStats(sID, lastStats, reason);
} //Disconnect
///\brief Tries to send data to the user.
///
///Has a side effect of dropping the connection if send will never complete.
///\param ptr A pointer to the data that is to be sent.
///\param len The amount of bytes to be sent from this pointer.
///\return True if len bytes are sent, false otherwise.
bool user::doSend(const char * ptr, int len){
if ( !len){
return true;
} //do not do empty sends
int r = S.iwrite(ptr + currsend, len - currsend);
if (r <= 0){
if (errno == EWOULDBLOCK){
return false;
}
Disconnect(S.getError());
return false;
}
currsend += r;
return (currsend == len);
} //doSend
///\brief Try to send the current buffer.
///
///\return True if the send was succesful, false otherwise.
bool user::Send(std::set<int> & allowedTracks){
if ( !myRing){
return false;
} //no ring!
if ( !S.connected()){
return false;
} //cancel if not connected
if (myRing->waiting){
Stream::get()->waitForData();
if (!Stream::get()->getStream()->isNewest(myRing->b, allowedTracks)){
myRing->waiting = false;
Stream::get()->getReadLock();
myRing->b = Stream::get()->getStream()->getNext(myRing->b, allowedTracks);
if ((Stream::get()->getStream()->getPacket(myRing->b).isMember("keyframe") && (myRing->playCount > 0)) || (playUntil && playUntil <= Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt())){
myRing->playCount--;
if (myRing->playCount < 1 || playUntil <= Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt()){
myRing->playCount = 0;
JSON::Value pausemark;
pausemark["datatype"] = "pause_marker";
pausemark["time"] = Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt();
pausemark.toPacked();
S.SendNow(pausemark.toNetPacked());
}
}
Stream::get()->dropReadLock();
}
return false;
} //still waiting for next buffer?
if (myRing->starved){
//if corrupt data, warn and get new DTSC::Ring
std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl;
Stream::get()->dropRing(myRing);
myRing = Stream::get()->getRing();
return false;
}
//try to complete a send
Stream::get()->getReadLock();
if (doSend(Stream::get()->getStream()->outPacket(myRing->b).c_str(), Stream::get()->getStream()->outPacket(myRing->b).length())){
//switch to next buffer
currsend = 0;
DTSC::livePos newPos = Stream::get()->getStream()->getNext(myRing->b, allowedTracks);
if (myRing->b == newPos){
//no next buffer? go in waiting mode.
myRing->waiting = true;
Stream::get()->dropReadLock();
return false;
}
myRing->b = newPos;
if ((Stream::get()->getStream()->getPacket(myRing->b).isMember("keyframe") && (myRing->playCount > 0)) || (playUntil && playUntil <= Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt())){
myRing->playCount--;
if (myRing->playCount < 1 || playUntil <= Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt()){
myRing->playCount = 0;
JSON::Value pausemark;
pausemark["datatype"] = "pause_marker";
pausemark["time"] = Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt();
pausemark.toPacked();
S.SendNow(pausemark.toNetPacked());
}
}
Stream::get()->dropReadLock();
return false;
} //completed a send
Stream::get()->dropReadLock();
Util::sleep(300);
return true;
} //send
///\brief Default stats constructor.
///
///Default stats constructor.
///Should not be used.
Stats::Stats(){
up = 0;
@ -143,8 +34,7 @@ namespace Buffer {
conntime = 0;
}
///\brief Stats constructor reading a string.
///
///Stats constructor reading a string.
///Reads a stats string and parses it to the internal representation.
///\param s The string of stats.
Stats::Stats(std::string s){

View file

@ -27,31 +27,16 @@ namespace Buffer {
class user{
public:
DTSC::Ring * myRing; ///< Ring of the buffer for this user.
int MyNum; ///< User ID of this user.
unsigned int playUntil; ///< Time until where is being played or zero if undefined.
std::string MyStr; ///< User ID of this user as a string.
std::string inbuffer; ///< Used to buffer input data.
int currsend; ///< Current amount of bytes sent.
Stats lastStats; ///< Holds last known stats for this connection.
Stats tmpStats; ///< Holds temporary stats for this connection.
std::string sID; ///< Holds the connection ID.
unsigned int curr_up; ///< Holds the current estimated transfer speed up.
unsigned int curr_down; ///< Holds the current estimated transfer speed down.
bool gotproperaudio; ///< Whether the user received proper audio yet.
void * lastpointer; ///< Pointer to data part of current buffer.
static int UserCount; ///< Global user counter.
Socket::Connection S; ///< Connection to user
/// Creates a new user from a newly connected socket.
/// Also prints "User connected" text to stdout.
user(Socket::Connection fd);
/// Drops held DTSC::Ring class, if one is held.
~user();
user(Socket::Connection fd, long long int ID);
/// Disconnects the current user. Doesn't do anything if already disconnected.
/// Prints "Disconnected user" to stdout if disconnect took place.
void Disconnect(std::string reason);
/// Tries to send the current buffer, returns true if success, false otherwise.
/// Has a side effect of dropping the connection if send will never complete.
bool doSend(const char * ptr, int len);
/// Try to send data to this user. Disconnects if any problems occur.
bool Send(std::set<int> & allowedTracks);
};
}

View file

@ -421,6 +421,24 @@ int main(int argc, char ** argv){
if (Request.isMember("meta")){
Controller::Storage["streams"][thisbuffer]["meta"] = Request["meta"];
}
if (Controller::Storage["streams"][thisbuffer].isMember("updated")){
Controller::Storage["streams"][thisbuffer].removeMember("updated");
if (Controller::Storage["streams"][thisbuffer].isMember("cut")){
it->SendNow("c"+Controller::Storage["streams"][thisbuffer]["cut"].asString()+"\n");
}else{
it->SendNow("c0\n");
}
if (Controller::Storage["streams"][thisbuffer].isMember("DVR")){
it->SendNow("d"+Controller::Storage["streams"][thisbuffer]["DVR"].asString()+"\n");
}else{
it->SendNow("d20000\n");
}
if (Controller::Storage["streams"][thisbuffer].isMember("source") && Controller::Storage["streams"][thisbuffer]["source"].asStringRef().substr(0, 7) == "push://"){
it->SendNow("s"+Controller::Storage["streams"][thisbuffer]["source"].asStringRef().substr(7)+"\n");
}else{
it->SendNow("s127.0.01\n");
}
}
if (Request.isMember("totals")){
Controller::Storage["statistics"][thisbuffer]["curr"] = Request["curr"];
std::string nowstr = Request["totals"]["now"].asString();

View file

@ -20,7 +20,10 @@ namespace Controller {
if (one.isMember("source") != two.isMember("source") || one["source"] != two["source"]){
return false;
}
if (one.isMember("DVR") != two.isMember("DVR") || one["DVR"] != two["DVR"]){
if (one.isMember("DVR") != two.isMember("DVR") || (one.isMember("DVR") && one["DVR"] != two["DVR"])){
return false;
}
if (one.isMember("cut") != two.isMember("cut") || (one.isMember("cut") && one["cut"] != two["cut"])){
return false;
}
return true;
@ -213,15 +216,24 @@ namespace Controller {
out[jit->first].null();
out[jit->first]["name"] = jit->first;
out[jit->first]["source"] = jit->second["source"];
out[jit->first]["DVR"] = jit->second["DVR"];
out[jit->first]["DVR"] = jit->second["DVR"].asInt();
out[jit->first]["cut"] = jit->second["cut"].asInt();
out[jit->first]["updated"] = 1ll;
Log("STRM", std::string("Updated stream ") + jit->first);
if (out[jit->first]["source"].asStringRef().substr(0, 7) != "push://"){
Util::Procs::Stop(jit->first);
startStream(jit->first, out[jit->first]);
}else{
if ( !Util::Procs::isActive(jit->first)){
startStream(jit->first, out[jit->first]);
}
}
}
}else{
out[jit->first]["name"] = jit->first;
out[jit->first]["source"] = jit->second["source"];
out[jit->first]["DVR"] = jit->second["DVR"];
out[jit->first]["DVR"] = jit->second["DVR"].asInt();
out[jit->first]["cut"] = jit->second["cut"].asInt();
Log("STRM", std::string("New stream ") + jit->first);
startStream(jit->first, out[jit->first]);
}