diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index e566d8dc..ba4e656b 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -905,7 +905,7 @@ namespace IPC { } ///\brief Parse each of the possible payload pieces, and runs a callback on it if in use. - void sharedServer::parseEach(void (*callback)(char * data, size_t len, unsigned int id)) { + void sharedServer::parseEach(void (*activeCallback)(char * data, size_t len, unsigned int id), void (*disconCallback)(char * data, size_t len, unsigned int id)) { char * empty = 0; if (!hasCounter) { empty = (char *)malloc(payLen * sizeof(char)); @@ -944,7 +944,7 @@ namespace IPC { WARN_MSG("process disappeared, timing out. (pid %lu)", tmpPID); *counter = 125 | (0x80 & (*counter)); //if process is already dead, instant timeout. } - callback(it->mapped + offset + 1, payLen, id); + activeCallback(it->mapped + offset + 1, payLen, id); switch (countNum) { case 127: HIGH_MSG("Client %u requested disconnect", id); @@ -982,6 +982,9 @@ namespace IPC { break; } if (countNum == 127 || countNum == 126){ + if (disconCallback){ + disconCallback(it->mapped + offset + 1, payLen, id); + } memset(it->mapped + offset + 1, 0, payLen); it->mapped[offset] = 0; } else { @@ -1008,7 +1011,7 @@ namespace IPC { amount = id + 1; VERYHIGH_MSG("Shared memory %s is now at count %u", baseName.c_str(), amount); } - callback(it->mapped + offset, payLen, id); + activeCallback(it->mapped + offset, payLen, id); } else { //stop if we're past the amount counted and we're empty if (id >= amount) { diff --git a/lib/shared_memory.h b/lib/shared_memory.h index 21d0ed39..1b665b78 100644 --- a/lib/shared_memory.h +++ b/lib/shared_memory.h @@ -181,7 +181,7 @@ namespace IPC { sharedServer(std::string name, int len, bool withCounter = false); void init(std::string name, int len, bool withCounter = false); ~sharedServer(); - void parseEach(void (*callback)(char * data, size_t len, unsigned int id)); + void parseEach(void (*activeCallback)(char * data, size_t len, unsigned int id), void (*disconCallback)(char * data, size_t len, unsigned int id) = 0); char * getIndex(unsigned int id); operator bool() const; ///\brief The amount of connected clients diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index d486f64d..18b5bc75 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -739,14 +739,18 @@ void Controller::parseStatistics(char * data, size_t len, unsigned int id){ sessions.erase(connToSession[id]); } } + if (!connToSession.count(id)){ + INSANE_MSG("New connection: %lu as %s", id, idx.toStr().c_str()); + } //store the index for later comparison connToSession[id] = idx; //update the session with the latest data sessions[idx].update(id, tmpEx); //check validity of stats data - char counter = (*(data - 1)); + char counter = (*(data - 1)) & 0x7F; if (counter == 126 || counter == 127){ //the data is no longer valid - connection has gone away, store for later + INSANE_MSG("Ended connection: %lu as %s", id, idx.toStr().c_str()); sessions[idx].finish(id); connToSession.erase(id); }else{ diff --git a/src/input/input.cpp b/src/input/input.cpp index 9d24a946..6f85d333 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -521,16 +521,20 @@ namespace Mist { getNext(); } uint64_t lastBuffered = 0; + uint64_t packCounter = 0; + uint64_t byteCounter = 0; while (thisPacket && thisPacket.getTime() < stopTime) { if (thisPacket.getTime() >= lastBuffered){ bufferNext(thisPacket); + ++packCounter; + byteCounter += thisPacket.getDataLen(); lastBuffered = thisPacket.getTime(); } getNext(); } bufferFinalize(track); bufferTimer = Util::getMS() - bufferTimer; - DEBUG_MSG(DLVL_DEVEL, "Done buffering page %d for track %d in %llums", keyNum, track, bufferTimer); + DEBUG_MSG(DLVL_DEVEL, "Done buffering page %d (%llu packets, %llu bytes) for track %d in %llums", keyNum, packCounter, byteCounter, track, bufferTimer); pageCounter[track][keyNum] = 15; return true; } diff --git a/src/output/output.cpp b/src/output/output.cpp index 9dcd752a..63251760 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -671,11 +671,12 @@ namespace Mist { }else{ VERYHIGH_MSG("Track %d no data (key %u @ %u) - waiting...", tid, getKeyForTime(tid, pos) + (getNextKey?1:0), tmp.offset); unsigned int i = 0; - while (nProxy.curPage[tid].mapped[tmp.offset] == 0 && ++i < 42){ - Util::wait(100); + while (nProxy.curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10){ + Util::wait(100*i); + stats(); } if (nProxy.curPage[tid].mapped[tmp.offset] == 0){ - FAIL_MSG("Track %d no data (key %u) - timeout", tid, getKeyForTime(tid, pos) + (getNextKey?1:0)); + FAIL_MSG("Track %d no data (key %u@%llu) - timeout", tid, getKeyForTime(tid, pos) + (getNextKey?1:0), tmp.offset); }else{ return seek(tid, pos, getNextKey); } diff --git a/src/output/output_hls.cpp b/src/output/output_hls.cpp index 116e1f53..7ccd7f73 100644 --- a/src/output/output_hls.cpp +++ b/src/output/output_hls.cpp @@ -433,8 +433,8 @@ namespace Mist { H.StartResponse(H, myConn, VLCworkaround); //we assume whole fragments - but timestamps may be altered at will uint32_t fragIndice = Trk.timeToFragnum(from); - contCounters[0] = Trk.missedFrags + fragIndice; //PAT continuity counter - contCounters[4096] = Trk.missedFrags + fragIndice; //PMT continuity counter + contPAT = Trk.missedFrags + fragIndice; //PAT continuity counter + contPMT = Trk.missedFrags + fragIndice; //PMT continuity counter packCounter = 0; parseData = true; wantRequest = false; diff --git a/src/output/output_httpts.cpp b/src/output/output_httpts.cpp index 6f2890ab..d5ab7efa 100644 --- a/src/output/output_httpts.cpp +++ b/src/output/output_httpts.cpp @@ -65,7 +65,7 @@ namespace Mist { H.clearHeader("Range"); H.clearHeader("Icy-MetaData"); H.clearHeader("User-Agent"); - H.SetHeader("Content-Type", "video/mp2t"); + H.SetHeader("Content-Type", "video/mpeg"); H.setCORSHeaders(); if(method == "OPTIONS" || method == "HEAD"){ H.SendResponse("200", "OK", myConn); diff --git a/src/output/output_ts_base.cpp b/src/output/output_ts_base.cpp index 1c1b08fa..54788047 100644 --- a/src/output/output_ts_base.cpp +++ b/src/output/output_ts_base.cpp @@ -12,73 +12,90 @@ namespace Mist { appleCompat=false; } - void TSOutput::fillPacket(const char * data, const size_t dataLen){ - - if (!packData.getBytesFree()){ - ///\todo only resend the PAT/PMT for HLS - if ( (sendRepeatingHeaders && packCounter % 42 == 0) || !packCounter){ - TS::Packet tmpPack; - tmpPack.FromPointer(TS::PAT); - tmpPack.setContinuityCounter(++contCounters[0]); - sendTS(tmpPack.checkAndGetBuffer()); - sendTS(TS::createPMT(selectedTracks, myMeta, ++contCounters[4096])); - packCounter += 2; - } - sendTS(packData.checkAndGetBuffer()); - packCounter ++; - packData.clear(); - } - - if (!dataLen){return;} - - if (packData.getBytesFree() == 184){ - packData.clear(); - packData.setPID(255 + thisPacket.getTrackId()); - packData.setContinuityCounter(++contCounters[packData.getPID()]); - if (first[thisPacket.getTrackId()]){ - packData.setUnitStart(1); - packData.setDiscontinuity(true); - if (myMeta.tracks[thisPacket.getTrackId()].type == "video"){ - if (thisPacket.getInt("keyframe")){ - packData.setRandomAccess(true); - packData.setESPriority(true); - } - packData.setPCR(thisPacket.getTime() * 27000); + void TSOutput::fillPacket(char const * data, size_t dataLen, bool & firstPack, bool video, bool keyframe, uint32_t pkgPid, int & contPkg){ + do { + if (!packData.getBytesFree()){ + if ( (sendRepeatingHeaders && packCounter % 42 == 0) || !packCounter){ + TS::Packet tmpPack; + tmpPack.FromPointer(TS::PAT); + tmpPack.setContinuityCounter(++contPAT); + sendTS(tmpPack.checkAndGetBuffer()); + sendTS(TS::createPMT(selectedTracks, myMeta, ++contPMT)); + packCounter += 2; } - first[thisPacket.getTrackId()] = false; + sendTS(packData.checkAndGetBuffer()); + packCounter ++; + packData.clear(); } - } - - int tmp = packData.fillFree(data, dataLen); - if (tmp != dataLen){ - return fillPacket(data+tmp, dataLen-tmp); - } + + if (!dataLen){return;} + + if (packData.getBytesFree() == 184){ + packData.clear(); + packData.setPID(pkgPid); + packData.setContinuityCounter(++contPkg); + if (firstPack){ + packData.setUnitStart(1); + packData.setDiscontinuity(true); + if (video){ + if (keyframe){ + packData.setRandomAccess(true); + packData.setESPriority(true); + } + packData.setPCR(thisPacket.getTime() * 27000); + } + firstPack = false; + } + } + + int tmp = packData.fillFree(data, dataLen); + data += tmp; + dataLen -= tmp; + } while(dataLen); } void TSOutput::sendNext(){ - first[thisPacket.getTrackId()] = true; + //Get ready some data to speed up accesses + uint32_t trackId = thisPacket.getTrackId(); + DTSC::Track & Trk = myMeta.tracks[trackId]; + bool & firstPack = first[trackId]; + uint32_t pkgPid = 255 + trackId; + int & contPkg = contCounters[pkgPid]; + uint64_t packTime = thisPacket.getTime(); + bool video = (Trk.type == "video"); + bool keyframe = thisPacket.getInt("keyframe"); + firstPack = true; + char * dataPointer = 0; unsigned int dataLen = 0; thisPacket.getString("data", dataPointer, dataLen); //data - if (thisPacket.getTime() >= until){ //this if should only trigger for HLS + if (packTime >= until){ //this if should only trigger for HLS stop(); wantRequest = true; parseData = false; sendTS("",0); return; } + //apple compatibility timestamp correction + if (appleCompat){ + packTime -= ts_from; + if (Trk.type == "audio"){ + packTime = 0; + } + } + packTime *= 90; std::string bs; //prepare bufferstring - if (myMeta.tracks[thisPacket.getTrackId()].type == "video"){ + if (video){ unsigned int extraSize = 0; //dataPointer[4] & 0x1f is used to check if this should be done later: fillPacket("\000\000\000\001\011\360", 6); - if (myMeta.tracks[thisPacket.getTrackId()].codec == "H264" && (dataPointer[4] & 0x1f) != 0x09){ + if (Trk.codec == "H264" && (dataPointer[4] & 0x1f) != 0x09){ extraSize += 6; } - if (thisPacket.getInt("keyframe")){ - if (myMeta.tracks[thisPacket.getTrackId()].codec == "H264"){ + if (keyframe){ + if (Trk.codec == "H264"){ if (!haveAvcc){ - avccbox.setPayload(myMeta.tracks[thisPacket.getTrackId()].init); + avccbox.setPayload(Trk.init); haveAvcc = true; } bs = avccbox.asAnnexB(); @@ -102,23 +119,22 @@ namespace Mist { unsigned int ThisNaluSize = 0; unsigned int i = 0; unsigned int nalLead = 0; - + uint64_t offset = thisPacket.getInt("offset") * 90; + while (currPack <= splitCount){ unsigned int alreadySent = 0; - long long unsigned int tempTime = thisPacket.getTime(); - if (appleCompat){tempTime -= ts_from;} - bs = TS::Packet::getPESVideoLeadIn((currPack != splitCount ? watKunnenWeIn1Ding : dataLen+extraSize - currPack*watKunnenWeIn1Ding), tempTime * 90, thisPacket.getInt("offset") * 90, !currPack); - fillPacket(bs.data(), bs.size()); + bs = TS::Packet::getPESVideoLeadIn((currPack != splitCount ? watKunnenWeIn1Ding : dataLen+extraSize - currPack*watKunnenWeIn1Ding), packTime, offset, !currPack); + fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg); if (!currPack){ - if (myMeta.tracks[thisPacket.getTrackId()].codec == "H264" && (dataPointer[4] & 0x1f) != 0x09){ + if (Trk.codec == "H264" && (dataPointer[4] & 0x1f) != 0x09){ //End of previous nal unit, if not already present - fillPacket("\000\000\000\001\011\360", 6); + fillPacket("\000\000\000\001\011\360", 6, firstPack, video, keyframe, pkgPid, contPkg); alreadySent += 6; } - if (thisPacket.getInt("keyframe")){ - if (myMeta.tracks[thisPacket.getTrackId()].codec == "H264"){ + if (keyframe){ + if (Trk.codec == "H264"){ bs = avccbox.asAnnexB(); - fillPacket(bs.data(), bs.size()); + fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg); alreadySent += bs.size(); } /*LTS-START*/ @@ -132,7 +148,7 @@ namespace Mist { } while (i + 4 < (unsigned int)dataLen){ if (nalLead){ - fillPacket("\000\000\000\001"+4-nalLead,nalLead); + fillPacket("\000\000\000\001"+4-nalLead,nalLead, firstPack, video, keyframe, pkgPid, contPkg); i += nalLead; alreadySent += nalLead; nalLead = 0; @@ -145,57 +161,51 @@ namespace Mist { } if (alreadySent + 4 > watKunnenWeIn1Ding){ nalLead = 4 - (watKunnenWeIn1Ding-alreadySent); - fillPacket("\000\000\000\001",watKunnenWeIn1Ding-alreadySent); + fillPacket("\000\000\000\001",watKunnenWeIn1Ding-alreadySent, firstPack, video, keyframe, pkgPid, contPkg); i += watKunnenWeIn1Ding-alreadySent; alreadySent += watKunnenWeIn1Ding-alreadySent; }else{ - fillPacket("\000\000\000\001",4); + fillPacket("\000\000\000\001",4, firstPack, video, keyframe, pkgPid, contPkg); alreadySent += 4; i += 4; } } if (alreadySent + ThisNaluSize > watKunnenWeIn1Ding){ - fillPacket(dataPointer+i,watKunnenWeIn1Ding-alreadySent); + fillPacket(dataPointer+i,watKunnenWeIn1Ding-alreadySent, firstPack, video, keyframe, pkgPid, contPkg); i += watKunnenWeIn1Ding-alreadySent; ThisNaluSize -= watKunnenWeIn1Ding-alreadySent; alreadySent += watKunnenWeIn1Ding-alreadySent; }else{ - fillPacket(dataPointer+i,ThisNaluSize); + fillPacket(dataPointer+i,ThisNaluSize, firstPack, video, keyframe, pkgPid, contPkg); alreadySent += ThisNaluSize; i += ThisNaluSize; ThisNaluSize = 0; } if (alreadySent == watKunnenWeIn1Ding){ packData.addStuffing(); - fillPacket(0, 0); - first[thisPacket.getTrackId()] = true; + fillPacket(0, 0, firstPack, video, keyframe, pkgPid, contPkg); + firstPack = true; break; } } currPack++; } - }else if (myMeta.tracks[thisPacket.getTrackId()].type == "audio"){ + }else if (Trk.type == "audio"){ long unsigned int tempLen = dataLen; - if ( myMeta.tracks[thisPacket.getTrackId()].codec == "AAC"){ + if (Trk.codec == "AAC"){ tempLen += 7; } - long long unsigned int tempTime; - if (appleCompat){ - tempTime = 0;// myMeta.tracks[thisPacket.getTrackId()].rate / 1000; - }else{ - tempTime = thisPacket.getTime() * 90; + bs = TS::Packet::getPESAudioLeadIn(tempLen, packTime);// myMeta.tracks[thisPacket.getTrackId()].rate / 1000 ); + fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg); + if (Trk.codec == "AAC"){ + bs = TS::getAudioHeader(dataLen, Trk.init); + fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg); } - bs = TS::Packet::getPESAudioLeadIn(tempLen, tempTime);// myMeta.tracks[thisPacket.getTrackId()].rate / 1000 ); - fillPacket(bs.data(), bs.size()); - if (myMeta.tracks[thisPacket.getTrackId()].codec == "AAC"){ - bs = TS::getAudioHeader(dataLen, myMeta.tracks[thisPacket.getTrackId()].init); - fillPacket(bs.data(), bs.size()); - } - fillPacket(dataPointer,dataLen); + fillPacket(dataPointer,dataLen, firstPack, video, keyframe, pkgPid, contPkg); } if (packData.getBytesFree() < 184){ packData.addStuffing(); - fillPacket(0, 0); + fillPacket(0, 0, firstPack, video, keyframe, pkgPid, contPkg); } } } diff --git a/src/output/output_ts_base.h b/src/output/output_ts_base.h index 1aa36c2f..906f36cc 100644 --- a/src/output/output_ts_base.h +++ b/src/output/output_ts_base.h @@ -16,10 +16,12 @@ namespace Mist { virtual ~TSOutput(){}; void sendNext(); virtual void sendTS(const char * tsData, unsigned int len=188){}; - void fillPacket(const char * data, const size_t dataLen); + void fillPacket(char const * data, size_t dataLen, bool & firstPack, bool video, bool keyframe, uint32_t pkgPid, int & contPkg); protected: std::map first; std::map contCounters; + int contPAT; + int contPMT; unsigned int packCounter; ///\todo update constructors? TS::Packet packData; bool haveAvcc;