Buffer edits for live support

This commit is contained in:
Erik Zandvliet 2013-02-19 10:36:22 +01:00 committed by Thulinma
parent 65a5c2c9b0
commit 6644132762
5 changed files with 52 additions and 18 deletions

View file

@ -64,7 +64,7 @@ namespace Buffer {
while (usr->S.connected()){ while (usr->S.connected()){
usleep(5000); //sleep 5ms usleep(5000); //sleep 5ms
usr->Send(); if( !usr->Send()){
if (usr->S.spool()){ if (usr->S.spool()){
while (usr->S.Received().size()){ while (usr->S.Received().size()){
//delete anything that doesn't end with a newline //delete anything that doesn't end with a newline
@ -89,8 +89,8 @@ namespace Buffer {
}else{ }else{
usr->Disconnect("Push denied - invalid IP address!"); usr->Disconnect("Push denied - invalid IP address!");
} }
}
break; break;
}
case 'S': { //Stats case 'S': { //Stats
usr->tmpStats = Stats(usr->S.Received().get().substr(2)); usr->tmpStats = Stats(usr->S.Received().get().substr(2));
unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime; unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime;
@ -101,30 +101,34 @@ namespace Buffer {
usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs; usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs;
usr->lastStats = usr->tmpStats; usr->lastStats = usr->tmpStats;
thisStream->saveStats(usr->MyStr, usr->tmpStats); thisStream->saveStats(usr->MyStr, usr->tmpStats);
}
break; break;
}
case 's': { //second-seek case 's': { //second-seek
//ignored for now unsigned int ms = JSON::Value(usr->S.Received().get().substr(2)).asInt();
} usr->myRing.waiting = false;
usr->myRing.starved = false;
usr->myRing.b = thisStream->getStream()->msSeek(ms);
break; break;
}
case 'f': { //frame-seek case 'f': { //frame-seek
//ignored for now //ignored for now
}
break; break;
}
case 'p': { //play case 'p': { //play
//ignored for now //ignored for now
}
break; break;
}
case 'o': { //once-play case 'o': { //once-play
//ignored for now //ignored for now
}
break; break;
}
case 'q': { //quit-playing case 'q': { //quit-playing
//ignored for now //ignored for now
}
break; break;
}
} }
usr->S.Received().get().clear(); usr->S.Received().get().clear();
}
} }
} }
} }
@ -148,22 +152,30 @@ namespace Buffer {
//slow down packet receiving to real-time //slow down packet receiving to real-time
now = getNowMS(); now = getNowMS();
if ((now - timeDiff >= lastPacket) || (lastPacket - (now - timeDiff) > 15000)){ if ((now - timeDiff >= lastPacket) || (lastPacket - (now - timeDiff) > 15000)){
fprintf( stderr, "Obtaining write lock... " );
thisStream->getWriteLock(); thisStream->getWriteLock();
fprintf( stderr, "Done.\n" );
if (thisStream->getStream()->parsePacket(inBuffer)){ if (thisStream->getStream()->parsePacket(inBuffer)){
fprintf( stderr, "Receiving a packet... " );
thisStream->getStream()->outPacket(0); thisStream->getStream()->outPacket(0);
lastPacket = thisStream->getStream()->getTime(); lastPacket = thisStream->getStream()->getTime();
if ((now - timeDiff - lastPacket) > 15000 || (now - timeDiff - lastPacket < -15000)){ if ((now - timeDiff - lastPacket) > 15000 || (now - timeDiff - lastPacket < -15000)){
timeDiff = now - lastPacket; timeDiff = now - lastPacket;
} }
thisStream->dropWriteLock(true); thisStream->dropWriteLock(true);
fprintf( stderr, "Done.\n" );
}else{ }else{
fprintf( stderr, "Not receiving a packet... " );
thisStream->dropWriteLock(false); thisStream->dropWriteLock(false);
std::cin.read(charBuffer, 1024 * 10); std::cin.read(charBuffer, 1024 * 10);
charCount = std::cin.gcount(); charCount = std::cin.gcount();
inBuffer.append(charBuffer, charCount); inBuffer.append(charBuffer, charCount);
fprintf( stderr, "Done.\n" );
} }
}else{ }else{
fprintf( stderr, "Sleeping: %d...", std::min(14999LL, lastPacket - (now - timeDiff)) * 1000 );
usleep(std::min(14999LL, lastPacket - (now - timeDiff)) * 1000); usleep(std::min(14999LL, lastPacket - (now - timeDiff)) * 1000);
fprintf( stderr, "Done.\n" );
} }
} }
buffer_running = false; buffer_running = false;

View file

@ -184,6 +184,13 @@ void Buffer::Stream::getWriteLock(){
/// Drops a previously gotten write lock. /// Drops a previously gotten write lock.
void Buffer::Stream::dropWriteLock(bool newpackets_available){ void Buffer::Stream::dropWriteLock(bool newpackets_available){
if (newpackets_available){
if (Strm->getPacket(0).isMember("keyframe")){
stats_mutex.lock();
Strm->updateHeaders();
stats_mutex.unlock();
}
}
rw_mutex.lock(); rw_mutex.lock();
writers--; writers--;
rw_mutex.unlock(); rw_mutex.unlock();

View file

@ -57,23 +57,29 @@ bool Buffer::user::doSend(const char * ptr, int len){
} //doSend } //doSend
/// Try to send data to this user. Disconnects if any problems occur. /// Try to send data to this user. Disconnects if any problems occur.
void Buffer::user::Send(){ bool Buffer::user::Send(){
if ( !myRing){ if ( !myRing){
return; return false;
} //no ring! } //no ring!
if ( !S.connected()){ if ( !S.connected()){
return; return false;
} //cancel if not connected } //cancel if not connected
if (myRing->waiting){ if (myRing->waiting){
Stream::get()->waitForData(); Stream::get()->waitForData();
return; if( myRing->updated ) {
Stream::get()->getReadLock();
S.SendNow( Stream::get()->getStream()->metadata.toNetPacked() );
Stream::get()->dropReadLock();
myRing->updated = false;
}
return false;
} //still waiting for next buffer? } //still waiting for next buffer?
if (myRing->starved){ if (myRing->starved){
//if corrupt data, warn and get new DTSC::Ring //if corrupt data, warn and get new DTSC::Ring
std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl; std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl;
Stream::get()->dropRing(myRing); Stream::get()->dropRing(myRing);
myRing = Stream::get()->getRing(); myRing = Stream::get()->getRing();
return; return false;
} }
//try to complete a send //try to complete a send
Stream::get()->getReadLock(); Stream::get()->getReadLock();
@ -82,11 +88,20 @@ void Buffer::user::Send(){
currsend = 0; currsend = 0;
if (myRing->b <= 0){ if (myRing->b <= 0){
myRing->waiting = true; myRing->waiting = true;
return; return false;
} //no next buffer? go in waiting mode. } //no next buffer? go in waiting mode.
myRing->b--; myRing->b--;
if( myRing->updated ) {
Stream::get()->getReadLock();
S.SendNow( Stream::get()->getStream()->metadata.toNetPacked() );
Stream::get()->dropReadLock();
myRing->updated = false;
}
Stream::get()->dropReadLock();
return true;
} //completed a send } //completed a send
Stream::get()->dropReadLock(); Stream::get()->dropReadLock();
return false;
} //send } //send
/// Default constructor - should not be in use. /// Default constructor - should not be in use.

View file

@ -50,6 +50,6 @@ namespace Buffer {
/// Has a side effect of dropping the connection if send will never complete. /// Has a side effect of dropping the connection if send will never complete.
bool doSend(const char * ptr, int len); bool doSend(const char * ptr, int len);
/// Try to send data to this user. Disconnects if any problems occur. /// Try to send data to this user. Disconnects if any problems occur.
void Send(); bool Send();
}; };
} }

View file

@ -57,7 +57,7 @@ namespace Connector_HTTP {
Result << "#EXTM3U\r\n" Result << "#EXTM3U\r\n"
//"#EXT-X-VERSION:1\r\n" //"#EXT-X-VERSION:1\r\n"
//"#EXT-X-ALLOW-CACHE:YES\r\n" //"#EXT-X-ALLOW-CACHE:YES\r\n"
"#EXT-X-TARGETDURATION:" << (longestFragment / 1000) + 1 << "\r\n" "#EXT-X-TARGETDURATION:" << (longestFragment / 1000) + 1 << "\r\n"
"#EXT-X-MEDIA-SEQUENCE:0\r\n"; "#EXT-X-MEDIA-SEQUENCE:0\r\n";
//"#EXT-X-PLAYLIST-TYPE:VOD\r\n"; //"#EXT-X-PLAYLIST-TYPE:VOD\r\n";
int lastDuration = 0; int lastDuration = 0;