Improved track handling for live streams.

This commit is contained in:
Thulinma 2013-12-17 15:58:43 +01:00
parent 733868aa70
commit a60b674471
3 changed files with 45 additions and 1 deletions

View file

@ -59,12 +59,18 @@ namespace Buffer {
pthread_setname_np(pthread_self(), "Push Input");
#endif
conn.setBlocking(true);
int sockNo = 0;
while (buffer_running && conn.connected()){
thisStream->parsePacket(conn);
}
if (buffer_running){
thisStream->endStream();
}
long long int wait_time = Util::getMS();
while (Util::getMS() - wait_time < thisStream->metadata.bufferWindow){
Util::sleep(thisStream->metadata.bufferWindow);
}
thisStream->removeSocket(sockNo);
}
///\brief A function running a thread to handle input data through stdin.

View file

@ -175,6 +175,39 @@ namespace Buffer {
return DTSC::Stream::getNext(pos, allowedTracks);
}
/// Removes a track and all related buffers from the stream.
void Stream::removeTrack(int trackId){
metadata.tracks.erase(trackId);
std::set<DTSC::livePos> toDelete;
for (std::map<DTSC::livePos, JSON::Value >::iterator it = buffers.begin(); it != buffers.end(); it++){
if (it->first.trackID == trackId){
toDelete.insert(it->first);
}
}
while (toDelete.size()){
deletionCallback(*toDelete.begin());
buffers.erase(*toDelete.begin());
keyframes[trackId].erase(*toDelete.begin());
toDelete.erase(toDelete.begin());
}
}
/// Calls removeTrack on all tracks that were streaming from this socket number.
void Stream::removeSocket(int sockNo){
std::set<int> toDelete;
std::map<int,DTSC::Track>::iterator it;
for (it = metadata.tracks.begin(); it != metadata.tracks.end(); ++it){
if ((it->first & (sockNo << 16)) == (sockNo << 16)){
toDelete.insert(it->first);
std::cout << "Input lost, removing track: " << it->second.getIdentifier() << std::endl;
}
}
while (toDelete.size()){
removeTrack(*toDelete.begin());
toDelete.erase(toDelete.begin());
}
}
/// parsePacket override that will lock the rw_mutex during parsing.
bool Stream::parsePacket(Socket::Connection & c){
bool ret = false;

View file

@ -73,8 +73,13 @@ namespace Buffer {
void sendMeta(Socket::Connection & s);
/// Cleanup function
~Stream();
/// TODO: WRITEME
/// Removes a track and all related buffers from the stream.
void removeTrack(int trackId);
/// Calls removeTrack on all tracks that were streaming from this socket number.
void removeSocket(int sockNo);
/// Thread-safe parsePacket override.
bool parsePacket(std::string & buffer);
/// Thread-safe parsePacket override.
bool parsePacket(Socket::Connection & c);
DTSC::livePos getNext(DTSC::livePos & pos, std::set<int> & allowedTracks);
private: