DTSC Pull optimizes and quick-negotiate.

This commit is contained in:
Erik Zandvliet 2016-05-10 14:12:58 +02:00 committed by Thulinma
parent e8eb3a36ee
commit a5a9facc22
16 changed files with 159 additions and 211 deletions

View file

@ -32,10 +32,6 @@
#define COUNTABLE_BYTES 128*1024
#ifndef STATS_DELAY
#define STATS_DELAY 15
#endif
std::map<Controller::sessIndex, Controller::statSession> Controller::sessions; ///< list of sessions that have statistics data available
std::map<unsigned long, Controller::sessIndex> Controller::connToSession; ///< Map of socket IDs to session info.
@ -595,7 +591,7 @@ void Controller::parseStatistics(char * data, size_t len, unsigned int id){
sessions[idx].update(id, tmpEx);
//check validity of stats data
char counter = (*(data - 1));
if (counter == 126 || counter == 127 || counter == 254 || counter == 255){
if (counter == 126 || counter == 127){
//the data is no longer valid - connection has gone away, store for later
sessions[idx].finish(id);
connToSession.erase(id);

View file

@ -215,17 +215,19 @@ namespace Mist {
DEBUG_MSG(DLVL_DEVEL, "Input for stream %s started", streamName.c_str());
long long int activityCounter = Util::bootSecs();
while ((Util::bootSecs() - activityCounter) < 10 && config->is_active) { //10 second timeout
while ((Util::bootSecs() - activityCounter) < INPUT_TIMEOUT && config->is_active) { //15 second timeout
userPage.parseEach(callbackWrapper);
removeUnused();
if (userPage.amount) {
activityCounter = Util::bootSecs();
DEBUG_MSG(DLVL_INSANE, "Connected users: %d", userPage.amount);
if (userPage.connectedUsers) {
if (myMeta.tracks.size()){
activityCounter = Util::bootSecs();
}
DEBUG_MSG(DLVL_INSANE, "Connected users: %d", userPage.connectedUsers);
} else {
DEBUG_MSG(DLVL_INSANE, "Timer running");
}
/*LTS-START*/
if ((Util::bootSecs() - activityCounter) >= 10 || !config->is_active){//10 second timeout
if ((Util::bootSecs() - activityCounter) >= INPUT_TIMEOUT || !config->is_active){//15 second timeout
if(Triggers::shouldTrigger("STREAM_UNLOAD", config->getString("streamname"))){
std::string payload = config->getString("streamname")+"\n" +capa["name"].asStringRef()+"\n";
if (!Triggers::doTrigger("STREAM_UNLOAD", payload, config->getString("streamname"))){
@ -251,16 +253,19 @@ namespace Mist {
pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (!pullLock.tryWait()){
DEBUG_MSG(DLVL_DEVEL, "A pull process for stream %s is already running", streamName.c_str());
pullLock.close();
return;
}
if (Util::streamAlive(streamName)){
pullLock.post();
pullLock.close();
pullLock.unlink();
return;
}
if (!Util::startInput(streamName, "push://")) {//manually override stream url to start the buffer
pullLock.post();
pullLock.close();
pullLock.unlink();
return;
}
@ -283,8 +288,10 @@ namespace Mist {
finish();
pullLock.post();
pullLock.close();
pullLock.unlink();
return;
}
nProxy.userClient.countAsViewer = false;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
it->second.firstms = 0;
@ -294,44 +301,20 @@ namespace Mist {
getNext();
unsigned long long lastTime = Util::getMS();
unsigned long long lastActive = Util::getMS();
while (thisPacket && config->is_active){
while (thisPacket && config->is_active && nProxy.userClient.isAlive()){
nProxy.bufferLivePacket(thisPacket, myMeta);
getNext();
nProxy.userClient.keepAlive();
if (Util::getMS() - lastTime >= 1000){
lastTime = Util::getMS();
if (nProxy.userClient.isSingleEntry()){
if (lastTime - lastActive >= 10000){//10sec timeout
config->is_active = false;
}
}else{
lastActive = lastTime;
}
}
}
closeStreamSource();
while (config->is_active){
Util::sleep(500);
nProxy.userClient.keepAlive();
if (Util::getMS() - lastTime >= 1000){
lastTime = Util::getMS();
if (nProxy.userClient.isSingleEntry()){
if (lastTime - lastActive >= 10000){//10sec timeout
config->is_active = false;
}
}else{
lastActive = lastTime;
}
}
}
nProxy.userClient.finish();
finish();
pullLock.post();
pullLock.close();
pullLock.unlink();
DEBUG_MSG(DLVL_DEVEL, "Pull input for stream %s closing clean", streamName.c_str());
return;
}

View file

@ -530,7 +530,7 @@ namespace Mist {
//If the current value indicates a valid trackid, and it is pushed from this user
if (pushLocation[value] == data) {
//Check for timeouts, and erase the track if necessary
if (counter == 126 || counter == 127 || counter == 254 || counter == 255) {
if (counter == 126 || counter == 127){
pushLocation.erase(value);
if (negotiatingTracks.count(value)) {
negotiatingTracks.erase(value);
@ -594,11 +594,9 @@ namespace Mist {
char firstPage[NAME_BUFFER_SIZE];
snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), finalMap);
nProxy.metaPages[finalMap].init(firstPage, 8192, false);
INFO_MSG("Meh %d", finalMap);
//Update the metadata for this track
updateTrackMeta(finalMap);
INFO_MSG("Setting hasPush to true, quickNegotiate");
hasPush = true;
}
//Write the final mapped track number and keyframe number to the user page element

View file

@ -621,10 +621,8 @@ namespace Mist {
if (!trackState.count(tid)) {
memset(tmp + offset, 0, 4);
if (quickNegotiate){
unsigned long finalTid = getpid() + tid;
unsigned long finalTid = tid;
unsigned short firstPage = 1;
INFO_MSG("HANDLING quick negotiation for track %d ~> %d", tid, finalTid)
MEDIUM_MSG("Buffer has indicated that incoming track %lu should start writing on track %lu, page %lu", tid, finalTid, firstPage);
trackMap[tid] = finalTid;
if (myMeta.tracks.count(finalTid) && myMeta.tracks[finalTid].lastms){

View file

@ -184,6 +184,16 @@ namespace Mist {
return myConn.getBinHost();
}
bool Output::isReadyForPlay() {
if (myMeta.tracks.size()){
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.keys.size() >= 2){
return true;
}
}
}
return false;
}
/// Connects or reconnects to the stream.
/// Assumes streamName class member has been set already.
/// Will start input if not currently active, calls onFail() if this does not succeed.
@ -215,27 +225,15 @@ namespace Mist {
return;
}
updateMeta();
if (myMeta.live && needsPlayableKeys()){
bool waitALittleLonger = true;
if (myMeta.live && !isReadyForPlay()){
unsigned int maxWaits = 15;
while (waitALittleLonger){
waitALittleLonger = true;
if (myMeta.tracks.size()){
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.keys.size() >= needsPlayableKeys()){
waitALittleLonger = false;
break;
}
}
}
if (waitALittleLonger){
Util::sleep(1000);
if (--maxWaits == 0){
FAIL_MSG("Giving up waiting for playable tracks");
waitALittleLonger = false;
}
updateMeta();
while (!isReadyForPlay()){
Util::sleep(1000);
if (--maxWaits == 0){
FAIL_MSG("Giving up waiting for playable tracks");
break;
}
updateMeta();
}
}
}
@ -435,6 +433,7 @@ namespace Mist {
return;
}
DEBUG_MSG(DLVL_VERYHIGH, "Loading track %lu, containing key %lld", trackId, keyNum);
INFO_MSG("Loading track %lu, containing key %lld", trackId, keyNum);
unsigned int timeout = 0;
unsigned long pageNum = pageNumForKey(trackId, keyNum);
while (pageNum == -1){
@ -482,6 +481,7 @@ namespace Mist {
return;
}
currKeyOpen[trackId] = pageNum;
INFO_MSG("page %s loaded", id);
}
/// Prepares all tracks from selectedTracks for seeking to the specified ms position.
@ -507,7 +507,14 @@ namespace Mist {
INFO_MSG("Aborting seek to %llums in track %u: past end of track.", pos, tid);
return false;
}
loadPageForKey(tid, getKeyForTime(tid, pos) + (getNextKey?1:0));
unsigned int keyNum = getKeyForTime(tid, pos);
if (myMeta.tracks[tid].getKey(keyNum).getTime() > pos){
if (myMeta.live){
INFO_MSG("Actually seeking to %d, for %d is not available anymore", myMeta.tracks[tid].getKey(keyNum).getTime(), pos);
pos = myMeta.tracks[tid].getKey(keyNum).getTime();
}
}
loadPageForKey(tid, keyNum + (getNextKey?1:0));
if (!nProxy.curPage.count(tid) || !nProxy.curPage[tid].mapped){
INFO_MSG("Aborting seek to %llums in track %u: not available.", pos, tid);
return false;
@ -524,6 +531,7 @@ namespace Mist {
tmpPack.reInit(mpd + tmp.offset, 0, true);
tmp.time = tmpPack.getTime();
}
INFO_MSG("Found time %d", tmp.time);
if (tmpPack){
buffer.insert(tmp);
return true;
@ -1022,9 +1030,26 @@ namespace Mist {
if (thisPacket.getTime() != nxt.time && nxt.time){
WARN_MSG("Loaded track %ld@%llu instead of %ld@%llu", thisPacket.getTrackId(), thisPacket.getTime(), nxt.tid, nxt.time);
}
if ((myMeta.tracks[nxt.tid].type == "video" && thisPacket.getFlag("keyframe")) || (++nonVideoCount % 30 == 0)){
bool isVideoTrack = (myMeta.tracks[nxt.tid].type == "video");
if ((isVideoTrack && thisPacket.getFlag("keyframe")) || (!isVideoTrack && (++nonVideoCount % 30 == 0))){
if (myMeta.live){
updateMeta();
if (myMeta.tracks[nxt.tid].type == "video"){
//Check whether returned keyframe is correct. If not, wait for approximately 5 seconds while checking.
//Failure here will cause tracks to drop due to inconsistent internal state.
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime());
int counter = 0;
while(counter < 10 && myMeta.tracks[nxt.tid].getKey(nxtKeyNum[nxt.tid]).getTime() != thisPacket.getTime()){
if (counter++){
//Only sleep 500ms if this is not the first updatemeta try
Util::sleep(500);
}
updateMeta();
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime());
}
}else{
//On non-video tracks, just update metadata and assume everything else is correct
updateMeta();
}
}
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime());
DEBUG_MSG(DLVL_VERYHIGH, "Track %u @ %llums = key %lu", nxt.tid, thisPacket.getTime(), nxtKeyNum[nxt.tid]);
@ -1081,6 +1106,12 @@ namespace Mist {
if (nProxy.curPage[nxt.tid]){
if (nxt.offset < nProxy.curPage[nxt.tid].len){
unsigned long long nextTime = getDTSCTime(nProxy.curPage[nxt.tid].mapped, nxt.offset);
int ctr = 0;
//sleep at most half a second for new data.
while (!nextTime && ++ctr < 5){
Util::sleep(1000);
nextTime = getDTSCTime(nProxy.curPage[nxt.tid].mapped, nxt.offset);
}
if (nextTime){
nxt.time = nextTime;
}else{
@ -1100,7 +1131,7 @@ namespace Mist {
unsigned long long int now = Util::epoch();
if (now != lastStats){
/*LTS-START*/
if (statsPage.getData()[-1] > 127){
if (!statsPage.isAlive()){
myConn.close();
return;
}
@ -1136,7 +1167,7 @@ namespace Mist {
return;
}
}
if (nProxy.userClient.getData()[-1] > 127){
if (!nProxy.userClient.isAlive()){
myConn.close();
return;
}

View file

@ -64,7 +64,7 @@ namespace Mist {
void selectDefaultTracks();
bool connectToFile(std::string file);
static bool listenMode(){return true;}
virtual unsigned int needsPlayableKeys(){return 2;}
virtual bool isReadyForPlay();
//virtuals. The optional virtuals have default implementations that do as little as possible.
virtual void sendNext() {}//REQUIRED! Others are optional.
virtual void prepareNext();

View file

@ -29,7 +29,6 @@ namespace Mist {
myConn.SendNow(sSize, 4);
prep.sendTo(myConn);
pushing = false;
fastAsPossibleTime = 0;
}
OutDTSC::~OutDTSC() {}
@ -45,29 +44,6 @@ namespace Mist {
}
void OutDTSC::sendNext(){
if (!realTime && thisPacket.getTime() >= fastAsPossibleTime){
realTime = 1000;
}
if (thisPacket.getFlag("keyframe")){
std::set<unsigned long> availableTracks;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.type == "video" || it->second.type == "audio"){
availableTracks.insert(it->first);
}
}
if (availableTracks != selectedTracks){
//reset, resendheader
JSON::Value prep;
prep["cmd"] = "reset";
/// \todo Make this securererer.
unsigned long sendSize = prep.packedSize();
myConn.SendNow("DTCM");
char sSize[4] = {0, 0, 0, 0};
Bit::htobl(sSize, prep.packedSize());
myConn.SendNow(sSize, 4);
prep.sendTo(myConn);
}
}
myConn.SendNow(thisPacket.getData(), thisPacket.getDataLen());
}
@ -81,15 +57,9 @@ namespace Mist {
}
myMeta.send(myConn, true, selectedTracks);
if (myMeta.live){
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (!fastAsPossibleTime || it->second.lastms < fastAsPossibleTime){
fastAsPossibleTime = it->second.lastms;
realTime = 0;
}
}
}else{
realTime = 1000;
realTime = 0;
}
seek(0);
}
void OutDTSC::onRequest(){

View file

@ -3,30 +3,20 @@
#include <unistd.h>
namespace Mist {
bool OutHLS::isReadyForPlay() {
if (myMeta.tracks.size()){
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.fragments.size() >= 3){
return true;
}
}
}
return false;
}
///\brief Builds an index file for HTTP Live streaming.
///\return The index file for HTTP Live Streaming.
std::string OutHLS::liveIndex() {
static int timer = 0;
bool checkWait = true;
while (checkWait && ++timer < 10){
checkWait = false;
if (!myMeta.tracks.size()){
checkWait = true;
}
for (std::map<unsigned int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.keys.size() <= 3){
checkWait = true;
break;
}
}
if (checkWait){
Util::sleep(500);
INFO_MSG("SLeeping timer %d", timer);
updateMeta();
}
}
std::stringstream result;
result << "#EXTM3U\r\n";
int audioId = -1;

View file

@ -9,6 +9,7 @@ namespace Mist {
static void init(Util::Config * cfg);
void sendTS(const char * tsData, unsigned int len=188);
void onHTTP();
bool isReadyForPlay();
protected:
std::string liveIndex();
std::string liveIndex(int tid, std::string & sessId);

View file

@ -9,6 +9,7 @@ namespace Mist {
OutProgressiveMP4::OutProgressiveMP4(Socket::Connection & conn) : HTTPOutput(conn) {
completeKeysOnly = false;
}
OutProgressiveMP4::~OutProgressiveMP4() {}
void OutProgressiveMP4::init(Util::Config * cfg) {
@ -747,25 +748,6 @@ namespace Mist {
void OutProgressiveMP4::setvidTrack() {
vidTrack = 0;
static int timer = 0;
bool checkWait = true;
while (checkWait && ++timer < 10){
checkWait = false;
if (!myMeta.tracks.size()){
checkWait = true;
}
for (std::map<unsigned int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (!it->second.keys.size()){
checkWait = true;
break;
}
}
if (checkWait){
Util::sleep(500);
updateMeta();
}
}
if (!selectedTracks.size()){
selectDefaultTracks();
}

View file

@ -153,12 +153,18 @@ namespace Mist {
return !(config->getString("target").size());
}
unsigned int OutRTMP::needsPlayableKeys(){
bool OutRTMP::isReadyForPlay(){
if (isPushing){
return 0;
}else{
return 2;
return true;
}
if (myMeta.tracks.size()){
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.keys.size() >= 2){
return true;
}
}
}
return false;
}
void OutRTMP::parseVars(std::string data){

View file

@ -14,7 +14,7 @@ namespace Mist {
void onRequest();
void sendNext();
void sendHeader();
unsigned int needsPlayableKeys();
bool isReadyForPlay();
static bool listenMode();
protected:
bool isPushing;