From 0a3b34e9b620e72418ef6f990ffc5852c13cbc87 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 18 Nov 2013 16:48:59 +0100 Subject: [PATCH] Updated buffer and controller for upcoming features. --- src/buffer/buffer.cpp | 325 +++++++++++++------------- src/buffer/buffer_stream.cpp | 216 +++++++---------- src/buffer/buffer_stream.h | 31 +-- src/buffer/buffer_user.cpp | 126 +--------- src/buffer/buffer_user.h | 19 +- src/controller/controller.cpp | 18 ++ src/controller/controller_streams.cpp | 22 +- 7 files changed, 306 insertions(+), 451 deletions(-) diff --git a/src/buffer/buffer.cpp b/src/buffer/buffer.cpp index 422eaec4..0d27b319 100644 --- a/src/buffer/buffer.cpp +++ b/src/buffer/buffer.cpp @@ -38,134 +38,32 @@ namespace Buffer { StatsSocket = Socket::Connection(Util::getTmpFolder() + "statistics", true); } if (StatsSocket.connected()){ - Stream::get()->getReadLock(); - StatsSocket.Send(Stream::get()->getStats()); - Stream::get()->dropReadLock(); + StatsSocket.SendNow(Stream::get()->getStats()); StatsSocket.SendNow(double_newline); + if (StatsSocket.spool()){ + //Got a response. + buffer_running = false; + } } } StatsSocket.close(); } - ///\brief A function running in a thread to handle a new user connection. - ///\param v_usr The user that is connected. - void handleUser(void * v_usr){ - std::set newSelect; - user * usr = (user*)v_usr; - thisStream->addUser(usr); -#if DEBUG >= 5 - std::cerr << "Thread launched for user " << usr->MyStr << ", socket number " << usr->S.getSocket() << std::endl; -#endif - Stream::get()->getReadLock(); - usr->myRing = thisStream->getRing(); - if (thisStream->getStream()->metadata && thisStream->getHeader().size() > 0){ - usr->S.SendNow(thisStream->getHeader()); - } - Stream::get()->dropReadLock(); - - while (usr->S.connected()){ - if ( !usr->myRing->playCount || !usr->Send(newSelect)){ - if (usr->S.spool()){ - while (usr->S.Received().size()){ - //delete anything that doesn't end with a newline - if ( !usr->S.Received().get().empty() && *(usr->S.Received().get().rbegin()) != '\n'){ - usr->S.Received().get().clear(); - continue; - } - usr->S.Received().get().resize(usr->S.Received().get().size() - 1); - if ( !usr->S.Received().get().empty()){ - switch (usr->S.Received().get()[0]){ - case 'P': { //Push - std::cout << "Push attempt from IP " << usr->S.Received().get().substr(2) << std::endl; - if (thisStream->checkWaitingIP(usr->S.Received().get().substr(2))){ - usr->S.Received().get().clear(); - if (thisStream->setInput(usr->S)){ - std::cout << "Push accepted!" << std::endl; - usr->S = Socket::Connection( -1); - return; - }else{ - usr->Disconnect("Push denied - push already in progress!"); - } - }else{ - usr->Disconnect("Push denied - invalid IP address!"); - } - break; - } - case 'S': { //Stats - usr->tmpStats = Stats(usr->S.Received().get().substr(2)); - unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime; - if (secs < 1){ - secs = 1; - } - usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs; - usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs; - usr->lastStats = usr->tmpStats; - thisStream->saveStats(usr->MyStr, usr->tmpStats); - Stream::get()->getReadLock(); - usr->S.SendNow(thisStream->getHeader()); - Stream::get()->dropReadLock(); - break; - } - case 't': { - if (usr->S.Received().get().size() >= 3){ - newSelect.clear(); - std::string tmp = usr->S.Received().get().substr(2); - while (tmp != ""){ - newSelect.insert(atoi(tmp.substr(0,tmp.find(' ')).c_str())); - if (tmp.find(' ') != std::string::npos){ - tmp.erase(0,tmp.find(' ')+1); - }else{ - tmp = ""; - } - } - } - break; - } - case 's': { //second-seek - unsigned int ms = JSON::Value(usr->S.Received().get().substr(2)).asInt(); - usr->myRing->waiting = false; - usr->myRing->starved = false; - usr->myRing->b = thisStream->getStream()->msSeek(ms, newSelect); - if (usr->myRing->playCount > 0){ - usr->myRing->playCount = 0; - } - break; - } - case 'p': { //play - usr->myRing->playCount = -1; - if (usr->S.Received().get().size() >= 2){ - usr->playUntil = atoi(usr->S.Received().get().substr(2).c_str()); - }else{ - usr->playUntil = 0; - } - break; - } - case 'o': { //once-play - if (usr->myRing->playCount >= 0){ - usr->myRing->playCount++; - } - break; - } - case 'q': { //quit-playing - usr->myRing->playCount = 0; - break; - } - } - usr->S.Received().get().clear(); - } - } - } - if (usr->myRing->waiting){ - Util::sleep(300); //sleep 5ms - } + ///\brief A function to handle input data. + ///\param conn A socket reference. + void handlePushIn(Socket::Connection & conn){ + conn.setBlocking(true); + while (buffer_running && conn.connected()){ + if (conn.spool()){ + thisStream->parsePacket(conn.Received()); } } - usr->Disconnect("Socket closed."); - thisStream->removeUser(usr); + if (buffer_running){ + thisStream->endStream(); + } } - + ///\brief A function running a thread to handle input data through stdin. - /// ///Automatically slows down to realtime playback. ///\param empty A null pointer. void handleStdin(void * empty){ @@ -178,20 +76,17 @@ namespace Buffer { char charBuffer[1024 * 10]; unsigned int charCount; long long int now; - + while (std::cin.good() && buffer_running){ //slow down packet receiving to real-time now = Util::getMS(); if (((now - timeDiff) >= lastPacket) || (lastPacket - (now - timeDiff) > 15000)){ - thisStream->getWriteLock(); - if (thisStream->getStream()->parsePacket(inBuffer)){ - lastPacket = thisStream->getStream()->getTime(); + if (thisStream->parsePacket(inBuffer)){ + lastPacket = thisStream->getTime(); if ((now - timeDiff - lastPacket) > 15000 || (now - timeDiff - lastPacket < -15000)){ timeDiff = now - lastPacket; } - thisStream->dropWriteLock(true); }else{ - thisStream->dropWriteLock(false); std::cin.read(charBuffer, 1024 * 10); charCount = std::cin.gcount(); inBuffer.append(charBuffer, charCount); @@ -203,44 +98,154 @@ namespace Buffer { buffer_running = false; } - ///\brief A function running a thread to handle input data through rtmp push. - ///\param empty A null pointer. - void handlePushin(void * empty){ - bool connected = false; - if (empty != 0){ - return; - } - while (buffer_running){ - if (thisStream->getIPInput().connected()){ - if (!connected){ - connected = true; - thisStream->getIPInput().setBlocking(true); - } - if (thisStream->getIPInput().spool()){ + ///\brief A function running in a thread to handle a new user connection. + ///\param v_usr The user that is connected. + void handleUser(void * v_usr){ + std::set allowedTracks; + user * usr = (user*)v_usr; + thisStream->addUser(usr); +#if DEBUG >= 5 + std::cerr << "Thread launched for user " << usr->MyStr << ", socket number " << usr->S.getSocket() << std::endl; +#endif + usr->myRing = thisStream->getRing(); + thisStream->sendMeta(usr->S); - thisStream->getWriteLock(); - bool newPackets = false; - while (thisStream->getStream()->parsePacket(thisStream->getIPInput().Received())){ - if (thisStream->getStream()->metadata.isMember("reset")){ - thisStream->disconnectUsers(); - thisStream->getStream()->metadata.removeMember("reset"); - thisStream->getStream()->metadata.netPrepare(); + while (usr->S.connected()){ + if (usr->myRing->playCount){ + if (usr->myRing->waiting){ + Stream::get()->waitForData(); + if ( !Stream::get()->isNewest(usr->myRing->b, allowedTracks)){ + usr->myRing->waiting = false; + usr->myRing->b = Stream::get()->getNext(usr->myRing->b, allowedTracks); + if ((Stream::get()->getPacket(usr->myRing->b).isMember("keyframe") && (usr->myRing->playCount > 0)) || (usr->playUntil && usr->playUntil <= Stream::get()->getPacket(usr->myRing->b)["time"].asInt())){ + usr->myRing->playCount--; + if (usr->myRing->playCount < 1 || usr->playUntil <= Stream::get()->getPacket(usr->myRing->b)["time"].asInt()){ + usr->myRing->playCount = 0; + JSON::Value pausemark; + pausemark["datatype"] = "pause_marker"; + pausemark["time"] = Stream::get()->getPacket(usr->myRing->b)["time"].asInt(); + pausemark.toPacked(); + usr->S.SendNow(pausemark.toNetPacked()); } - newPackets = true; } - thisStream->dropWriteLock(newPackets); - + } + }else{ + //complete a send + Stream::get()->getPacket(usr->myRing->b).sendTo(usr->S); + if ( !usr->S.connected()){break;} + //switch to next buffer + if (Stream::get()->isNewest(usr->myRing->b, allowedTracks)){ + //no next buffer? go in waiting mode. + usr->myRing->waiting = true; + }else{ + usr->myRing->b = Stream::get()->getNext(usr->myRing->b, allowedTracks); + if ((Stream::get()->getPacket(usr->myRing->b).isMember("keyframe") && (usr->myRing->playCount > 0)) || (usr->playUntil && usr->playUntil <= Stream::get()->getPacket(usr->myRing->b)["time"].asInt())){ + usr->myRing->playCount--; + if (usr->myRing->playCount < 1 || usr->playUntil <= Stream::get()->getPacket(usr->myRing->b)["time"].asInt()){ + usr->myRing->playCount = 0; + JSON::Value pausemark; + pausemark["datatype"] = "pause_marker"; + pausemark["time"] = Stream::get()->getPacket(usr->myRing->b)["time"].asInt(); + pausemark.toPacked(); + usr->S.SendNow(pausemark.toNetPacked()); + } + } + } } - }else{ - if (connected){ - connected = false; - thisStream->getWriteLock(); - thisStream->getStream()->endStream(); - thisStream->dropWriteLock(true); + } + if (usr->S.spool()){ + while (usr->S.Received().size()){ + //delete anything that doesn't end with a newline + if ( !usr->S.Received().get().empty() && *(usr->S.Received().get().rbegin()) != '\n'){ + usr->S.Received().get().clear(); + continue; + } + usr->S.Received().get().resize(usr->S.Received().get().size() - 1); + if ( !usr->S.Received().get().empty()){ + switch (usr->S.Received().get()[0]){ + case 'P': { //Push + std::cout << "Push attempt from IP " << usr->S.Received().get().substr(2) << std::endl; + if (thisStream->checkWaitingIP(usr->S.Received().get().substr(2))){ + usr->S.Received().get().clear(); + Socket::Connection tmp = usr->S; + usr->S = Socket::Connection( -1); + thisStream->removeUser(usr); + delete usr; + return handlePushIn(tmp); + }else{ + usr->Disconnect("Push denied - invalid IP address!"); + } + break; + } + case 'S': { //Stats + usr->tmpStats = Stats(usr->S.Received().get().substr(2)); + unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime; + if (secs < 1){ + secs = 1; + } + usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs; + usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs; + usr->lastStats = usr->tmpStats; + thisStream->saveStats(usr->sID, usr->tmpStats); + //TODO: Re-enable this + //thisStream->sendMeta(usr->S); + break; + } + case 't': { + if (usr->S.Received().get().size() >= 3){ + allowedTracks.clear(); + std::string tmp = usr->S.Received().get().substr(2); + while (tmp != ""){ + allowedTracks.insert(atoi(tmp.substr(0,tmp.find(' ')).c_str())); + if (tmp.find(' ') != std::string::npos){ + tmp.erase(0,tmp.find(' ')+1); + }else{ + tmp = ""; + } + } + } + break; + } + case 's': { //second-seek + unsigned int ms = JSON::Value(usr->S.Received().get().substr(2)).asInt(); + usr->myRing->waiting = false; + usr->myRing->starved = false; + usr->myRing->b = thisStream->msSeek(ms, allowedTracks); + if (usr->myRing->playCount > 0){ + usr->myRing->playCount = 0; + } + break; + } + case 'p': { //play + usr->myRing->playCount = -1; + if (usr->S.Received().get().size() >= 2){ + usr->playUntil = atoi(usr->S.Received().get().substr(2).c_str()); + }else{ + usr->playUntil = 0; + } + break; + } + case 'o': { //once-play + if (usr->myRing->playCount >= 0){ + usr->myRing->playCount++; + } + break; + } + case 'q': { //quit-playing + usr->myRing->playCount = 0; + break; + } + } + usr->S.Received().get().clear(); + } } - Util::sleep(1000); //1s wait + } + if (usr->myRing->waiting){ + Util::sleep(300); //sleep 5ms } } + usr->Disconnect("Socket closed."); + thisStream->removeUser(usr); } ///\brief Starts a loop, waiting for connections to send data to. @@ -272,7 +277,7 @@ namespace Buffer { conf.activate(); thisStream = Stream::get(); thisStream->setName(name); - thisStream->getStream()->setBufferTime(conf.getInteger("time")); + thisStream->setBufferTime(conf.getInteger("time")); Socket::Connection incoming; Socket::Connection std_input(fileno(stdin)); @@ -286,16 +291,15 @@ namespace Buffer { StdinThread.detach(); }else{ thisStream->setWaitingIP(await_ip); - tthread::thread StdinThread(handlePushin, 0); - StdinThread.detach(); } + unsigned int userId = 0; while (buffer_running && SS.connected() && conf.is_active){ //check for new connections, accept them if there are any //starts a thread for every accepted connection incoming = SS.accept(true); if (incoming.connected()){ - tthread::thread thisUser(handleUser, (void *)new user(incoming)); + tthread::thread thisUser(handleUser, (void *)new user(incoming, userId++)); thisUser.detach(); }else{ Util::sleep(50);//sleep 50ms @@ -306,9 +310,6 @@ namespace Buffer { buffer_running = false; std::cout << "Buffer shutting down" << std::endl; SS.close(); - if (thisStream->getIPInput().connected()){ - thisStream->getIPInput().close(); - } delete thisStream; return 0; } diff --git a/src/buffer/buffer_stream.cpp b/src/buffer/buffer_stream.cpp index d1cce2d2..168e3c71 100644 --- a/src/buffer/buffer_stream.cpp +++ b/src/buffer/buffer_stream.cpp @@ -5,11 +5,11 @@ #include namespace Buffer { - ///\brief Stores the singleton reference. + /// Stores the singleton reference. Stream * Stream::ref = 0; - ///\brief Returns a reference to the singleton instance of this class. - ///\return A reference to the class. + /// Returns a reference to the singleton instance of this class. + /// \return A reference to the class. Stream * Stream::get(){ static tthread::mutex creator; if (ref == 0){ @@ -23,14 +23,10 @@ namespace Buffer { return ref; } - ///\brief Creates a new DTSC::Stream object, private function so only one instance can exist. - Stream::Stream(){ - Strm = new DTSC::Stream(5); - readers = 0; - writers = 0; - } + /// Creates a new DTSC::Stream object, private function so only one instance can exist. + Stream::Stream() : DTSC::Stream(5){} - ///\brief Do cleanup on delete. + /// Do cleanup on delete. Stream::~Stream(){ tthread::lock_guard guard(stats_mutex); if (users.size() > 0){ @@ -41,11 +37,10 @@ namespace Buffer { } } moreData.notify_all(); - delete Strm; } - ///\brief Calculate and return the current statistics. - ///\return The current statistics in JSON format. + /// Calculate and return the current statistics. + /// \return The current statistics in JSON format. std::string & Stream::getStats(){ static std::string ret; long long int now = Util::epoch(); @@ -64,7 +59,7 @@ namespace Buffer { Storage["totals"]["now"] = now; Storage["buffer"] = name; - Storage["meta"] = Strm->metadata; + Storage["meta"] = metadata; if(Storage["meta"].isMember("tracks") && Storage["meta"]["tracks"].size() > 0){ for(JSON::ObjIter it = Storage["meta"]["tracks"].ObjBegin(); it != Storage["meta"]["tracks"].ObjEnd(); it++){ @@ -79,33 +74,15 @@ namespace Buffer { return ret; } - ///\brief Get a new DTSC::Ring object for a user. - ///\return A new DTSC::Ring object. - DTSC::Ring * Stream::getRing(){ - return Strm->getRing(); - } - - ///\brief Drop a DTSC::Ring object. - ///\param ring The DTSC::Ring to be invalidated. - void Stream::dropRing(DTSC::Ring * ring){ - Strm->dropRing(ring); - } - - ///\brief Get the (constant) header data of this stream. - ///\return A reference to the header data of the stream. - std::string & Stream::getHeader(){ - return Strm->outHeader(); - } - - ///\brief Set the IP address to accept push data from. - ///\param ip The new IP to accept push data from. + /// Set the IP address to accept push data from. + /// \param ip The new IP to accept push data from. void Stream::setWaitingIP(std::string ip){ waiting_ip = ip; } - ///\brief Check if this is the IP address to accept push data from. - ///\param ip The IP address to check. - ///\return True if it is the correct address, false otherwise. + /// Check if this is the IP address to accept push data from. + /// \param ip The IP address to check. + /// \return True if it is the correct address, false otherwise. bool Stream::checkWaitingIP(std::string ip){ if (ip == waiting_ip || ip == "::ffff:" + waiting_ip){ return true; @@ -115,27 +92,9 @@ namespace Buffer { } } - ///\brief Sets the current socket for push data. - ///\param S The new socket for accepting push data. - ///\return True if succesful, false otherwise. - bool Stream::setInput(Socket::Connection S){ - if (ip_input.connected()){ - return false; - }else{ - ip_input = S; - return true; - } - } - - ///\brief Gets the current socket for push data. - ///\return A reference to the push socket. - Socket::Connection & Stream::getIPInput(){ - return ip_input; - } - - ///\brief Stores intermediate statistics. - ///\param username The name of the user. - ///\param stats The final statistics to store. + /// Stores intermediate statistics. + /// \param username The name of the user. + /// \param stats The final statistics to store. void Stream::saveStats(std::string username, Stats & stats){ tthread::lock_guard guard(stats_mutex); Storage["curr"][username]["connector"] = stats.connector; @@ -146,10 +105,10 @@ namespace Buffer { Storage["curr"][username]["start"] = Util::epoch() - stats.conntime; } - ///\brief Stores final statistics. - ///\param username The name of the user. - ///\param stats The final statistics to store. - ///\param reason The reason for disconnecting. + /// Stores final statistics. + /// \param username The name of the user. + /// \param stats The final statistics to store. + /// \param reason The reason for disconnecting. void Stream::clearStats(std::string username, Stats & stats, std::string reason){ tthread::lock_guard guard(stats_mutex); if (Storage["curr"].isMember(username)){ @@ -166,91 +125,92 @@ namespace Buffer { Storage["log"][username]["host"] = stats.host; Storage["log"][username]["start"] = Util::epoch() - stats.conntime; } - - ///\brief Ask to obtain a write lock. - /// - /// Blocks until writing is safe. - void Stream::getWriteLock(){ - rw_mutex.lock(); - writers++; - while (writers != 1 && readers != 0){ - rw_change.wait(rw_mutex); - } - rw_mutex.unlock(); - } - - ///\brief Drops a previously obtained write lock. - ///\param newPacketsAvailable Whether new packets are available to update the index. - void Stream::dropWriteLock(bool newPacketsAvailable){ - if (newPacketsAvailable){ - Strm->metadata.netPrepare(); - } - rw_mutex.lock(); - writers--; - rw_mutex.unlock(); - rw_change.notify_all(); - if (newPacketsAvailable){ - moreData.notify_all(); + /// The deletion callback override that will disconnect users + /// whom are currently receiving a tag that is being deleted. + void Stream::deletionCallback(DTSC::livePos deleting){ + tthread::lock_guard guard(stats_mutex); + for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ + if ((*usersIt)->myRing->b == deleting){ + (*usersIt)->Disconnect("Buffer underrun"); + } } } - ///\brief Ask to obtain a read lock. - /// - ///Blocks until reading is safe. - void Stream::getReadLock(){ - rw_mutex.lock(); - while (writers > 0){ - rw_change.wait(rw_mutex); - } - readers++; - rw_mutex.unlock(); - } - - ///\brief Drops a previously obtained read lock. - void Stream::dropReadLock(){ - rw_mutex.lock(); - readers--; - rw_mutex.unlock(); - rw_change.notify_all(); - } - - ///\brief Retrieves a reference to the DTSC::Stream - ///\return A reference to the used DTSC::Stream - DTSC::Stream * Stream::getStream(){ - return Strm; - } - - ///\brief Sets the buffer name. - ///\param n The new name of the buffer. + /// Sets the buffer name. + /// \param n The new name of the buffer. void Stream::setName(std::string n){ name = n; } + + /// parsePacket override that will lock the rw_mutex during parsing. + bool Stream::parsePacket(std::string & buffer){ + rw_mutex.lock(); + bool ret = DTSC::Stream::parsePacket(buffer); + rw_mutex.unlock(); + if (ret){ + rw_change.notify_all(); + moreData.notify_all(); + } + return ret; + } + + /// getNext override that will lock the rw_mutex during checking. + DTSC::livePos Stream::getNext(DTSC::livePos & pos, std::set & allowedTracks){ + tthread::lock_guard guard(rw_mutex); + return DTSC::Stream::getNext(pos, allowedTracks); + } + + /// parsePacket override that will lock the rw_mutex during parsing. + bool Stream::parsePacket(Socket::Buffer & buffer){ + bool ret = false; + rw_mutex.lock(); + while (DTSC::Stream::parsePacket(buffer)){ + //TODO: Update metadata with call erik will write + //metadata.netPrepare(); + ret = true; + } + rw_mutex.unlock(); + if (ret){ + rw_change.notify_all(); + moreData.notify_all(); + } + return ret; + } + + /// Metadata sender that locks the rw_mutex during sending. + void Stream::sendMeta(Socket::Connection & s){ + if (metadata){ + rw_mutex.lock(); + metadata.sendTo(s); + rw_mutex.unlock(); + } + } - ///\brief Add a user to the userlist. - ///\param newUser The user to be added. + /// Add a user to the userlist. + /// \param newUser The user to be added. void Stream::addUser(user * newUser){ tthread::lock_guard guard(stats_mutex); users.insert(newUser); } - ///\brief Removes a user to the userlist. - ///\param newUser The user to be removed. + /// Removes a user from the userlist. + /// \param newUser The user to be removed. void Stream::removeUser(user * oldUser){ tthread::lock_guard guard(stats_mutex); users.erase(oldUser); } - ///\brief Disconnects all users. - void Stream::disconnectUsers(){ - tthread::lock_guard guard(stats_mutex); - for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ - (*usersIt)->Disconnect("Stream reset"); - } - } - - ///\brief Blocks the thread until new data is available. + /// Blocks the thread until new data is available. void Stream::waitForData(){ tthread::lock_guard guard(stats_mutex); moreData.wait(stats_mutex); } + + /// Cuts all data before the whereToCut point. + void Stream::cutBefore(int whereToCut){ + while (buffers.size() > 0 && buffers.begin()->first.seekTime < buffercount){ + cutOneBuffer(); + } + } + } diff --git a/src/buffer/buffer_stream.h b/src/buffer/buffer_stream.h index 490a1624..9911df03 100644 --- a/src/buffer/buffer_stream.h +++ b/src/buffer/buffer_stream.h @@ -11,18 +11,12 @@ namespace Buffer { /// Keeps track of a single streams inputs and outputs, taking care of thread safety and all other related issues. - class Stream{ + class Stream : public DTSC::Stream{ public: /// Get a reference to this Stream object. static Stream * get(); /// Get the current statistics in JSON format. std::string & getStats(); - /// Get a new DTSC::Ring object for a user. - DTSC::Ring * getRing(); - /// Drop a DTSC::Ring object. - void dropRing(DTSC::Ring * ring); - /// Get the (constant) header data of this stream. - std::string & getHeader(); /// Set the IP address to accept push data from. void setWaitingIP(std::string ip); /// Check if this is the IP address to accept push data from. @@ -35,29 +29,25 @@ namespace Buffer { void saveStats(std::string username, Stats & stats); /// Stores final statistics. void clearStats(std::string username, Stats & stats, std::string reason); - /// Blocks until writing is safe. - void getWriteLock(); - /// Drops a previously gotten write lock. - void dropWriteLock(bool newpackets_available); - /// Blocks until reading is safe. - void getReadLock(); - /// Drops a previously gotten read lock. - void dropReadLock(); - /// Retrieves a reference to the DTSC::Stream - DTSC::Stream * getStream(); /// Sets the buffer name. void setName(std::string n); /// Add a user to the userlist. void addUser(user * newUser); /// Delete a user from the userlist. void removeUser(user * oldUser); - /// Disconnects all users. - void disconnectUsers(); /// Blocks the thread until new data is available. void waitForData(); + /// Sends the metadata to a specific socket + void sendMeta(Socket::Connection & s); /// Cleanup function ~Stream(); - private: + /// TODO: WRITEME + bool parsePacket(std::string & buffer); + bool parsePacket(Socket::Buffer & buffer); + DTSC::livePos getNext(DTSC::livePos & pos, std::set & allowedTracks); + void cutBefore(int whereToCut); + private: + void deletionCallback(DTSC::livePos deleting); volatile int readers; ///< Current count of active readers; volatile int writers; ///< Current count of waiting/active writers. tthread::mutex rw_mutex; ///< Mutex for read/write locking. @@ -65,7 +55,6 @@ namespace Buffer { static Stream * ref; Stream(); JSON::Value Storage; ///< Global storage of data. - DTSC::Stream * Strm; std::string waiting_ip; ///< IP address for media push. Socket::Connection ip_input; ///< Connection used for media push. tthread::recursive_mutex stats_mutex; ///< Mutex for stats/users modifications. diff --git a/src/buffer/buffer_user.cpp b/src/buffer/buffer_user.cpp index 866e829b..f8dad153 100644 --- a/src/buffer/buffer_user.cpp +++ b/src/buffer/buffer_user.cpp @@ -7,135 +7,26 @@ #include namespace Buffer { - int user::UserCount = 0; - - ///\brief Creates a new user from a newly connected socket. - /// + ///Creates a new user from a newly connected socket. ///Also prints "User connected" text to stdout. ///\param fd A connection to the user. - user::user(Socket::Connection fd){ + user::user(Socket::Connection fd, long long ID){ + sID = JSON::Value(ID).asStringRef(); S = fd; - MyNum = UserCount++; - std::stringstream st; - st << MyNum; - MyStr = st.str(); curr_up = 0; curr_down = 0; - currsend = 0; myRing = 0; - gotproperaudio = false; - lastpointer = 0; } //constructor - ///\brief Drops held DTSC::Ring class, if one is held. - user::~user(){ - } //destructor - - ///\brief Disconnects the current user. Doesn't do anything if already disconnected. - /// + ///Disconnects the current user. Doesn't do anything if already disconnected. ///Prints "Disconnected user" to stdout if disconnect took place. ///\param reason The reason for disconnecting the user. void user::Disconnect(std::string reason){ - if (S.connected()){ - S.close(); - } - Stream::get()->clearStats(MyStr, lastStats, reason); + S.close(); + Stream::get()->clearStats(sID, lastStats, reason); } //Disconnect - ///\brief Tries to send data to the user. - /// - ///Has a side effect of dropping the connection if send will never complete. - ///\param ptr A pointer to the data that is to be sent. - ///\param len The amount of bytes to be sent from this pointer. - ///\return True if len bytes are sent, false otherwise. - bool user::doSend(const char * ptr, int len){ - if ( !len){ - return true; - } //do not do empty sends - int r = S.iwrite(ptr + currsend, len - currsend); - if (r <= 0){ - if (errno == EWOULDBLOCK){ - return false; - } - Disconnect(S.getError()); - return false; - } - currsend += r; - return (currsend == len); - } //doSend - - ///\brief Try to send the current buffer. - /// - ///\return True if the send was succesful, false otherwise. - bool user::Send(std::set & allowedTracks){ - if ( !myRing){ - return false; - } //no ring! - if ( !S.connected()){ - return false; - } //cancel if not connected - if (myRing->waiting){ - Stream::get()->waitForData(); - if (!Stream::get()->getStream()->isNewest(myRing->b, allowedTracks)){ - myRing->waiting = false; - Stream::get()->getReadLock(); - myRing->b = Stream::get()->getStream()->getNext(myRing->b, allowedTracks); - if ((Stream::get()->getStream()->getPacket(myRing->b).isMember("keyframe") && (myRing->playCount > 0)) || (playUntil && playUntil <= Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt())){ - myRing->playCount--; - if (myRing->playCount < 1 || playUntil <= Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt()){ - myRing->playCount = 0; - JSON::Value pausemark; - pausemark["datatype"] = "pause_marker"; - pausemark["time"] = Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt(); - pausemark.toPacked(); - S.SendNow(pausemark.toNetPacked()); - } - } - Stream::get()->dropReadLock(); - } - return false; - } //still waiting for next buffer? - if (myRing->starved){ - //if corrupt data, warn and get new DTSC::Ring - std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl; - Stream::get()->dropRing(myRing); - myRing = Stream::get()->getRing(); - return false; - } - //try to complete a send - Stream::get()->getReadLock(); - if (doSend(Stream::get()->getStream()->outPacket(myRing->b).c_str(), Stream::get()->getStream()->outPacket(myRing->b).length())){ - //switch to next buffer - currsend = 0; - DTSC::livePos newPos = Stream::get()->getStream()->getNext(myRing->b, allowedTracks); - if (myRing->b == newPos){ - //no next buffer? go in waiting mode. - myRing->waiting = true; - Stream::get()->dropReadLock(); - return false; - } - myRing->b = newPos; - if ((Stream::get()->getStream()->getPacket(myRing->b).isMember("keyframe") && (myRing->playCount > 0)) || (playUntil && playUntil <= Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt())){ - myRing->playCount--; - if (myRing->playCount < 1 || playUntil <= Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt()){ - myRing->playCount = 0; - JSON::Value pausemark; - pausemark["datatype"] = "pause_marker"; - pausemark["time"] = Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt(); - pausemark.toPacked(); - S.SendNow(pausemark.toNetPacked()); - } - } - Stream::get()->dropReadLock(); - return false; - } //completed a send - Stream::get()->dropReadLock(); - Util::sleep(300); - return true; - } //send - - ///\brief Default stats constructor. - /// + ///Default stats constructor. ///Should not be used. Stats::Stats(){ up = 0; @@ -143,8 +34,7 @@ namespace Buffer { conntime = 0; } - ///\brief Stats constructor reading a string. - /// + ///Stats constructor reading a string. ///Reads a stats string and parses it to the internal representation. ///\param s The string of stats. Stats::Stats(std::string s){ diff --git a/src/buffer/buffer_user.h b/src/buffer/buffer_user.h index 23008c33..97f51499 100644 --- a/src/buffer/buffer_user.h +++ b/src/buffer/buffer_user.h @@ -27,31 +27,16 @@ namespace Buffer { class user{ public: DTSC::Ring * myRing; ///< Ring of the buffer for this user. - int MyNum; ///< User ID of this user. unsigned int playUntil; ///< Time until where is being played or zero if undefined. - std::string MyStr; ///< User ID of this user as a string. - std::string inbuffer; ///< Used to buffer input data. - int currsend; ///< Current amount of bytes sent. Stats lastStats; ///< Holds last known stats for this connection. Stats tmpStats; ///< Holds temporary stats for this connection. + std::string sID; ///< Holds the connection ID. unsigned int curr_up; ///< Holds the current estimated transfer speed up. unsigned int curr_down; ///< Holds the current estimated transfer speed down. - bool gotproperaudio; ///< Whether the user received proper audio yet. - void * lastpointer; ///< Pointer to data part of current buffer. - static int UserCount; ///< Global user counter. Socket::Connection S; ///< Connection to user /// Creates a new user from a newly connected socket. - /// Also prints "User connected" text to stdout. - user(Socket::Connection fd); - /// Drops held DTSC::Ring class, if one is held. - ~user(); + user(Socket::Connection fd, long long int ID); /// Disconnects the current user. Doesn't do anything if already disconnected. - /// Prints "Disconnected user" to stdout if disconnect took place. void Disconnect(std::string reason); - /// Tries to send the current buffer, returns true if success, false otherwise. - /// Has a side effect of dropping the connection if send will never complete. - bool doSend(const char * ptr, int len); - /// Try to send data to this user. Disconnects if any problems occur. - bool Send(std::set & allowedTracks); }; } diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index 6484aa24..452edcbc 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -421,6 +421,24 @@ int main(int argc, char ** argv){ if (Request.isMember("meta")){ Controller::Storage["streams"][thisbuffer]["meta"] = Request["meta"]; } + if (Controller::Storage["streams"][thisbuffer].isMember("updated")){ + Controller::Storage["streams"][thisbuffer].removeMember("updated"); + if (Controller::Storage["streams"][thisbuffer].isMember("cut")){ + it->SendNow("c"+Controller::Storage["streams"][thisbuffer]["cut"].asString()+"\n"); + }else{ + it->SendNow("c0\n"); + } + if (Controller::Storage["streams"][thisbuffer].isMember("DVR")){ + it->SendNow("d"+Controller::Storage["streams"][thisbuffer]["DVR"].asString()+"\n"); + }else{ + it->SendNow("d20000\n"); + } + if (Controller::Storage["streams"][thisbuffer].isMember("source") && Controller::Storage["streams"][thisbuffer]["source"].asStringRef().substr(0, 7) == "push://"){ + it->SendNow("s"+Controller::Storage["streams"][thisbuffer]["source"].asStringRef().substr(7)+"\n"); + }else{ + it->SendNow("s127.0.01\n"); + } + } if (Request.isMember("totals")){ Controller::Storage["statistics"][thisbuffer]["curr"] = Request["curr"]; std::string nowstr = Request["totals"]["now"].asString(); diff --git a/src/controller/controller_streams.cpp b/src/controller/controller_streams.cpp index a40e406b..3d86e04b 100644 --- a/src/controller/controller_streams.cpp +++ b/src/controller/controller_streams.cpp @@ -20,7 +20,10 @@ namespace Controller { if (one.isMember("source") != two.isMember("source") || one["source"] != two["source"]){ return false; } - if (one.isMember("DVR") != two.isMember("DVR") || one["DVR"] != two["DVR"]){ + if (one.isMember("DVR") != two.isMember("DVR") || (one.isMember("DVR") && one["DVR"] != two["DVR"])){ + return false; + } + if (one.isMember("cut") != two.isMember("cut") || (one.isMember("cut") && one["cut"] != two["cut"])){ return false; } return true; @@ -213,15 +216,24 @@ namespace Controller { out[jit->first].null(); out[jit->first]["name"] = jit->first; out[jit->first]["source"] = jit->second["source"]; - out[jit->first]["DVR"] = jit->second["DVR"]; + out[jit->first]["DVR"] = jit->second["DVR"].asInt(); + out[jit->first]["cut"] = jit->second["cut"].asInt(); + out[jit->first]["updated"] = 1ll; Log("STRM", std::string("Updated stream ") + jit->first); - Util::Procs::Stop(jit->first); - startStream(jit->first, out[jit->first]); + if (out[jit->first]["source"].asStringRef().substr(0, 7) != "push://"){ + Util::Procs::Stop(jit->first); + startStream(jit->first, out[jit->first]); + }else{ + if ( !Util::Procs::isActive(jit->first)){ + startStream(jit->first, out[jit->first]); + } + } } }else{ out[jit->first]["name"] = jit->first; out[jit->first]["source"] = jit->second["source"]; - out[jit->first]["DVR"] = jit->second["DVR"]; + out[jit->first]["DVR"] = jit->second["DVR"].asInt(); + out[jit->first]["cut"] = jit->second["cut"].asInt(); Log("STRM", std::string("New stream ") + jit->first); startStream(jit->first, out[jit->first]); }