From 66441327620e00031466f89e73dbea9ebeec6c27 Mon Sep 17 00:00:00 2001 From: Erik Zandvliet Date: Tue, 19 Feb 2013 10:36:22 +0100 Subject: [PATCH] Buffer edits for live support --- src/buffer.cpp | 32 ++++++++++++++++++++++---------- src/buffer_stream.cpp | 7 +++++++ src/buffer_user.cpp | 27 +++++++++++++++++++++------ src/buffer_user.h | 2 +- src/conn_http_live.cpp | 2 +- 5 files changed, 52 insertions(+), 18 deletions(-) diff --git a/src/buffer.cpp b/src/buffer.cpp index 72405cea..0004644e 100644 --- a/src/buffer.cpp +++ b/src/buffer.cpp @@ -64,7 +64,7 @@ namespace Buffer { while (usr->S.connected()){ usleep(5000); //sleep 5ms - usr->Send(); + if( !usr->Send()){ if (usr->S.spool()){ while (usr->S.Received().size()){ //delete anything that doesn't end with a newline @@ -89,8 +89,8 @@ namespace Buffer { }else{ usr->Disconnect("Push denied - invalid IP address!"); } - } break; + } case 'S': { //Stats usr->tmpStats = Stats(usr->S.Received().get().substr(2)); unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime; @@ -101,30 +101,34 @@ namespace Buffer { usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs; usr->lastStats = usr->tmpStats; thisStream->saveStats(usr->MyStr, usr->tmpStats); - } break; + } case 's': { //second-seek - //ignored for now - } + unsigned int ms = JSON::Value(usr->S.Received().get().substr(2)).asInt(); + usr->myRing.waiting = false; + usr->myRing.starved = false; + usr->myRing.b = thisStream->getStream()->msSeek(ms); break; + } case 'f': { //frame-seek //ignored for now - } break; + } case 'p': { //play //ignored for now - } break; + } case 'o': { //once-play //ignored for now - } break; + } case 'q': { //quit-playing //ignored for now - } break; + } } - usr->S.Received().get().clear(); + usr->S.Received().get().clear(); + } } } } @@ -148,22 +152,30 @@ namespace Buffer { //slow down packet receiving to real-time now = getNowMS(); if ((now - timeDiff >= lastPacket) || (lastPacket - (now - timeDiff) > 15000)){ + fprintf( stderr, "Obtaining write lock... " ); thisStream->getWriteLock(); + fprintf( stderr, "Done.\n" ); if (thisStream->getStream()->parsePacket(inBuffer)){ + fprintf( stderr, "Receiving a packet... " ); thisStream->getStream()->outPacket(0); lastPacket = thisStream->getStream()->getTime(); if ((now - timeDiff - lastPacket) > 15000 || (now - timeDiff - lastPacket < -15000)){ timeDiff = now - lastPacket; } thisStream->dropWriteLock(true); + fprintf( stderr, "Done.\n" ); }else{ + fprintf( stderr, "Not receiving a packet... " ); thisStream->dropWriteLock(false); std::cin.read(charBuffer, 1024 * 10); charCount = std::cin.gcount(); inBuffer.append(charBuffer, charCount); + fprintf( stderr, "Done.\n" ); } }else{ + fprintf( stderr, "Sleeping: %d...", std::min(14999LL, lastPacket - (now - timeDiff)) * 1000 ); usleep(std::min(14999LL, lastPacket - (now - timeDiff)) * 1000); + fprintf( stderr, "Done.\n" ); } } buffer_running = false; diff --git a/src/buffer_stream.cpp b/src/buffer_stream.cpp index 0d154cf8..a4f9c1fa 100644 --- a/src/buffer_stream.cpp +++ b/src/buffer_stream.cpp @@ -184,6 +184,13 @@ void Buffer::Stream::getWriteLock(){ /// Drops a previously gotten write lock. void Buffer::Stream::dropWriteLock(bool newpackets_available){ + if (newpackets_available){ + if (Strm->getPacket(0).isMember("keyframe")){ + stats_mutex.lock(); + Strm->updateHeaders(); + stats_mutex.unlock(); + } + } rw_mutex.lock(); writers--; rw_mutex.unlock(); diff --git a/src/buffer_user.cpp b/src/buffer_user.cpp index 5cb42baf..4d20dc5d 100644 --- a/src/buffer_user.cpp +++ b/src/buffer_user.cpp @@ -57,23 +57,29 @@ bool Buffer::user::doSend(const char * ptr, int len){ } //doSend /// Try to send data to this user. Disconnects if any problems occur. -void Buffer::user::Send(){ +bool Buffer::user::Send(){ if ( !myRing){ - return; + return false; } //no ring! if ( !S.connected()){ - return; + return false; } //cancel if not connected if (myRing->waiting){ Stream::get()->waitForData(); - return; + if( myRing->updated ) { + Stream::get()->getReadLock(); + S.SendNow( Stream::get()->getStream()->metadata.toNetPacked() ); + Stream::get()->dropReadLock(); + myRing->updated = false; + } + return false; } //still waiting for next buffer? if (myRing->starved){ //if corrupt data, warn and get new DTSC::Ring std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl; Stream::get()->dropRing(myRing); myRing = Stream::get()->getRing(); - return; + return false; } //try to complete a send Stream::get()->getReadLock(); @@ -82,11 +88,20 @@ void Buffer::user::Send(){ currsend = 0; if (myRing->b <= 0){ myRing->waiting = true; - return; + return false; } //no next buffer? go in waiting mode. myRing->b--; + if( myRing->updated ) { + Stream::get()->getReadLock(); + S.SendNow( Stream::get()->getStream()->metadata.toNetPacked() ); + Stream::get()->dropReadLock(); + myRing->updated = false; + } + Stream::get()->dropReadLock(); + return true; } //completed a send Stream::get()->dropReadLock(); + return false; } //send /// Default constructor - should not be in use. diff --git a/src/buffer_user.h b/src/buffer_user.h index a58acb45..aedf1242 100644 --- a/src/buffer_user.h +++ b/src/buffer_user.h @@ -50,6 +50,6 @@ namespace Buffer { /// Has a side effect of dropping the connection if send will never complete. bool doSend(const char * ptr, int len); /// Try to send data to this user. Disconnects if any problems occur. - void Send(); + bool Send(); }; } diff --git a/src/conn_http_live.cpp b/src/conn_http_live.cpp index 71fcc4f4..ecbad952 100644 --- a/src/conn_http_live.cpp +++ b/src/conn_http_live.cpp @@ -57,7 +57,7 @@ namespace Connector_HTTP { Result << "#EXTM3U\r\n" //"#EXT-X-VERSION:1\r\n" //"#EXT-X-ALLOW-CACHE:YES\r\n" - "#EXT-X-TARGETDURATION:" << (longestFragment / 1000) + 1 << "\r\n" + "#EXT-X-TARGETDURATION:" << (longestFragment / 1000) + 1 << "\r\n" "#EXT-X-MEDIA-SEQUENCE:0\r\n"; //"#EXT-X-PLAYLIST-TYPE:VOD\r\n"; int lastDuration = 0;