From 391daaaa71fa8780962e32173c2b1da9dc898496 Mon Sep 17 00:00:00 2001 From: Erik Zandvliet Date: Wed, 11 Jun 2014 16:46:59 +0200 Subject: [PATCH] Various fixes for live, pushing as well as output --- src/input/input_buffer.cpp | 17 +++++++++++++++++ src/output/output.cpp | 24 ++++++++++++++++++------ src/output/output_hds.cpp | 2 +- src/output/output_hss.cpp | 18 +++++++++--------- 4 files changed, 45 insertions(+), 16 deletions(-) diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index e6d91684..16cde760 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -1,3 +1,6 @@ +#include +#include + #include #include #include @@ -53,7 +56,10 @@ namespace Mist { myMeta.vod = false; myMeta.live = true; myMeta.writeTo(metaPage.mapped); + IPC::semaphore liveMeta(std::string("liveMeta@" + config->getString("streamname")).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + liveMeta.wait(); memset(metaPage.mapped+myMeta.getSendLen(), 0, metaPage.len > myMeta.getSendLen() ? std::min(metaPage.len-myMeta.getSendLen(), 4ll) : 0); + liveMeta.post(); } bool inputBuffer::removeKey(unsigned int tid){ @@ -134,6 +140,10 @@ namespace Mist { //Skip value 0xFFFFFFFF as this indicates a previously declined track continue; } + if (value == 0){ + //Skip value 0 as this indicates an empty track + continue; + } if (counter == 126 || counter == 127 || counter == 254 || counter == 255){ if (negotiateTracks.count(value)){ negotiateTracks.erase(value); @@ -141,6 +151,9 @@ namespace Mist { } if (givenTracks.count(value)){ givenTracks.erase(value); + indexPages.erase(value); + dataPages.erase(value); + inputLoc.erase(value); } continue; } @@ -247,6 +260,7 @@ namespace Mist { char nextPageName[100]; sprintf(nextPageName, "%s%lu_%d", config->getString("streamname").c_str(), value, nextPage); dataPages[value][nextPage].init(nextPageName, 20971520, true); + DEVEL_MSG("Created page %s", nextPageName); bool createdNew = false; for (int i = 0; i < 8192; i += 8){ unsigned int thisKeyNum = ((((long long int *)(indexPages[value].mapped + i))[0]) >> 32) & 0xFFFFFFFF; @@ -269,6 +283,9 @@ namespace Mist { void inputBuffer::updateMetaFromPage(int tNum, int pageNum){ DTSC::Packet tmpPack; tmpPack.reInit(dataPages[tNum][pageNum].mapped + inputLoc[tNum][pageNum].curOffset, 0); + if (!tmpPack && inputLoc[tNum][pageNum].curOffset == 0){ + return; + } while (tmpPack) { myMeta.update(tmpPack); if (inputLoc[tNum][pageNum].firstTime == 0){ diff --git a/src/output/output.cpp b/src/output/output.cpp index 9b89f232..70c517f2 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -62,6 +62,11 @@ namespace Mist { void Output::updateMeta(){ //read metadata from page to myMeta variable + IPC::semaphore liveMeta(std::string("liveMeta@" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + bool lock = myMeta.live; + if (lock){ + liveMeta.wait(); + } if (streamIndex.mapped){ DTSC::Packet tmpMeta(streamIndex.mapped, streamIndex.len, true); if (tmpMeta.getVersion()){ @@ -69,6 +74,9 @@ namespace Mist { myMeta.reinit(tmpMeta); } } + if (lock){ + liveMeta.post(); + } } /// Called when stream initialization has failed. @@ -640,13 +648,17 @@ namespace Mist { } } for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end() && tNum < 5; it++){ + int tId = *it; + if (trackMap.count(tId)){ + tId = trackMap[tId]; + } char thisData[6]; - thisData[0] = ((*it >> 24) & 0xFF); - thisData[1] = ((*it >> 16) & 0xFF); - thisData[2] = ((*it >> 8) & 0xFF); - thisData[3] = ((*it) & 0xFF); - thisData[4] = ((nxtKeyNum[*it] >> 8) & 0xFF); - thisData[5] = ((nxtKeyNum[*it]) & 0xFF); + thisData[0] = ((tId >> 24) & 0xFF); + thisData[1] = ((tId >> 16) & 0xFF); + thisData[2] = ((tId >> 8) & 0xFF); + thisData[3] = ((tId) & 0xFF); + thisData[4] = ((nxtKeyNum[tId] >> 8) & 0xFF); + thisData[5] = ((nxtKeyNum[tId]) & 0xFF); memcpy(playerConn.getData() + (6 * tNum), thisData, 6); tNum ++; playerConn.keepAlive(); diff --git a/src/output/output_hds.cpp b/src/output/output_hds.cpp index b06deb74..77b88ad1 100644 --- a/src/output/output_hds.cpp +++ b/src/output/output_hds.cpp @@ -233,7 +233,7 @@ namespace Mist { //send a zero-size mdat, meaning it stretches until end of file. HTTP_S.Chunkify("\000\000\000\000mdat", 8, myConn); //send init data, if needed. - if (audioTrack > 0){ + if (audioTrack > 0 && myMeta.tracks[audioTrack].init != ""){ tag.DTSCAudioInit(myMeta.tracks[audioTrack]); tag.tagTime(mstime); HTTP_S.Chunkify(tag.data, tag.len, myConn); diff --git a/src/output/output_hss.cpp b/src/output/output_hss.cpp index 00cf0b5b..ac35c2b4 100644 --- a/src/output/output_hss.cpp +++ b/src/output/output_hss.cpp @@ -72,7 +72,7 @@ namespace Mist { void OutHSS::sendNext() { if (currentPacket.getTime() >= playUntil) { - DEBUG_MSG(DLVL_DEVEL, "(%d) Done sending fragment %d:%d", getpid(), myTrackStor, myKeyStor); + DEBUG_MSG(DLVL_HIGH, "(%d) Done sending fragment %d:%d", getpid(), myTrackStor, myKeyStor); stop(); wantRequest = true; HTTP_S.Chunkify("", 0, myConn); @@ -95,19 +95,19 @@ namespace Mist { int OutHSS::canSeekms(unsigned int ms) { //no tracks? Frame too new by definition. if (!myMeta.tracks.size()) { - DEBUG_MSG(DLVL_DEVEL, "HSS Canseek to %d returns 1 because no tracks", ms); + DEBUG_MSG(DLVL_DONTEVEN, "HSS Canseek to %d returns 1 because no tracks", ms); return 1; } //loop trough all selected tracks for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++) { //return "too late" if one track is past this point if (ms < myMeta.tracks[*it].firstms) { - DEBUG_MSG(DLVL_DEVEL, "HSS Canseek to %d returns -1 because track %lu firstms == %llu", ms, *it, myMeta.tracks[*it].firstms); + DEBUG_MSG(DLVL_DONTEVEN, "HSS Canseek to %d returns -1 because track %lu firstms == %llu", ms, *it, myMeta.tracks[*it].firstms); return -1; } //return "too early" if one track is not yet at this point if (ms > myMeta.tracks[*it].lastms) { - DEBUG_MSG(DLVL_DEVEL, "HSS Canseek to %d returns 1 because track %lu lastms == %llu", ms, *it, myMeta.tracks[*it].lastms); + DEBUG_MSG(DLVL_DONTEVEN, "HSS Canseek to %d returns 1 because track %lu lastms == %llu", ms, *it, myMeta.tracks[*it].lastms); return 1; } } @@ -160,10 +160,10 @@ namespace Mist { return; } } - DEBUG_MSG(DLVL_DEVEL, "(%d) Seeking to time %lld on track %d", getpid(), seekTime, tid); + DEBUG_MSG(DLVL_HIGH, "(%d) Seeking to time %lld on track %d", getpid(), seekTime, tid); seek(seekTime); playUntil = (*(keyTimes[tid].upper_bound(seekTime))); - DEBUG_MSG(DLVL_DEVEL, "Set playUntil to %lld", playUntil); + DEBUG_MSG(DLVL_HIGH, "Set playUntil to %lld", playUntil); myTrackStor = tid; myKeyStor = seekTime; keysToSend = 1; @@ -269,7 +269,7 @@ namespace Mist { int fragCount = 0; for (unsigned int i = 0; fragCount < 2 && i < myMeta.tracks[tid].keys.size() - 1; i++) { if (myMeta.tracks[tid].keys[i].getTime() > seekTime) { - DEBUG_MSG(DLVL_DEVEL, "Key %d added to fragRef box, time %ld > %lld", i, myMeta.tracks[tid].keys[i].getTime(), seekTime); + DEBUG_MSG(DLVL_HIGH, "Key %d added to fragRef box, time %ld > %lld", i, myMeta.tracks[tid].keys[i].getTime(), seekTime); fragref_box.setTime(fragCount, myMeta.tracks[tid].keys[i].getTime() * 10000); fragref_box.setDuration(fragCount, myMeta.tracks[tid].keys[i].getLength() * 10000); fragref_box.setFragmentCount(++fragCount); @@ -296,7 +296,7 @@ namespace Mist { HTTP_S.Chunkify("mdat", 4, myConn); sentHeader = true; HTTP_R.Clean(); - DEBUG_MSG(DLVL_DEVEL, "(%d) Sent full header", getpid()); + DEBUG_MSG(DLVL_HIGH, "(%d) Sent full header", getpid()); } @@ -336,7 +336,7 @@ namespace Mist { } } } - DEBUG_MSG(DLVL_DEVEL, "Buffer window here %lld", myMeta.bufferWindow); + DEBUG_MSG(DLVL_DONTEVEN, "Buffer window here %lld", myMeta.bufferWindow); if (myMeta.vod) { Result << "Duration=\"" << (*videoIters.begin())->second.lastms << "0000\""; } else {