diff --git a/src/buffer/buffer.cpp b/src/buffer/buffer.cpp index 8d94a4bf..13212a45 100644 --- a/src/buffer/buffer.cpp +++ b/src/buffer/buffer.cpp @@ -59,12 +59,18 @@ namespace Buffer { pthread_setname_np(pthread_self(), "Push Input"); #endif conn.setBlocking(true); + int sockNo = 0; while (buffer_running && conn.connected()){ thisStream->parsePacket(conn); } if (buffer_running){ thisStream->endStream(); } + long long int wait_time = Util::getMS(); + while (Util::getMS() - wait_time < thisStream->metadata.bufferWindow){ + Util::sleep(thisStream->metadata.bufferWindow); + } + thisStream->removeSocket(sockNo); } ///\brief A function running a thread to handle input data through stdin. diff --git a/src/buffer/buffer_stream.cpp b/src/buffer/buffer_stream.cpp index bc18b358..22cd22f2 100644 --- a/src/buffer/buffer_stream.cpp +++ b/src/buffer/buffer_stream.cpp @@ -175,6 +175,39 @@ namespace Buffer { return DTSC::Stream::getNext(pos, allowedTracks); } + /// Removes a track and all related buffers from the stream. + void Stream::removeTrack(int trackId){ + metadata.tracks.erase(trackId); + std::set toDelete; + for (std::map::iterator it = buffers.begin(); it != buffers.end(); it++){ + if (it->first.trackID == trackId){ + toDelete.insert(it->first); + } + } + while (toDelete.size()){ + deletionCallback(*toDelete.begin()); + buffers.erase(*toDelete.begin()); + keyframes[trackId].erase(*toDelete.begin()); + toDelete.erase(toDelete.begin()); + } + } + + /// Calls removeTrack on all tracks that were streaming from this socket number. + void Stream::removeSocket(int sockNo){ + std::set toDelete; + std::map::iterator it; + for (it = metadata.tracks.begin(); it != metadata.tracks.end(); ++it){ + if ((it->first & (sockNo << 16)) == (sockNo << 16)){ + toDelete.insert(it->first); + std::cout << "Input lost, removing track: " << it->second.getIdentifier() << std::endl; + } + } + while (toDelete.size()){ + removeTrack(*toDelete.begin()); + toDelete.erase(toDelete.begin()); + } + } + /// parsePacket override that will lock the rw_mutex during parsing. bool Stream::parsePacket(Socket::Connection & c){ bool ret = false; diff --git a/src/buffer/buffer_stream.h b/src/buffer/buffer_stream.h index d523b631..7f0018c1 100644 --- a/src/buffer/buffer_stream.h +++ b/src/buffer/buffer_stream.h @@ -73,8 +73,13 @@ namespace Buffer { void sendMeta(Socket::Connection & s); /// Cleanup function ~Stream(); - /// TODO: WRITEME + /// Removes a track and all related buffers from the stream. + void removeTrack(int trackId); + /// Calls removeTrack on all tracks that were streaming from this socket number. + void removeSocket(int sockNo); + /// Thread-safe parsePacket override. bool parsePacket(std::string & buffer); + /// Thread-safe parsePacket override. bool parsePacket(Socket::Connection & c); DTSC::livePos getNext(DTSC::livePos & pos, std::set & allowedTracks); private: