diff --git a/CMakeLists.txt b/CMakeLists.txt index 42f3d9fb..0aadfee6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -339,6 +339,7 @@ macro(makeInput inputName format) ) endmacro() +makeInput(HLS hls) makeInput(DTSC dtsc) makeInput(DTSCCrypt dtsccrypt) makeInput(MP3 mp3) diff --git a/lib/ts_stream.cpp b/lib/ts_stream.cpp index 2ddf1c7b..1cb1537c 100644 --- a/lib/ts_stream.cpp +++ b/lib/ts_stream.cpp @@ -8,48 +8,81 @@ namespace TS { - class ADTSRemainder{ - public: - char * data; - uint32_t max; - uint32_t now; - uint32_t len; - uint32_t bpos; - void setRemainder(const aac::adts & p, const void * source, const uint32_t avail, const uint32_t bPos){ - if (max < p.getCompleteSize()){ - void * newmainder = realloc(data, p.getCompleteSize()); - if (newmainder){ - max = p.getCompleteSize(); - data = (char*)newmainder; - } - } - if (max >= p.getCompleteSize()){ - len = p.getCompleteSize(); - now = avail; - bpos = bPos; - memcpy(data, source, now); - } - } - ADTSRemainder(){ - data = 0; - max = 0; - now = 0; - len = 0; - bpos = 0; - } - ~ADTSRemainder(){ - if (data){ - free(data); - data = 0; - } - } + void ADTSRemainder::setRemainder(const aac::adts & p, const void * source, const uint32_t avail, const uint32_t bPos) { + if (!p.getCompleteSize()) { + return; + } - }; + if (max < p.getCompleteSize()) { + void * newmainder = realloc(data, p.getCompleteSize()); + if (newmainder) { + max = p.getCompleteSize(); + data = (char *)newmainder; + } + } + if (max >= p.getCompleteSize()) { + len = p.getCompleteSize(); + now = avail; + bpos = bPos; + memcpy(data, source, now); + } + } + + void ADTSRemainder::append(const char * p, uint32_t pLen) { + if (now + pLen > len) { + FAIL_MSG("Data to append does not fit into the remainder"); + return; + } + + memcpy(data + now, p, pLen); + now += pLen; + } + + bool ADTSRemainder::isComplete() { + return (len == now); + } + + void ADTSRemainder::clear() { + len = 0; + now = 0; + bpos = 0; + } - Stream::Stream(bool _threaded){ + ADTSRemainder::ADTSRemainder() { + data = 0; + max = 0; + now = 0; + len = 0; + bpos = 0; + } + ADTSRemainder::~ADTSRemainder() { + if (data) { + free(data); + data = 0; + } + } + + uint32_t ADTSRemainder::getLength() { + return len; + } + + uint32_t ADTSRemainder::getBpos() { + return bpos; + } + + + uint32_t ADTSRemainder::getTodo() { + return len - now; + } + char * ADTSRemainder::getData() { + return data; + } + + Stream::Stream(bool _threaded) { threaded = _threaded; - if (threaded){ + firstPacketFound = false; + if (threaded) { globalSem.open("MstTSInputLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); if (!globalSem) { FAIL_MSG("Creating semaphore failed: %s", strerror(errno)); @@ -60,8 +93,8 @@ namespace TS { } } - Stream::~Stream(){ - if (threaded){ + Stream::~Stream() { + if (threaded) { globalSem.unlink(); } } @@ -72,18 +105,51 @@ namespace TS { parse(newPacket, bytePos); } - void Stream::clear(){ - if (threaded){ + void Stream::partialClear() { + if (threaded) { globalSem.wait(); } pesStreams.clear(); pesPositions.clear(); outPackets.clear(); - if (threaded){ + buildPacket.clear(); + partialBuffer.clear(); + if (threaded) { globalSem.post(); } } + void Stream::clear() { + partialClear(); + if (threaded) { + globalSem.wait(); + } + pidToCodec.clear(); + adtsInfo.clear(); + spsInfo.clear(); + ppsInfo.clear(); + hevcInfo.clear(); + metaInit.clear(); + descriptors.clear(); + mappingTable.clear(); + lastPMT.clear(); + lastPAT = 0; + pmtTracks.clear(); + remainders.clear(); + associationTable = ProgramAssociationTable(); + if (threaded) { + globalSem.post(); + } + } + + void Stream::finish(){ + if(!pesStreams.size()){return;} + + for(std::map >::const_iterator i = pesStreams.begin(); i != pesStreams.end();i++){ + parsePES(i->first,true); + } + } + void Stream::add(char * newPack, unsigned long long bytePos) { Packet newPacket; newPacket.FromPointer(newPack); @@ -91,110 +157,110 @@ namespace TS { } void Stream::add(Packet & newPack, unsigned long long bytePos) { - if (threaded){ + if (threaded) { globalSem.wait(); } int tid = newPack.getPID(); - if ((pidToCodec.count(tid) || tid == 0 || newPack.isPMT()) && (pesStreams[tid].size() || newPack.getUnitStart())){ + if ((pidToCodec.count(tid) || tid == 0 || newPack.isPMT()) && (pesStreams[tid].size() || newPack.getUnitStart())) { pesStreams[tid].push_back(newPack); pesPositions[tid].push_back(bytePos); } - if (threaded){ + if (threaded) { globalSem.post(); } } - bool Stream::isDataTrack(unsigned long tid){ - if (tid == 0){ + bool Stream::isDataTrack(unsigned long tid) { + if (tid == 0) { return false; } - if (threaded){ + if (threaded) { globalSem.wait(); } bool result = !pmtTracks.count(tid); - if (threaded){ + if (threaded) { globalSem.post(); } return result; } - + void Stream::parse(unsigned long tid) { - if (threaded){ + if (threaded) { globalSem.wait(); } - if (!pesStreams.count(tid) || pesStreams[tid].size() == 0){ - if (threaded){ + if (!pesStreams.count(tid) || pesStreams[tid].size() == 0) { + if (threaded) { globalSem.post(); } return; } std::deque & trackPackets = pesStreams[tid]; - if (threaded){ + if (threaded) { globalSem.post(); } - + //Handle PAT packets - if (tid == 0){ + if (tid == 0) { ///\todo Keep track of updates in PAT instead of keeping only the last PAT as a reference - - if (threaded){ + + if (threaded) { globalSem.wait(); } associationTable = trackPackets.back(); associationTable.parsePIDs(); lastPAT = Util::bootSecs(); - if (threaded){ + if (threaded) { globalSem.post(); } - int pmtCount = associationTable.getProgramCount(); - for (int i = 0; i < pmtCount; i++){ + for (int i = 0; i < pmtCount; i++) { pmtTracks.insert(associationTable.getProgramPID(i)); } - if (threaded){ + if (threaded) { globalSem.wait(); } pesStreams.erase(0); pesPositions.erase(0); - if (threaded){ + if (threaded) { globalSem.post(); } return; } + //Ignore conditional access packets. We don't care. - if (tid == 1){ + if (tid == 1) { return; } //Handle PMT packets - if (pmtTracks.count(tid)){ + if (pmtTracks.count(tid)) { ///\todo Keep track of updates in PMT instead of keeping only the last PMT per program as a reference - if (threaded){ + if (threaded) { globalSem.wait(); } mappingTable[tid] = trackPackets.back(); lastPMT[tid] = Util::bootSecs(); - if (threaded){ + if (threaded) { globalSem.post(); } ProgramMappingEntry entry = mappingTable[tid].getEntry(0); - while (entry){ + while (entry) { unsigned long pid = entry.getElementaryPid(); unsigned long sType = entry.getStreamType(); - switch(sType){ + switch (sType) { case H264: case AAC: case H265: case AC3: case ID3: pidToCodec[pid] = sType; - if (sType == ID3){ + if (sType == ID3) { metaInit[pid] = std::string(entry.getESInfo(), entry.getESInfoLength()); } break; @@ -204,19 +270,19 @@ namespace TS { entry.advance(); } - if (threaded){ + if (threaded) { globalSem.wait(); } pesStreams.erase(tid); pesPositions.erase(tid); - if (threaded){ + if (threaded) { globalSem.post(); } - + return; } - if (threaded){ + if (threaded) { globalSem.wait(); } @@ -224,90 +290,162 @@ namespace TS { int packNum = 1; std::deque & inStream = pesStreams[tid]; + if (!inStream.rbegin()->getUnitStart()){ + if (threaded) { + globalSem.post(); + } + return; + } + std::deque::iterator lastPack = inStream.end(); std::deque::iterator curPack = inStream.begin(); curPack++; - while (curPack != inStream.end() && !curPack->getUnitStart()){ + while (curPack != lastPack && !curPack->getUnitStart()) { curPack++; packNum++; } - if (curPack != inStream.end()){ + if (curPack != lastPack) { parsePes = true; } - if (threaded){ + if (threaded) { globalSem.post(); } - if (parsePes){ + if (parsePes) { parsePES(tid); } } void Stream::parse(Packet & newPack, unsigned long long bytePos) { add(newPack, bytePos); - int tid = newPack.getPID(); parse(tid); } bool Stream::hasPacketOnEachTrack() const { - if (threaded){ + if (threaded) { globalSem.wait(); } - if (!pidToCodec.size() || pidToCodec.size() != outPackets.size()){ - if (threaded){ + if (!pidToCodec.size() ) { + + if (threaded) { globalSem.post(); } + + //INFO_MSG("no packet on each track 1, pidtocodec.size: %d, outpacket.size: %d", pidToCodec.size(), outPackets.size()); return false; } - for (std::map::const_iterator it = pidToCodec.begin(); it != pidToCodec.end(); it++){ - if (!hasPacket(it->first)){ - if (threaded){ - globalSem.post(); + + unsigned int missing = 0; + uint64_t firstTime = 0xffffffffffffffffull, lastTime = 0; + for (std::map::const_iterator it = pidToCodec.begin(); it != pidToCodec.end(); it++) { + if (!hasPacket(it->first)) { + missing++; + }else{ + if(outPackets.at(it->first).front().getTime() < firstTime){ + firstTime = outPackets.at(it->first).front().getTime(); + } + if(outPackets.at(it->first).back().getTime() > lastTime){ + lastTime = outPackets.at(it->first).back().getTime(); } - return false; } } - if (threaded){ + + if (threaded) { globalSem.post(); } - return true; + + return (!missing || (missing != pidToCodec.size() && lastTime - firstTime > 2000)); } - + bool Stream::hasPacket(unsigned long tid) const { - if (threaded){ + if (threaded) { globalSem.wait(); } - if (!pesStreams.count(tid)){ - if (threaded){ + if (!pesStreams.count(tid)) { + if (threaded) { globalSem.post(); } return false; } - if (outPackets.count(tid) && outPackets.at(tid).size()){ - if (threaded){ + if (outPackets.count(tid) && outPackets.at(tid).size()) { + if (threaded) { globalSem.post(); } return true; } std::deque::const_iterator curPack = pesStreams.at(tid).begin(); - curPack++; - while (curPack != pesStreams.at(tid).end() && !curPack->getUnitStart()){ + + if (curPack != pesStreams.at(tid).end()) { curPack++; } - if (curPack != pesStreams.at(tid).end()){ - if (threaded){ + + while (curPack != pesStreams.at(tid).end() && !curPack->getUnitStart()) { + curPack++; + } + + if (curPack != pesStreams.at(tid).end()) { + if (threaded) { globalSem.post(); } return true; } - if (threaded){ + if (threaded) { globalSem.post(); } return false; } - unsigned long long decodePTS(const char * data){ + bool Stream::hasPacket() const { + if (threaded) { + globalSem.wait(); + } + if (!pesStreams.size()) { + if (threaded) { + globalSem.post(); + } + return false; + } + + if (outPackets.size()) { + for(std::map >::const_iterator i = outPackets.begin(); i != outPackets.end();i++){ + if(i->second.size()){ + if (threaded) { + globalSem.post(); + } + return true; + } + } + } + + + + for(std::map >::const_iterator i = pesStreams.begin(); i != pesStreams.end();i++){ + std::deque::const_iterator curPack = i->second.begin(); + + if(curPack != i->second.end()){ + curPack++; + } + + while(curPack != i->second.end() && !curPack->getUnitStart()){ + curPack++; + } + + if(curPack != i->second.end()){ + if (threaded) { + globalSem.post(); + } + return true; + } + } + + if (threaded) { + globalSem.post(); + } + return false; + } + + unsigned long long decodePTS(const char * data) { unsigned long long time; time = ((data[0] >> 1) & 0x07); time <<= 15; @@ -318,15 +456,17 @@ namespace TS { return time; } - void Stream::parsePES(unsigned long tid){ - if (!pidToCodec.count(tid)){return;}//skip unknown codecs - if (threaded){ + void Stream::parsePES(unsigned long tid, bool finished) { + if (!pidToCodec.count(tid)) { + return; //skip unknown codecs + } + if (threaded) { globalSem.wait(); } std::deque & inStream = pesStreams[tid]; std::deque & inPositions = pesPositions[tid]; - if (inStream.size() == 1){ - if (threaded){ + if (inStream.size() <= 1) { + if (threaded) { globalSem.post(); } return; @@ -336,34 +476,38 @@ namespace TS { std::deque::iterator curPack = inStream.begin(); curPack++; - while (curPack != inStream.end() && !curPack->getUnitStart()){ + while (curPack != inStream.end() && !curPack->getUnitStart()) { curPack++; packNum++; } - if (curPack == inStream.end()){ - if (threaded){ + if (!finished && curPack == inStream.end()) { + if (threaded) { globalSem.post(); } return; } unsigned long long bPos = inPositions.front(); + //Create a buffer for the current PES, and remove it from the pesStreams buffer. int paySize = 0; - + curPack = inStream.begin(); - for (int i = 0; i < packNum; i++){ + for (int i = 0; i < packNum; i++) { paySize += curPack->getPayloadLength(); curPack++; } VERYHIGH_MSG("Parsing PES for track %lu, length %i", tid, paySize); - char * payload = (char*)malloc(paySize); + char * payload = (char *)malloc(paySize); paySize = 0; curPack = inStream.begin(); int lastCtr = curPack->getContinuityCounter() - 1; - for (int i = 0; i < packNum; i++){ - if (curPack->getContinuityCounter() == lastCtr){curPack++; continue;} - if (curPack->getContinuityCounter() - lastCtr != 1 && curPack->getContinuityCounter()){ + for (int i = 0; i < packNum; i++) { + if (curPack->getContinuityCounter() == lastCtr) { + curPack++; + continue; + } + if (curPack->getContinuityCounter() - lastCtr != 1 && curPack->getContinuityCounter()) { INFO_MSG("Parsing a pes on track %d, missed %d packets", tid, curPack->getContinuityCounter() - lastCtr - 1); } lastCtr = curPack->getContinuityCounter(); @@ -373,24 +517,24 @@ namespace TS { } inStream.erase(inStream.begin(), curPack); inPositions.erase(inPositions.begin(), inPositions.begin() + packNum); - if (threaded){ + if (threaded) { globalSem.post(); } //Parse the PES header int offset = 0; - while(offset < paySize){ + while (offset < paySize) { const char * pesHeader = payload + offset; //Check for large enough buffer - if ((paySize - offset) < 9 || (paySize - offset) < 9 + pesHeader[8]){ + if ((paySize - offset) < 9 || (paySize - offset) < 9 + pesHeader[8]) { INFO_MSG("Not enough data on track %lu (%d / %d), discarding remainder of data", tid, paySize - offset, 9 + pesHeader[8]); break; } //Check for valid PES lead-in - if(pesHeader[0] != 0 || pesHeader[1] != 0x00 || pesHeader[2] != 0x01){ + if (pesHeader[0] != 0 || pesHeader[1] != 0x00 || pesHeader[2] != 0x01) { INFO_MSG("Invalid PES Lead in on track %lu, discarding it", tid); break; } @@ -399,25 +543,22 @@ namespace TS { //Note: if the payload size is 0, then we assume the pes packet will cover the entire TS Unit. //Note: this is technically only allowed for video pes streams. unsigned long long realPayloadSize = (((int)pesHeader[4] << 8) | pesHeader[5]); - if (!realPayloadSize){ + if (!realPayloadSize) { realPayloadSize = paySize; + } else { + realPayloadSize += 6; } - if (pidToCodec[tid] == AAC || pidToCodec[tid] == MP3 || pidToCodec[tid] == AC3){ - realPayloadSize -= (3 + pesHeader[8]); - }else{ - realPayloadSize -= (9 + pesHeader[8]); - } - + realPayloadSize -= (9 + pesHeader[8]); //Read the metadata for this PES Packet ///\todo Determine keyframe-ness unsigned int timeStamp = 0; - unsigned int timeOffset = 0; + int64_t timeOffset = 0; unsigned int pesOffset = 9; - if ((pesHeader[7] >> 6) & 0x02){//Check for PTS presence + if ((pesHeader[7] >> 6) & 0x02) { //Check for PTS presence timeStamp = decodePTS(pesHeader + pesOffset); pesOffset += 5; - if (((pesHeader[7] & 0xC0) >> 6) & 0x01){//Check for DTS presence (yes, only if PTS present) + if (((pesHeader[7] & 0xC0) >> 6) & 0x01) { //Check for DTS presence (yes, only if PTS present) timeOffset = timeStamp; timeStamp = decodePTS(pesHeader + pesOffset); pesOffset += 5; @@ -425,415 +566,537 @@ namespace TS { } } - if (pesHeader[7] & 0x20){ //ESCR - ignored + if (pesHeader[7] & 0x20) { //ESCR - ignored pesOffset += 6; } - if (pesHeader[7] & 0x10){ //ESR - ignored + if (pesHeader[7] & 0x10) { //ESR - ignored pesOffset += 3; } - if (pesHeader[7] & 0x08){ //trick mode - ignored + if (pesHeader[7] & 0x08) { //trick mode - ignored pesOffset += 1; } - if (pesHeader[7] & 0x04){//additional copy - ignored + if (pesHeader[7] & 0x04) { //additional copy - ignored pesOffset += 1; } - if (pesHeader[7] & 0x02){ //crc - ignored + if (pesHeader[7] & 0x02) { //crc - ignored pesOffset += 2; } - if (paySize - offset - pesOffset < realPayloadSize){ + if (paySize - offset - pesOffset < realPayloadSize) { WARN_MSG("Packet loss detected, glitches will occur"); realPayloadSize = paySize - offset - pesOffset; } - char * pesPayload = payload + offset + pesOffset; + const char * pesPayload = pesHeader + pesOffset; - //Create a new (empty) DTSC Packet at the end of the buffer - if (pidToCodec[tid] == AAC){ - //Parse all the ADTS packets - unsigned long offsetInPes = 0; - uint64_t msRead = 0; - if (threaded){ - globalSem.wait(); - } - static std::map remainders; - if (remainders.count(tid) && remainders[tid].len){ - offsetInPes = std::min((unsigned long)(remainders[tid].len - remainders[tid].now), (unsigned long)realPayloadSize); - memcpy(remainders[tid].data+remainders[tid].now, pesPayload, offsetInPes); - remainders[tid].now += offsetInPes; - if (remainders[tid].now == remainders[tid].len){ - aac::adts adtsPack(remainders[tid].data, remainders[tid].len); - if (adtsPack){ - if (!adtsInfo.count(tid) || !adtsInfo[tid].sameHeader(adtsPack)){ - MEDIUM_MSG("Setting new ADTS header: %s", adtsPack.toPrettyString().c_str()); - adtsInfo[tid] = adtsPack; - } - outPackets[tid].push_back(DTSC::Packet()); - outPackets[tid].back().genericFill(timeStamp-((adtsPack.getSampleCount() * 1000) / adtsPack.getFrequency()), timeOffset, tid, adtsPack.getPayload(), adtsPack.getPayloadSize(), remainders[tid].bpos, 0); - } - remainders[tid].len = 0; - } - } - while (offsetInPes < realPayloadSize){ - aac::adts adtsPack(pesPayload + offsetInPes, realPayloadSize - offsetInPes); - if (adtsPack && adtsPack.getCompleteSize() + offsetInPes <= realPayloadSize){ - if (!adtsInfo.count(tid) || !adtsInfo[tid].sameHeader(adtsPack)){ - MEDIUM_MSG("Setting new ADTS header: %s", adtsPack.toPrettyString().c_str()); - adtsInfo[tid] = adtsPack; - } - outPackets[tid].push_back(DTSC::Packet()); - outPackets[tid].back().genericFill(timeStamp + msRead, timeOffset, tid, adtsPack.getPayload(), adtsPack.getPayloadSize(), bPos, 0); - msRead += (adtsPack.getSampleCount() * 1000) / adtsPack.getFrequency(); - offsetInPes += adtsPack.getCompleteSize(); - }else{ - /// \todo What about the case that we have an invalid start, going over the PES boundary? - if (!adtsPack.hasSync()){ - offsetInPes++; - }else{ - //remainder, keep it, use it next time - remainders[tid].setRemainder(adtsPack, pesPayload + offsetInPes, realPayloadSize - offsetInPes, bPos); - offsetInPes = realPayloadSize;//skip to end of PES - } - } - } - if (threaded){ - globalSem.post(); - } + if (memmem(pesPayload, realPayloadSize, "DTP2", 4) != 0) { + INFO_MSG("dtp found"); } - if (pidToCodec[tid] == ID3 || pidToCodec[tid] == AC3){ - if (threaded){ - globalSem.wait(); - } - outPackets[tid].push_back(DTSC::Packet()); - outPackets[tid].back().genericFill(timeStamp, timeOffset, tid, pesPayload, realPayloadSize, bPos, 0); - if (threaded){ - globalSem.post(); - } - } - if (pidToCodec[tid] == H264 || pidToCodec[tid] == H265){ - //Convert from annex b - char * parsedData = (char*)malloc(realPayloadSize * 2); - bool isKeyFrame = false; - unsigned long parsedSize = nalu::fromAnnexB(pesPayload, realPayloadSize, parsedData); - std::deque nalInfo; - if (pidToCodec[tid] == H264) { - nalInfo = h264::analysePackets(parsedData, parsedSize); - } - if (pidToCodec[tid] == H265){ - nalInfo = h265::analysePackets(parsedData, parsedSize); - } - int dataOffset = 0; - bool firstSlice = true; - for (std::deque::iterator it = nalInfo.begin(); it != nalInfo.end(); it++){ - if (pidToCodec[tid] == H264){ - switch (it->nalType){ - case 0x01: { - if (firstSlice) { - firstSlice = false; - if (!isKeyFrame){ - char * data = parsedData + dataOffset + 4; - Utils::bitstream bs; - for (size_t i = 1; i < 10 && i < it->nalSize; i++) { - if (i + 2 < it->nalSize && (memcmp(data + i, "\000\000\003", 3) == 0)) { //Emulation prevention bytes - bs.append(data + i, 2); - i += 2; - } else { - bs.append(data + i, 1); - } - } - bs.getExpGolomb();//Discard first_mb_in_slice - uint64_t sliceType = bs.getUExpGolomb(); - if (sliceType == 2 || sliceType == 4 || sliceType == 7 || sliceType == 9){ - isKeyFrame = true; - } - } - } - break; - } - case 0x05: { - isKeyFrame = true; - break; - } - case 0x07: { - if (threaded){ - globalSem.wait(); - } - spsInfo[tid] = std::string(parsedData + dataOffset + 4, it->nalSize); - if (threaded){ - globalSem.post(); - } - break; - } - case 0x08: { - if (threaded){ - globalSem.wait(); - } - ppsInfo[tid] = std::string(parsedData + dataOffset + 4, it->nalSize); - if (threaded){ - globalSem.post(); - } - break; - } - default: break; - } - } - if (pidToCodec[tid] == H265){ - switch (it->nalType){ - case 2: case 3: //TSA Picture - case 4: case 5: //STSA Picture - case 6: case 7: //RADL Picture - case 8: case 9: //RASL Picture - case 16: case 17: case 18: //BLA Picture - case 19: case 20: //IDR Picture - case 21: { //CRA Picture - isKeyFrame = true; - break; - } - case 32: - case 33: - case 34: { - if (threaded){ - globalSem.wait(); - } - hevcInfo[tid].addUnit(parsedData + dataOffset); - if (threaded){ - globalSem.post(); - } - break; - } - default: break; - } - } - dataOffset += 4 + it->nalSize; - } - if (threaded){ - globalSem.wait(); - } - outPackets[tid].push_back(DTSC::Packet()); - outPackets[tid].back().genericFill(timeStamp, timeOffset, tid, parsedData, parsedSize, bPos, isKeyFrame); - if (threaded){ - globalSem.post(); - } - free(parsedData); - } - //We are done with the realpayload size, reverse calculation so we know the correct offset increase. - if (pidToCodec[tid] == AAC){ - realPayloadSize += (3 + pesHeader[8]); - }else{ - realPayloadSize += (9 + pesHeader[8]); - } - offset += realPayloadSize + 6; + + + parseBitstream(tid, pesPayload, realPayloadSize, timeStamp, timeOffset, bPos); + +//We are done with the realpayload size, reverse calculation so we know the correct offset increase. + realPayloadSize += (9 + pesHeader[8]); + offset += realPayloadSize; } free(payload); } + + void Stream::parseBitstream(uint32_t tid, const char * pesPayload, uint32_t realPayloadSize, uint64_t timeStamp, int64_t timeOffset, uint64_t bPos) { + + //Create a new (empty) DTSC Packet at the end of the buffer + if (pidToCodec[tid] == AAC) { + //Parse all the ADTS packets + unsigned long offsetInPes = 0; + uint64_t msRead = 0; + if (threaded) { + globalSem.wait(); + } + + if (remainders.count(tid) && remainders[tid].getLength()) { + offsetInPes = std::min((unsigned long)(remainders[tid].getTodo()), (unsigned long)realPayloadSize); + remainders[tid].append(pesPayload, offsetInPes); + + if (remainders[tid].isComplete()) { + aac::adts adtsPack(remainders[tid].getData(), remainders[tid].getLength()); + if (adtsPack) { + if (!adtsInfo.count(tid) || !adtsInfo[tid].sameHeader(adtsPack)) { + MEDIUM_MSG("Setting new ADTS header: %s", adtsPack.toPrettyString().c_str()); + adtsInfo[tid] = adtsPack; + } + outPackets[tid].push_back(DTSC::Packet()); + outPackets[tid].back().genericFill(timeStamp - ((adtsPack.getSampleCount() * 1000) / adtsPack.getFrequency()), timeOffset, tid, adtsPack.getPayload(), adtsPack.getPayloadSize(), remainders[tid].getBpos(), 0); + } + remainders[tid].clear(); + } + } + while (offsetInPes < realPayloadSize) { + aac::adts adtsPack(pesPayload + offsetInPes, realPayloadSize - offsetInPes); + if (adtsPack && adtsPack.getCompleteSize() + offsetInPes <= realPayloadSize) { + if (!adtsInfo.count(tid) || !adtsInfo[tid].sameHeader(adtsPack)) { + MEDIUM_MSG("Setting new ADTS header: %s", adtsPack.toPrettyString().c_str()); + adtsInfo[tid] = adtsPack; + } + outPackets[tid].push_back(DTSC::Packet()); + outPackets[tid].back().genericFill(timeStamp + msRead, timeOffset, tid, adtsPack.getPayload(), adtsPack.getPayloadSize(), bPos, 0); + msRead += (adtsPack.getSampleCount() * 1000) / adtsPack.getFrequency(); + offsetInPes += adtsPack.getCompleteSize(); + } else { + /// \todo What about the case that we have an invalid start, going over the PES boundary? + if (!adtsPack.hasSync()) { + offsetInPes++; + } else { + //remainder, keep it, use it next time + remainders[tid].setRemainder(adtsPack, pesPayload + offsetInPes, realPayloadSize - offsetInPes, bPos); + offsetInPes = realPayloadSize;//skip to end of PES + } + } + } + if (threaded) { + globalSem.post(); + } + } + if (pidToCodec[tid] == ID3 || pidToCodec[tid] == AC3) { + if (threaded) { + globalSem.wait(); + } + outPackets[tid].push_back(DTSC::Packet()); + outPackets[tid].back().genericFill(timeStamp, timeOffset, tid, pesPayload, realPayloadSize, bPos, 0); + + if (threaded) { + globalSem.post(); + } + } + + + if (pidToCodec[tid] == H264 || pidToCodec[tid] == H265) { + //loop through scanAnnexB until startcode is found, if packetPointer equals NULL, then read next PES packet + bool completePES = false; + const char * packetPtr; + const char * nalEnd; + bool checkForKeyFrame = true; + bool isKeyFrame = false; + int nalRemove = 0; + bool clearKey = false; + + nalu::scanAnnexB(pesPayload, realPayloadSize, packetPtr); +// std::cerr << "\t\tNew PES Packet" << std::endl; + while (!completePES) { + + if (packetPtr) { + //when startcode is found, check if we were already constructing a packet. + if (!partialBuffer[tid].empty()) { + parseNal(tid, partialBuffer[tid].c_str(), partialBuffer[tid].c_str() + partialBuffer[tid].length(), isKeyFrame); + } else { + parseNal(tid, pesPayload, packetPtr, isKeyFrame); + } + + if (!isKeyFrame || clearKey) { + clearKey = true; + } + + nalEnd = nalu::nalEndPosition(pesPayload, packetPtr - pesPayload); + nalRemove = packetPtr - nalEnd; + + if (firstPacketFound && checkForKeyFrame) { + checkForKeyFrame = false; + } + + if (!buildPacket[tid] && !firstPacketFound) { + //clean start + //remove everything before this point as this is the very first hal packet. + if (partialBuffer[tid].empty() && !firstPacketFound) { //if buffer is empty + //very first packet, check for keyframe + checkForKeyFrame = true; + } + firstPacketFound = true; + } else { + if (!buildPacket[tid]) { //when no data in packet -> genericFill + buildPacket[tid].genericFill(timeStamp, timeOffset, tid, "\000\000\000\002\011\360", 6, bPos, true); + } + + //if the timestamp differs from current PES timestamp, send the previous packet out and fill a new one. + if (buildPacket[tid].getTime() != timeStamp) { + //next packet's timestamp differs from current timestamp, add hal packet to buildpacket and send it out. + if (!partialBuffer[tid].empty()) { + //std::cerr << "append remaining data to partialbuffer" << std::endl; + buildPacket[tid].appendNal(partialBuffer[tid].c_str(), partialBuffer[tid].length(), + partialBuffer[tid].length() + (packetPtr - pesPayload) - nalRemove); + buildPacket[tid].appendData(pesPayload, (packetPtr - pesPayload) - nalRemove); + if (clearKey) { + buildPacket[tid].clearKeyFrame(); + } + + outPackets[tid].push_back(buildPacket[tid]); + buildPacket[tid].null(); + buildPacket[tid].genericFill(timeStamp, timeOffset, tid, "\000\000\000\002\011\360", 6, bPos, true); + + partialBuffer[tid].clear(); + } else { + if (clearKey) { + buildPacket[tid].clearKeyFrame(); + } + + outPackets[tid].push_back(buildPacket[tid]); + buildPacket[tid].null(); + buildPacket[tid].genericFill(timeStamp, timeOffset, tid, "\000\000\000\002\011\360", 6, bPos, true); + + buildPacket[tid].appendNal(pesPayload, (packetPtr - pesPayload) - nalRemove, (packetPtr - pesPayload) - nalRemove); + } + + if (threaded) { + globalSem.wait(); + } + + if (threaded) { + globalSem.post(); + } + + } else { + //we have a partial packet which belongs to the previous packet in partialBuffer. + if (!partialBuffer[tid].empty()) { //if buffer is used + buildPacket[tid].appendNal(partialBuffer[tid].c_str(), partialBuffer[tid].length(), + partialBuffer[tid].length() + (packetPtr - pesPayload) - nalRemove); + buildPacket[tid].appendData(pesPayload, (packetPtr - pesPayload) - nalRemove); + partialBuffer.clear(); + + } else { + //hal packet at first position +// buildPacket[tid].appendData(pesPayload, packetPtr-pesPayload); //append part before the startcode. + buildPacket[tid].appendNal(pesPayload, (packetPtr - pesPayload) - nalRemove, (packetPtr - pesPayload) - nalRemove); //append part before the startcode. + } + } + } + + realPayloadSize -= ((packetPtr - pesPayload) + 3); //decrease the total size + pesPayload = packetPtr + 3; + } else { //no startcode found... + if (partialBuffer[tid].empty()) { + partialBuffer[tid].assign(pesPayload, realPayloadSize); + } else { + partialBuffer[tid].append(pesPayload, realPayloadSize); + } + + completePES = true; + } + + nalu::scanAnnexB(pesPayload, realPayloadSize, packetPtr); + } + + } + + } + void Stream::getPacket(unsigned long tid, DTSC::Packet & pack) { pack.null(); - if (!hasPacket(tid)){ + if (!hasPacket(tid)) { ERROR_MSG("Trying to obtain a packet on track %lu, but no full packet is available", tid); return; } - if (threaded){ + if (threaded) { globalSem.wait(); } bool packetReady = outPackets.count(tid) && outPackets[tid].size(); - if (threaded){ + if (threaded) { globalSem.post(); } - if (!packetReady){ + if (!packetReady) { parse(tid); } - - if (threaded){ + + if (threaded) { globalSem.wait(); } packetReady = outPackets.count(tid) && outPackets[tid].size(); - if (threaded){ + if (threaded) { globalSem.post(); } - - if (!packetReady){ + + if (!packetReady) { ERROR_MSG("Obtaining a packet on track %lu failed", tid); return; } - if (threaded){ + if (threaded) { globalSem.wait(); } pack = outPackets[tid].front(); outPackets[tid].pop_front(); - - if (!outPackets[tid].size()){ + + if (!outPackets[tid].size()) { outPackets.erase(tid); } - if (threaded){ + if (threaded) { globalSem.post(); } } - void Stream::getEarliestPacket(DTSC::Packet & pack){ - if (threaded){ + void Stream::parseNal(uint32_t tid, const char * pesPayload, const char * packetPtr, bool & isKeyFrame) { + //bool isKeyFrame = false; + //const char * packetPtr; + bool firstSlice = true; + char typeNal; + + isKeyFrame = false; + typeNal = pesPayload[0] & 0x1F; + + if (pidToCodec[tid] == H264) { + switch (typeNal) { + case 0x01: { + if (firstSlice) { + firstSlice = false; + if (!isKeyFrame) { + Utils::bitstream bs; + for (size_t i = 1; i < 10 && i < (packetPtr - pesPayload); i++) { + if (i + 2 < (packetPtr - pesPayload) && (memcmp(pesPayload + i, "\000\000\003", 3) == 0)) { //Emulation prevention bytes + bs.append(pesPayload + i, 2); + i += 2; + } else { + bs.append(pesPayload + i, 1); + } + } + bs.getExpGolomb();//Discard first_mb_in_slice + uint64_t sliceType = bs.getUExpGolomb(); + if (sliceType == 2 || sliceType == 4 || sliceType == 7 || sliceType == 9) { + isKeyFrame = true; + } + } + } + break; + } + case 0x05: { + isKeyFrame = true; + break; + } + case 0x07: { + if (threaded) { + globalSem.wait(); + } + spsInfo[tid] = std::string(pesPayload, (packetPtr - pesPayload)); + if (threaded) { + globalSem.post(); + } + break; + } + case 0x08: { + if (threaded) { + globalSem.wait(); + } + ppsInfo[tid] = std::string(pesPayload, (packetPtr - pesPayload)); + if (threaded) { + globalSem.post(); + } + break; + } + default: + break; + } + } else if (pidToCodec[tid] == H265) { + switch (typeNal) { + case 2: + case 3: //TSA Picture + case 4: + case 5: //STSA Picture + case 6: + case 7: //RADL Picture + case 8: + case 9: //RASL Picture + case 16: + case 17: + case 18: //BLA Picture + case 19: + case 20: //IDR Picture + case 21: { //CRA Picture + isKeyFrame = true; + break; + } + case 32: + case 33: + case 34: { + if (threaded) { + globalSem.wait(); + } + hevcInfo[tid].addUnit((char *)pesPayload);//may i convert to (char *)? + if (threaded) { + globalSem.post(); + } + break; + } + default: + break; + } + } + } + + void Stream::getEarliestPacket(DTSC::Packet & pack) { + if (threaded) { globalSem.wait(); } pack.null(); - if (!hasPacketOnEachTrack()){ - if (threaded){ - globalSem.post(); - } - return; - } unsigned long packTime = 0xFFFFFFFFull; unsigned long packTrack = 0; - for (std::map >::iterator it = outPackets.begin(); it != outPackets.end(); it++){ - if (it->second.front().getTime() < packTime){ + for (std::map >::iterator it = outPackets.begin(); it != outPackets.end(); it++) { + if (it->second.front().getTime() < packTime) { packTrack = it->first; packTime = it->second.front().getTime(); } } - if (threaded){ + if (threaded) { globalSem.post(); } - getPacket(packTrack, pack); + if(packTrack){ + getPacket(packTrack, pack); + } } - void Stream::initializeMetadata(DTSC::Meta & meta, unsigned long tid) { - if (threaded){ + void Stream::initializeMetadata(DTSC::Meta & meta, unsigned long tid, unsigned long mappingId) { + if (threaded) { globalSem.wait(); } - for (std::map::const_iterator it = pidToCodec.begin(); it != pidToCodec.end(); it++){ - if (tid && it->first != tid){ + + unsigned long mId = mappingId; + + for (std::map::const_iterator it = pidToCodec.begin(); it != pidToCodec.end(); it++) { + if (tid && it->first != tid) { continue; } - if (meta.tracks.count(it->first) && meta.tracks[it->first].codec.size()){continue;} - switch (it->second){ + + if (mId == 0) { + mId = it->first; + } + + if (meta.tracks.count(mId) && meta.tracks[mId].codec.size()) { + continue; + } + + switch (it->second) { case H264: { - if (!spsInfo.count(it->first) || !ppsInfo.count(it->first)){ - MEDIUM_MSG("Aborted meta fill for h264 track %lu: no SPS/PPS", it->first); - continue; + if (!spsInfo.count(it->first) || !ppsInfo.count(it->first)) { + MEDIUM_MSG("Aborted meta fill for h264 track %lu: no SPS/PPS", it->first); + continue; + } + meta.tracks[mId].type = "video"; + meta.tracks[mId].codec = "H264"; + meta.tracks[mId].trackID = mId; + std::string tmpBuffer = spsInfo[it->first]; + h264::sequenceParameterSet sps(spsInfo[it->first].data(), spsInfo[it->first].size()); + h264::SPSMeta spsChar = sps.getCharacteristics(); + meta.tracks[mId].width = spsChar.width; + meta.tracks[mId].height = spsChar.height; + meta.tracks[mId].fpks = spsChar.fps * 1000; + MP4::AVCC avccBox; + avccBox.setVersion(1); + avccBox.setProfile(spsInfo[it->first][1]); + avccBox.setCompatibleProfiles(spsInfo[it->first][2]); + avccBox.setLevel(spsInfo[it->first][3]); + avccBox.setSPSNumber(1); + avccBox.setSPS(spsInfo[it->first]); + avccBox.setPPSNumber(1); + avccBox.setPPS(ppsInfo[it->first]); + meta.tracks[mId].init = std::string(avccBox.payload(), avccBox.payloadSize()); } - meta.tracks[it->first].type = "video"; - meta.tracks[it->first].codec = "H264"; - meta.tracks[it->first].trackID = it->first; - h264::sequenceParameterSet sps(spsInfo[it->first].data(), spsInfo[it->first].size()); - h264::SPSMeta spsChar = sps.getCharacteristics(); - meta.tracks[it->first].width = spsChar.width; - meta.tracks[it->first].height = spsChar.height; - meta.tracks[it->first].fpks = spsChar.fps * 1000; - MP4::AVCC avccBox; - avccBox.setVersion(1); - avccBox.setProfile(spsInfo[it->first][1]); - avccBox.setCompatibleProfiles(spsInfo[it->first][2]); - avccBox.setLevel(spsInfo[it->first][3]); - avccBox.setSPSNumber(1); - avccBox.setSPS(spsInfo[it->first]); - avccBox.setPPSNumber(1); - avccBox.setPPS(ppsInfo[it->first]); - meta.tracks[it->first].init = std::string(avccBox.payload(), avccBox.payloadSize()); - - - - - } - break; + break; case H265: { - if (!hevcInfo.count(it->first) || !hevcInfo[it->first].haveRequired()){ - MEDIUM_MSG("Aborted meta fill for hevc track %lu: no info nal unit", it->first); - continue; - } - meta.tracks[it->first].type = "video"; - meta.tracks[it->first].codec = "HEVC"; - meta.tracks[it->first].trackID = it->first; - meta.tracks[it->first].init = hevcInfo[it->first].generateHVCC(); - int pmtCount = associationTable.getProgramCount(); - for (int i = 0; i < pmtCount; i++){ - int pid = associationTable.getProgramPID(i); - ProgramMappingEntry entry = mappingTable[pid].getEntry(0); - while (entry){ - if (entry.getElementaryPid() == tid){ - meta.tracks[it->first].lang = ProgramDescriptors(entry.getESInfo(), entry.getESInfoLength()).getLanguage(); + if (!hevcInfo.count(it->first) || !hevcInfo[it->first].haveRequired()) { + MEDIUM_MSG("Aborted meta fill for hevc track %lu: no info nal unit", it->first); + continue; + } + meta.tracks[mId].type = "video"; + meta.tracks[mId].codec = "HEVC"; + meta.tracks[mId].trackID = mId; + meta.tracks[mId].init = hevcInfo[it->first].generateHVCC(); + int pmtCount = associationTable.getProgramCount(); + for (int i = 0; i < pmtCount; i++) { + int pid = associationTable.getProgramPID(i); + ProgramMappingEntry entry = mappingTable[pid].getEntry(0); + while (entry) { + if (entry.getElementaryPid() == tid) { + meta.tracks[mId].lang = ProgramDescriptors(entry.getESInfo(), entry.getESInfoLength()).getLanguage(); + } + entry.advance(); } - entry.advance(); } } - } - break; + break; case ID3: { - meta.tracks[it->first].type = "meta"; - meta.tracks[it->first].codec = "ID3"; - meta.tracks[it->first].trackID = it->first; - meta.tracks[it->first].init = metaInit[it->first]; - } - break; + meta.tracks[mId].type = "meta"; + meta.tracks[mId].codec = "ID3"; + meta.tracks[mId].trackID = mId; + meta.tracks[mId].init = metaInit[it->first]; + } + break; case AC3: { - meta.tracks[it->first].type = "audio"; - meta.tracks[it->first].codec = "AC3"; - meta.tracks[it->first].trackID = it->first; - meta.tracks[it->first].size = 16; - ///\todo Fix these 2 values - meta.tracks[it->first].rate = 0; - meta.tracks[it->first].channels = 0; - } - break; + meta.tracks[mId].type = "audio"; + meta.tracks[mId].codec = "AC3"; + meta.tracks[mId].trackID = mId; + meta.tracks[mId].size = 16; + ///\todo Fix these 2 values + meta.tracks[mId].rate = 0; + meta.tracks[mId].channels = 0; + } + break; case AAC: { - meta.tracks[it->first].type = "audio"; - meta.tracks[it->first].codec = "AAC"; - meta.tracks[it->first].trackID = it->first; - meta.tracks[it->first].size = 16; - meta.tracks[it->first].rate = adtsInfo[it->first].getFrequency(); - meta.tracks[it->first].channels = adtsInfo[it->first].getChannelCount(); - char audioInit[2];//5 bits object type, 4 bits frequency index, 4 bits channel index - audioInit[0] = ((adtsInfo[it->first].getAACProfile() & 0x1F) << 3) | ((adtsInfo[it->first].getFrequencyIndex() & 0x0E) >> 1); - audioInit[1] = ((adtsInfo[it->first].getFrequencyIndex() & 0x01) << 7) | ((adtsInfo[it->first].getChannelConfig() & 0x0F) << 3); - meta.tracks[it->first].init = std::string(audioInit, 2); - } - break; + meta.tracks[mId].type = "audio"; + meta.tracks[mId].codec = "AAC"; + meta.tracks[mId].trackID = mId; + meta.tracks[mId].size = 16; + meta.tracks[mId].rate = adtsInfo[it->first].getFrequency(); + meta.tracks[mId].channels = adtsInfo[it->first].getChannelCount(); + char audioInit[2];//5 bits object type, 4 bits frequency index, 4 bits channel index + audioInit[0] = ((adtsInfo[it->first].getAACProfile() & 0x1F) << 3) | ((adtsInfo[it->first].getFrequencyIndex() & 0x0E) >> 1); + audioInit[1] = ((adtsInfo[it->first].getFrequencyIndex() & 0x01) << 7) | ((adtsInfo[it->first].getChannelConfig() & 0x0F) << 3); + meta.tracks[mId].init = std::string(audioInit, 2); + } + break; } int pmtCount = associationTable.getProgramCount(); - for (int i = 0; i < pmtCount; i++){ + for (int i = 0; i < pmtCount; i++) { int pid = associationTable.getProgramPID(i); ProgramMappingEntry entry = mappingTable[pid].getEntry(0); - while (entry){ - if (entry.getElementaryPid() == tid){ - meta.tracks[it->first].lang = ProgramDescriptors(entry.getESInfo(), entry.getESInfoLength()).getLanguage(); + while (entry) { + if (entry.getElementaryPid() == tid) { + meta.tracks[mId].lang = ProgramDescriptors(entry.getESInfo(), entry.getESInfoLength()).getLanguage(); } entry.advance(); } } - MEDIUM_MSG("Initialized track %lu as %s %s", it->first, meta.tracks[it->first].codec.c_str(), meta.tracks[it->first].type.c_str()); + MEDIUM_MSG("Initialized track %lu as %s %s", it->first, meta.tracks[mId].codec.c_str(), meta.tracks[mId].type.c_str()); } - if (threaded){ + if (threaded) { globalSem.post(); } } std::set Stream::getActiveTracks() { - if (threaded){ + if (threaded) { globalSem.wait(); } std::set result; //Track 0 is always active result.insert(0); //IF PAT updated in the last 5 seconds, check for contents - if (Util::bootSecs() - lastPAT < 5){ + if (Util::bootSecs() - lastPAT < 5) { int pmtCount = associationTable.getProgramCount(); //For each PMT - for (int i = 0; i < pmtCount; i++){ + for (int i = 0; i < pmtCount; i++) { int pid = associationTable.getProgramPID(i); //Add PMT track result.insert(pid); //IF PMT updated in last 5 seconds, check for contents - if (Util::bootSecs() - lastPMT[pid] < 5){ + if (Util::bootSecs() - lastPMT[pid] < 5) { ProgramMappingEntry entry = mappingTable[pid].getEntry(0); //Add all tracks in PMT - while (entry){ - switch(entry.getStreamType()){ + while (entry) { + switch (entry.getStreamType()) { case H264: case AAC: case H265: @@ -849,20 +1112,20 @@ namespace TS { } } } - if (threaded){ + if (threaded) { globalSem.post(); } return result; } - void Stream::eraseTrack(unsigned long tid){ - if (threaded){ + void Stream::eraseTrack(unsigned long tid) { + if (threaded) { globalSem.wait(); } pesStreams.erase(tid); pesPositions.erase(tid); outPackets.erase(tid); - if (threaded){ + if (threaded) { globalSem.post(); } } diff --git a/lib/ts_stream.h b/lib/ts_stream.h index db93e1f1..7dffa8ad 100644 --- a/lib/ts_stream.h +++ b/lib/ts_stream.h @@ -16,6 +16,28 @@ namespace TS { H265 = 0x24, ID3 = 0x15 }; + + class ADTSRemainder{ + private: + char * data; + uint32_t max; + uint32_t now; + uint32_t len; + uint32_t bpos; + public: + void setRemainder(const aac::adts & p, const void * source, const uint32_t avail, const uint32_t bPos); + + ADTSRemainder(); + ~ADTSRemainder(); + uint32_t getLength(); + uint32_t getBpos(); + uint32_t getTodo(); + char* getData(); + + void append(const char *p,uint32_t pLen); + bool isComplete(); + void clear(); + }; class Stream{ public: @@ -26,25 +48,33 @@ namespace TS { void parse(Packet & newPack, unsigned long long bytePos); void parse(char * newPack, unsigned long long bytePos); void parse(unsigned long tid); + void parseNal(uint32_t tid, const char *pesPayload, const char * packetPtr, bool &isKeyFrame); bool hasPacketOnEachTrack() const; bool hasPacket(unsigned long tid) const; + bool hasPacket() const; void getPacket(unsigned long tid, DTSC::Packet & pack); void getEarliestPacket(DTSC::Packet & pack); - void initializeMetadata(DTSC::Meta & meta, unsigned long tid = 0); + void initializeMetadata(DTSC::Meta & meta, unsigned long tid = 0, unsigned long mappingId = 0); + void partialClear(); void clear(); + void finish(); void eraseTrack(unsigned long tid); bool isDataTrack(unsigned long tid); + void parseBitstream(uint32_t tid, const char * pesPayload, uint32_t realPayloadSize,uint64_t timeStamp, int64_t timeOffset, uint64_t bPos); std::set getActiveTracks(); private: unsigned long long lastPAT; ProgramAssociationTable associationTable; - + std::map remainders; + + bool firstPacketFound; std::map lastPMT; std::map mappingTable; std::map > pesStreams; std::map > pesPositions; std::map > outPackets; + std::map buildPacket; std::map pidToCodec; std::map adtsInfo; std::map spsInfo; @@ -52,13 +82,13 @@ namespace TS { std::map hevcInfo; std::map metaInit; std::map descriptors; - + std::map partialBuffer; mutable IPC::semaphore globalSem; bool threaded; std::set pmtTracks; - void parsePES(unsigned long tid); + void parsePES(unsigned long tid, bool finished = false); }; } diff --git a/src/input/input_hls.cpp b/src/input/input_hls.cpp new file mode 100644 index 00000000..0c7576b4 --- /dev/null +++ b/src/input/input_hls.cpp @@ -0,0 +1,1216 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "input_hls.h" +#include +#include +#include +#include +#include + +#define SEM_TS_CLAIM "/MstTSIN%s" + + +namespace Mist { + + Playlist::Playlist(){ + lastFileIndex = 0; + entryCount = 0; + waitTime = 2; + playlistEnd = false; + noChangeCount = 0; + vodLive = false; + } + + /// Constructor of HLS Input + inputHLS::inputHLS(Util::Config * cfg) : Input(cfg) { + currentPlaylist = 0; + + capa["name"] = "HLS"; + capa["decs"] = "Enables HLS Input"; + capa["source_match"].append("/*.m3u8"); + capa["source_match"].append("http://*.m3u8"); + + capa["priority"] = 9ll; + capa["codecs"][0u][0u].append("H264"); + capa["codecs"][0u][1u].append("AAC"); + capa["codecs"][0u][1u].append("AC3"); + capa["codecs"][0u][1u].append("MP3"); + + isUrl = false; + + initDone = false; + inFile = NULL; + } + + inputHLS::~inputHLS() { + if (inFile) { + fclose(inFile); + } + } + + void inputHLS::printContent() { + for(int i=0;i< playlists.size();i++) { + std::cout << i << ": " << playlists[i].uri << std::endl; + for(int j = 0;j < playlists[i].entries.size(); j++){ + std::cout << " " << j << ": " << playlists[i].entries.at(j).filename << " bytePos: " << playlists[i].entries[j].bytePos << std::endl; + } + } + } + + bool inputHLS::setup() { + if (config->getString("input") != "-") { + playlistFile = config->getString("input"); + INFO_MSG("opening playlist file... %s" , playlistFile.c_str()); + playlistRootPath = playlistFile.substr(0,playlistFile.rfind("/")+1); + + if(initPlaylist(playlistFile)) { +// printContent(); + return true; + } + } + + return false; + } + + void inputHLS::trackSelect(std::string trackSpec) { + selectedTracks.clear(); + long long int index; + while (trackSpec != "") { + index = trackSpec.find(' '); + selectedTracks.insert(atoi(trackSpec.substr(0, index).c_str())); + if (index != std::string::npos) { + trackSpec.erase(0, index + 1); + } else { + trackSpec = ""; + } + } + } + + void inputHLS::parseStreamHeader() { + bool hasHeader = false; + if(!hasHeader){ + myMeta = DTSC::Meta(); + } + + TS::Packet packet;//to analyse and extract data + int thisEntry = 0; + int thisPlaylist =0; + int counter = 1; + int packetId = 0; + + char * data; + unsigned int dataLen; + bool keepReading = false; + + for(int i = 0;i::iterator entryIt = playlists[i].entries.begin(); + //INFO_MSG("opening...: %s",(playlists[i].uri_root + entryIt->filename).c_str()); + + tsStream.clear(); + uint64_t lastBpos = entryIt->bytePos; + + if(isUrl){ + openURL((playlists[i].uri_root + entryIt->filename).c_str(),playlists[i]); +// packetPtr = source.c_str(); + + keepReading = packet.FromPointer(playlists[i].packetPtr); + playlists[i].packetPtr += 188; + }else{ + in.open((playlists[i].uri_root + entryIt->filename).c_str()); + keepReading = packet.FromStream(in); + } + + //while(packet.FromStream(in)) { + while(keepReading) { + tsStream.parse(packet, lastBpos); +// INFO_MSG("keepreading"); + if(isUrl){ + lastBpos = entryIt->bytePos + playlists[i].source.size(); + //get size... TODO + }else{ + lastBpos = entryIt->bytePos + in.tellg(); + } + + while (tsStream.hasPacketOnEachTrack()) { + DTSC::Packet headerPack; + tsStream.getEarliestPacket(headerPack); + int tmpTrackId = headerPack.getTrackId(); + packetId = pidMapping[(i<<16)+tmpTrackId]; + + if(packetId == 0) { + pidMapping[(i<<16)+headerPack.getTrackId()] = counter; + pidMappingR[counter] = (i<<16)+headerPack.getTrackId(); + packetId = counter; + HIGH_MSG("Added file %s, trackid: %d, mapped to: %d",(playlists[i].uri_root + entryIt->filename).c_str(),headerPack.getTrackId(),counter); + counter++; + } + +myMeta.live = (playlists[currentPlaylist].playlistType == LIVE); +myMeta.vod = !myMeta.live; + +// myMeta.live = true; +// myMeta.vod = false; + + myMeta.live = false; + myMeta.vod = true; + + if (!hasHeader && (!myMeta.tracks.count(packetId) || !myMeta.tracks[packetId].codec.size())) { + tsStream.initializeMetadata(myMeta, tmpTrackId, packetId); + } + } + + if(isUrl){ + if((playlists[i].packetPtr - playlists[i].source.c_str()) +188 < playlists[i].source.size()){ + keepReading = packet.FromPointer(playlists[i].packetPtr); + playlists[i].packetPtr += 188; + }else{ + keepReading = false; + } + + }else{ + keepReading = packet.FromStream(in); + } + } + + in.close(); + } + tsStream.clear(); + +INFO_MSG("end stream header tracks: %d",myMeta.tracks.size()); + if(hasHeader) { + return; + } + +// myMeta.live = true; +// myMeta.vod = false; + in.close(); + } + + bool inputHLS::readHeader() { + //if(playlists[currentPlaylist].playlistType == LIVE || isUrl){ + if(playlists[currentPlaylist].playlistType == LIVE){ + return true; + } + + std::istringstream urlSource; + std::ifstream fileSource; + + bool endOfFile = false; + bool hasHeader = false; + + //See whether a separate header file exists. + DTSC::File tmp(config->getString("input") + ".dtsh"); + if (tmp) { + myMeta = tmp.getMeta(); + if (myMeta) { + hasHeader = true; + } + } + + if(!hasHeader){ + myMeta = DTSC::Meta(); + } + + TS::Packet packet;//to analyse and extract data + + int thisEntry = 0; + int thisPlaylist =0; + int counter = 1; + int packetId = 0; + + char * data; + unsigned int dataLen; + + for(int i = 0;i::iterator entryIt = playlists[i].entries.begin(); entryIt != playlists[i].entries.end();entryIt++) { + //WORK + tsStream.partialClear(); + endOfFile = false; + + if(isUrl){ + openURL((playlists[i].uri_root + entryIt->filename).c_str(),playlists[i]); + urlSource.str(playlists[i].source); + + if ((playlists[i].packetPtr - playlists[i].source.data() +188) < playlists[i].source.size()) + { + packet.FromPointer(playlists[i].packetPtr); + endOfFile = false; + }else{ + endOfFile = true; + } + playlists[i].packetPtr += 188; + }else{ + // fileSource.open(uri.c_str()) + in.close(); + in.open((playlists[i].uri_root + entryIt->filename).c_str()); + packet.FromStream(in); + endOfFile = in.eof(); + } + + entId++; + uint64_t lastBpos = entryIt->bytePos; + while(!endOfFile) { + tsStream.parse(packet, lastBpos); + + if(isUrl){ + lastBpos = entryIt->bytePos + playlists[currentPlaylist].source.size(); + }else{ + lastBpos = entryIt->bytePos + in.tellg(); + } + + while (tsStream.hasPacketOnEachTrack()) { + DTSC::Packet headerPack; + tsStream.getEarliestPacket(headerPack); + + int tmpTrackId = headerPack.getTrackId(); + packetId = pidMapping[(i<<16)+tmpTrackId]; + + if(packetId == 0) { + pidMapping[(i<<16)+headerPack.getTrackId()] = counter; + pidMappingR[counter] = (i<<16)+headerPack.getTrackId(); + packetId = counter; + INFO_MSG("Added file %s, trackid: %d, mapped to: %d",(playlists[i].uri_root + entryIt->filename).c_str(),headerPack.getTrackId(),counter); + counter++; + } + + if (!hasHeader && (!myMeta.tracks.count(packetId) || !myMeta.tracks[packetId].codec.size())) { + tsStream.initializeMetadata(myMeta, tmpTrackId, packetId); + } + + if(!hasHeader){ + headerPack.getString("data", data, dataLen); + uint64_t pBPos = headerPack.getInt("bpos"); + + //keyframe data exists, so always add 19 bytes keyframedata. + long long packOffset = headerPack.hasMember("offset")?headerPack.getInt("offset"):0; + long long packSendSize = 24 + (packOffset?17:0) + (entId>=0?15:0) + 19 + dataLen+11; + myMeta.update(headerPack.getTime(), packOffset, packetId, dataLen, entId, headerPack.hasMember("keyframe"),packSendSize); + } + } + + if(isUrl){ + if ((playlists[i].packetPtr - playlists[i].source.data() +188) < playlists[i].source.size()) + { + packet.FromPointer(playlists[i].packetPtr); + endOfFile = false; + }else{ + endOfFile = true; + } + playlists[i].packetPtr += 188; + }else{ + // fileSource.open(uri.c_str()) + packet.FromStream(in); + endOfFile = in.eof(); + } + + } +//get last packets + tsStream.finish(); + DTSC::Packet headerPack; + tsStream.getEarliestPacket(headerPack); + while (headerPack) { + + int tmpTrackId = headerPack.getTrackId(); + packetId = pidMapping[(i<<16)+tmpTrackId]; + + if(packetId == 0) { + pidMapping[(i<<16)+headerPack.getTrackId()] = counter; + pidMappingR[counter] = (i<<16)+headerPack.getTrackId(); + packetId = counter; + INFO_MSG("Added file %s, trackid: %d, mapped to: %d",(playlists[i].uri_root + entryIt->filename).c_str(),headerPack.getTrackId(),counter); + counter++; + } + + if (!hasHeader && (!myMeta.tracks.count(packetId) || !myMeta.tracks[packetId].codec.size())) { + tsStream.initializeMetadata(myMeta, tmpTrackId, packetId); + } + + if(!hasHeader){ + headerPack.getString("data", data, dataLen); + uint64_t pBPos = headerPack.getInt("bpos"); + + //keyframe data exists, so always add 19 bytes keyframedata. + long long packOffset = headerPack.hasMember("offset")?headerPack.getInt("offset"):0; + long long packSendSize = 24 + (packOffset?17:0) + (entId>=0?15:0) + 19 + dataLen+11; + myMeta.update(headerPack.getTime(), packOffset, packetId, dataLen, entId, headerPack.hasMember("keyframe"),packSendSize); + } + tsStream.getEarliestPacket(headerPack); + } + + + if(isUrl){ + in.close(); + } + + if(hasHeader) { + break; + } + + } + + } + + if(hasHeader || isUrl) { + return true; + } + + + INFO_MSG("write header file..."); + std::ofstream oFile((config->getString("input") + ".dtsh").c_str()); + + oFile << myMeta.toJSON().toNetPacked(); + oFile.close(); + in.close(); + + return true; + } + + bool inputHLS::needsLock() { + if(isUrl){ + return false; + } + return (playlists.size() <= currentPlaylist) || !(playlists[currentPlaylist].playlistType == LIVE); + } + + bool inputHLS::openStreamSource(){ + return true; + } + + int inputHLS::getFirstPlaylistToReload(){ + //at this point, we need to check which playlist we need to reload, and keep reading from that playlist until EndOfPlaylist + std::vector::iterator result = std::min_element(reloadNext.begin(), reloadNext.end()); + int playlistToReload = std::distance(reloadNext.begin(), result); + // std::cout << "min element at: " << std::distance(std::begin(reloadNext), result); + // currentPlaylist = playlistToReload; + return playlistToReload; + } + + void inputHLS::getNext(bool smart) { + INSANE_MSG("Getting next"); + uint32_t tid; + bool hasPacket = false; + bool keepReading = false; + bool endOfFile = false; + bool doReload = false; + + thisPacket.null(); + + while (!hasPacket && config->is_active) { + //tsBuf.FromStream(in); + + if(isUrl){ + + if ((playlists[currentPlaylist].packetPtr - playlists[currentPlaylist].source.data() +188) <= playlists[currentPlaylist].source.size()) + { + tsBuf.FromPointer(playlists[currentPlaylist].packetPtr); + endOfFile = false; + }else{ + endOfFile = true; + } + + playlists[currentPlaylist].packetPtr += 188; + }else{ + tsBuf.FromStream(in); + endOfFile = in.eof(); + } + + + //eof flag is set after unsuccesful read, so check again + //if(in.eof()) { + + if(endOfFile){ + tsStream.finish(); + } + + if(playlists[currentPlaylist].playlistType == LIVE){ + hasPacket = tsStream.hasPacketOnEachTrack() || (endOfFile && tsStream.hasPacket()); + }else{ + + if(!selectedTracks.size()) { + return; + } + + tid = *selectedTracks.begin(); + hasPacket = tsStream.hasPacket(getMappedTrackId(tid)); + } + + if(endOfFile && !hasPacket) { + INFO_MSG("end of file: bootsecs: %d",Util::bootSecs()); + + if(playlists[currentPlaylist].playlistType == LIVE){ + + int a = getFirstPlaylistToReload(); + int segmentTime = 30; + HIGH_MSG("need to reload playlist %d, time: %d",a,reloadNext[a]- Util::bootSecs()); + + int f = firstSegment(); + if(f >= 0){ + segmentTime = playlists[f].entries.front().timestamp - Util::bootSecs(); + } + + int playlistTime = reloadNext.at(currentPlaylist) - Util::bootSecs() -1; + + if(playlistTime < segmentTime){ +// printBuffer(); + INFO_MSG("playlist waiting... %d ms",playlistTime * 900); + + while(playlistTime > 0){ + Util::wait(900); + nProxy.userClient.keepAlive(); + playlistTime--; + } + + //on eof, first reload playlist. + if(reloadPlaylist(playlists[a])){ +// INFO_MSG("playlist %d reloaded!",a); +// playlists[currentPlaylist].noChangeCount = 0; + }else{ + + // INFO_MSG("playlist %d reloaded without changes!, checked %d times...",currentPlaylist,playlists[currentPlaylist].noChangeCount); + // playlists[currentPlaylist].noChangeCount++; + +// if(playlists[currentPlaylist].noChangeCount > 3){ + // INFO_MSG("enough!"); +// return; + // } + } + } + + //check if other playlists have to be reloaded, and do so. +// printBuffer(); + getNextSegment(); + } + + int b = Util::bootSecs(); + + if(!readNextFile()) { + + if(playlists[currentPlaylist].playlistType == LIVE){ + //need to reload all available playlists. update the map with the amount of ms to wait before the next check. + + if(reloadNext.size() < playlists.size()) + { + reloadNext.push_back(Util::bootSecs() + playlists[currentPlaylist].waitTime); + currentPlaylist++; + }else{ + //set specific elements with the correct bootsecs() + //reloadNext.at(currentPlaylist) = Util::bootSecs() + playlists[currentPlaylist].waitTime; + reloadNext.at(currentPlaylist) = b + playlists[currentPlaylist].waitTime; + //for(int i = 0; i < reloadNext.size(); i++) + //{ + //INFO_MSG("Reload vector index %d, time: %d", i,reloadNext[i]- Util::bootSecs()); + //} + initDone = true; + } + + int timeToWait = reloadNext.at(currentPlaylist) - Util::bootSecs(); + + if(playlists[currentPlaylist].vodLive){ + //if(currentPlaylist == playlists.size()-1)//if last playlist, put a delay + if(currentPlaylist == 0) + { + timeToWait = 0; //playlists[currentPlaylist].waitTime; + }else{ + timeToWait = 0; + } + }else{ + //at this point, we need to check which playlist we need to reload, and keep reading from that playlist until EndOfPlaylist + std::vector::iterator result = std::min_element(reloadNext.begin(), reloadNext.end()); + int playlistToReload = std::distance(reloadNext.begin(), result); + currentPlaylist = playlistToReload; + } + //dont wait the first time. + if(timeToWait > 0 && initDone && playlists[currentPlaylist].noChangeCount > 0) + { + if(timeToWait > playlists[currentPlaylist].waitTime){ + WARN_MSG("something is not right..."); + return; + } + + if(playlists[currentPlaylist].noChangeCount < 2){ + timeToWait /= 2;//wait half of the segment size when no segments are found. + } + }else{ +// INFO_MSG("no need to delay, update time already past"); + } + + if(playlists[currentPlaylist].playlistEnd){ + INFO_MSG("Playlist %d has reached his end!"); + thisPacket.null(); + return; + } + + if(playlists[currentPlaylist].vodLive){ + currentPlaylist++; +// INFO_MSG("currentplaylist: %d, playlistsize: %d",currentPlaylist, playlists.size()); + if(currentPlaylist >= playlists.size()) + { + currentPlaylist = 0; + for(int i = 0;i < playlists.size();i++) + { + INFO_MSG("p %d entry 0: %s",i, playlists[i].entries[0].filename.c_str()); + } + + Util::wait(1000); + } + } + }else{ + return; + } + } + + if(isUrl){ + if (playlists[currentPlaylist].packetPtr - playlists[currentPlaylist].source.c_str() +188 <= playlists[currentPlaylist].source.size()) + { + tsBuf.FromPointer(playlists[currentPlaylist].packetPtr); + endOfFile = false; + }else{ + endOfFile = true; + } + + playlists[currentPlaylist].packetPtr += 188; + + }else{ + tsBuf.FromStream(in); + endOfFile = in.eof(); + } + }else{ +// INFO_MSG("not eof, read: %d, total: %d", packetPtr-source.data(), source.size()); + } + + if(!endOfFile){ + tsStream.parse(tsBuf, 0); + if(playlists[currentPlaylist].playlistType == LIVE){ + hasPacket = tsStream.hasPacketOnEachTrack() || (endOfFile && tsStream.hasPacket()); + }else{ + hasPacket = tsStream.hasPacket(getMappedTrackId(tid)); + } + } + } + + if(playlists[currentPlaylist].playlistType == LIVE){ + tsStream.getEarliestPacket(thisPacket); + tid = getOriginalTrackId(currentPlaylist,thisPacket.getTrackId()); + }else{ + tsStream.getPacket(getMappedTrackId( tid), thisPacket); + } + + if(!thisPacket) { + FAIL_MSG("Could not getNExt TS packet!"); + return; + } + + //overwrite trackId + Bit::htobl(thisPacket.getData()+8,tid); + } + + + void inputHLS::readPMT(){ + if(isUrl){ + + size_t bpos; + TS::Packet tsBuffer; + const char *tmpPtr = playlists[currentPlaylist].source.data(); + + while (!tsStream.hasPacketOnEachTrack() && (tmpPtr - playlists[currentPlaylist].source.c_str() +188 <= playlists[currentPlaylist].source.size())) + { + tsBuffer.FromPointer(tmpPtr); + tsStream.parse(tsBuffer, 0); + tmpPtr += 188; + } + tsStream.partialClear(); + + }else{ + size_t bpos = in.tellg(); + in.seekg(0, in.beg); + TS::Packet tsBuffer; + while (!tsStream.hasPacketOnEachTrack() && tsBuffer.FromStream(in)) { + tsStream.parse(tsBuffer, 0); + } + + //tsStream.clear(); + tsStream.partialClear(); //?? partialclear gebruiken?, input raakt hierdoor inconsistent.. + + in.seekg(bpos,in.beg); + } + } + + void inputHLS::seek(int seekTime) { + INFO_MSG("SEEK"); + tsStream.clear(); + readPMT(); + int trackId = 0; + + unsigned long seekPos = 0xFFFFFFFFull; + for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++) { + unsigned long thisBPos = 0; + for (std::deque::iterator keyIt = myMeta.tracks[*it].keys.begin(); keyIt != myMeta.tracks[*it].keys.end(); keyIt++) { + if (keyIt->getTime() > seekTime) { + break; + } + thisBPos = keyIt->getBpos(); + } + if (thisBPos < seekPos) { + seekPos = thisBPos; + trackId = *it; + } + } + + int playlistId = getMappedTrackPlaylist(trackId); + int entryId = seekPos-1; + + if(entryId < 0) { + WARN_MSG("attempted to seek outside the file"); + return; + } + + currentIndex = entryId; + currentPlaylist = playlistId; + + if(isUrl){ + openURL((playlists[currentPlaylist].uri_root+ playlists[currentPlaylist].entries.at(entryId).filename).c_str(), playlists[currentPlaylist]); + + }else{ + in.close(); + in.open((playlists[currentPlaylist].uri_root+ playlists[currentPlaylist].entries.at(entryId).filename).c_str()); + } + } + + int inputHLS::getEntryId(int playlistId, uint64_t bytePos) { + if(bytePos == 0) { return 0;} + + for(int i = 0;i bytePos) { + return i-1; + } + } + + return playlists[playlistId].entries.size()-1; + } + + int inputHLS::getOriginalTrackId(int playlistId,int id) { + return pidMapping[(playlistId << 16) + id]; + } + + int inputHLS::getMappedTrackId(int id) { + return (pidMappingR[id] & 0xFFFF); + } + + int inputHLS::getMappedTrackPlaylist(int id) { + return (pidMappingR[id] >> 16); + } + + ///Very first function to be called on a regular playlist or variant playlist. + bool inputHLS::initPlaylist(std::string uri) { + std::string line; + bool ret = false; + startTime = Util::bootSecs(); + + playlistRootPath = uri.substr(0,uri.rfind("/")+1); + + std::istringstream urlSource; + std::ifstream fileSource; + + if(uri.compare(0,7,"http://") == 0){ + isUrl = true; + Playlist p; + openURL(uri,p); + init_source = p.source; + urlSource.str(init_source); + }else{ + fileSource.open(uri.c_str()); + } + + std::istream & input = (isUrl ? (std::istream&)urlSource : (std::istream&)fileSource); + std::getline(input,line); + + while(std::getline(input, line)) { + if(!line.empty()){ //skip empty lines in the playlist + if (line.compare(0,17,"#EXT-X-STREAM-INF") == 0) { + //this is a variant playlist file.. next line is an uri to a playlist file + std::getline(input, line); + ret = readPlaylist(playlistRootPath + line); + }else if(line.compare(0,12,"#EXT-X-MEDIA") == 0){ + //this is also a variant playlist, but streams need to be processed another way + + std::string mediafile; + if(line.compare(18,5,"AUDIO") == 0) { + //find URI attribute + int pos = line.find("URI"); + if (pos != std::string::npos) { + mediafile = line.substr(pos+5,line.length()-pos-6); + ret = readPlaylist(playlistRootPath + mediafile); + } + } + + }else if(line.compare(0,7,"#EXTINF") ==0) { + //current file is not a variant playlist, but regular playlist. + ret = readPlaylist(uri); + break; + }else{ + //ignore wrong lines + WARN_MSG("ignore wrong line: %s",line.c_str()); + } + } + } + + if(!isUrl){ + fileSource.close(); + } + + initDone = true; + return ret; + } + + ///Function for reading every playlist. + bool inputHLS::readPlaylist(std::string uri) { + std::string line; + std::string key; + std::string val; + Playlist p; + int count = 0; + p.lastTimestamp = 0; + p.uri = uri; + uint64_t totalBytes = 0; + p.uri_root = uri.substr(0,uri.rfind("/")+1); + p.playlistType = LIVE; //TMP + INFO_MSG("readplaylist: %s",uri.c_str()); + + std::istringstream urlSource; + std::ifstream fileSource; + + p.id = playlists.size(); + + if(uri.compare(0,7,"http://") == 0){ + isUrl = true; + openURL(uri,p); + urlSource.str(p.source); + }else{ + fileSource.open(uri.c_str()); + isUrl = false; + } + + std::istream & input = (isUrl ? (std::istream&)urlSource : (std::istream&)fileSource); + std::getline(input,line); + + while(std::getline(input, line)) { + cleanLine(line); + + if(!line.empty()){ + if (line.compare(0,7,"#EXT-X-") == 0) { + size_t pos = line.find(":"); + key = line.substr(7,pos-7); + val = line.c_str() + pos + 1; + + if(key == "VERSION") { + p.version = atoi(line.c_str()+pos+1); + } + + if(key == "TARGETDURATION") { + p.targetDuration = atoi(line.c_str()+pos+1); + p.waitTime = p.targetDuration; + } + + if(key == "MEDIA-SEQUENCE") { + p.media_sequence = atoi(line.c_str()+pos+1); + p.lastFileIndex = p.media_sequence; + } + + if(key == "PLAYLIST-TYPE") { + if(val == "VOD") { + p.playlistType = VOD; + }else if(val == "LIVE") { + p.playlistType = LIVE; + }else if(val == "EVENT") { + p.playlistType = EVENT; + } + } + + if(key == "ENDLIST"){ + //end of playlist reached! + p.playlistEnd = true; + p.playlistType = VOD; + } + + } + else if(line.compare(0,7,"#EXTINF") == 0) { + float f = atof(line.c_str()+8); + std::string filename; + std::getline(input,filename); + addEntryToPlaylist(p,filename,f,totalBytes); + count++; + } + else { + VERYHIGH_MSG("ignoring wrong line: %s.", line.c_str()); + continue; + } + } + } + + if(isUrl) + { + p.playlistType = LIVE;//VOD over HTTP needs to be processed as LIVE. + //p.vodLive= true; + + p.playlistEnd = false; + fileSource.close(); + } + + //set size of reloadNext to playlist count with default value 0 + playlists.push_back(p); + + if(reloadNext.size() < playlists.size()){ + reloadNext.resize(playlists.size()); + } + + reloadNext.at(p.id) = Util::bootSecs() + p.waitTime; + return true; + } + + ///For debugging purposes only. prints the entries for every playlist which needs to be processed. + void inputHLS::printBuffer() + { + INFO_MSG("--------------------------- printbuffer---------------------#######"); + for(int i = 0;i < playlists.size();i++){ + for(int j = 0;j 0){ + INFO_MSG("breaking here!!!!!!!!!!!!"); + fileSource.close(); + return true; + + break;//max files to process + }*/ + + } + + if(isUrl){ + fileSource.close(); + } + + ret = (count >0); + + if(ret){ + + p.noChangeCount = 0; + }else{ +// INFO_MSG("playlist %d reloaded without changes!, checked %d times...",p.id,p.noChangeCount); + p.noChangeCount++; + if(p.noChangeCount > 3){ + VERYHIGH_MSG("enough!"); + //return; + } + } + + return ret; + } + + //remove trailing \r for windows generated playlist files + int inputHLS::cleanLine(std::string &s) { + if (s.length() > 0 && s.at( s.length() - 1 ) == '\r') { + s.erase(s.size() - 1); + } + } + + bool inputHLS::openURL(std::string urlString, Playlist &p){ + //HTTP::URL url("http://nikujkjk"); + HIGH_MSG("opening URL: %s",urlString.c_str()); + + HTTP::URL url(urlString); + if (url.protocol != "http"){ + FAIL_MSG("Protocol %s is not supported", url.protocol.c_str()); + return false; + } + + //if connection is open, reuse this connection + if(!conn){ +// INFO_MSG("init not connected"); + conn = Socket::Connection(url.host, url.getPort(), false); + if(!conn){ + INFO_MSG("Failed to reach %s on port %lu", url.host.c_str(), url.getPort()); + return false; + } + + } + + HTTP::Parser http; + http.url = "/" + url.path; + http.method = "GET"; + http.SetHeader("Host", url.host); + http.SetHeader("X-MistServer", PACKAGE_VERSION); + + conn.SendNow(http.BuildRequest()); + http.Clean(); + + unsigned int startTime = Util::epoch(); + p.source.clear(); + p.packetPtr = 0; + while ((Util::epoch() - startTime < 10) && (conn || conn.Received().size())){ + if (conn.spool() || conn.Received().size()){ + if (http.Read(conn)){ + p.source = http.body; + p.packetPtr = p.source.data(); + conn.close(); + return true; + } + } + } + + if (conn){ + FAIL_MSG("Timeout!"); + conn.close(); + }else{ + FAIL_MSG("Lost connection!"); + INFO_MSG("bytes received %d",conn.Received().size()); + } + + return false; + } + + ///Read next .ts file from the playlist. (from the list of entries which needs to be processed) + bool inputHLS::readNextFile() { + tsStream.clear(); + + if(!playlists[currentPlaylist].entries.size()){ + VERYHIGH_MSG("no entries found in playlist: %d!",currentPlaylist); + return false; + } + + std::string url = (playlists[currentPlaylist].uri_root + playlists[currentPlaylist].entries.front().filename).c_str(); + + if(isUrl){ + if(openURL(url,playlists[currentPlaylist])){ + playlists[currentPlaylist].entries.pop_front(); //remove the item which is opened for reading. + }else{ + + } + } + + if(playlists[currentPlaylist].playlistType == LIVE){ + in.close(); + in.open(url.c_str()); + + // INFO_MSG("\t\t\t\t\t\t\t\t ############ reading segment: %s for playlist: %d",url.c_str(), currentPlaylist) ; + if(in.good()){ + playlists[currentPlaylist].entries.pop_front(); //remove the item which is opened for reading. + return true; + }else{ + return false; + } + }else{ + currentIndex++; + if(playlists[currentPlaylist].entries.size() <= currentIndex) { + INFO_MSG("end of playlist reached!"); + return false; + }else{ + in.close(); + url = playlists[currentPlaylist].uri_root + playlists[currentPlaylist].entries.at(currentIndex).filename; + + // INFO_MSG("\t\t\t\t\t\t\t\t ############ reading segment: %s for playlist: %d",url.c_str(), currentPlaylist) ; + in.open(url.c_str()); + return true; + } + } + } + + ///return the playlist id from which we need to read the first upcoming segment by timestamp. this will keep the playlists in sync while reading segments. + int inputHLS::firstSegment(){ + bool foundSegment = false; + int firstTimeStamp = 0; + int tmp = 0; + + if(playlists.size() <=0){ + //do nothing, there is only one playlist, but return true when there are segments to process + return (playlists[0].entries.size() > 0); + } + + for(int i = 0;i 0){ + if(playlists[i].entries.front().timestamp < firstTimeStamp || !foundSegment){ + firstTimeStamp = playlists[i].entries.front().timestamp; + foundSegment = true; +// currentPlaylist = i; + tmp = i; + } + } + } + + if(foundSegment){ + return tmp; + }else{ + return -1; + } + } + + //read the next segment + bool inputHLS::getNextSegment(){ + int tmp = 0; + bool foundSegment = false; + + tmp = firstSegment(); + foundSegment = (tmp >= 0); +// currentPlaylist = tmp; + + if(foundSegment){ + int segmentTime = playlists[tmp].entries.front().timestamp - Util::bootSecs(); + if(playlists[tmp].entries.front().timestamp - Util::bootSecs() > 0) + { + int t = playlists[tmp].entries.front().timestamp - Util::bootSecs() -1; + while(t > 1){ + Util::wait(1000); + t--; + nProxy.userClient.keepAlive(); + } +// printBuffer(); + } + + }else{ + VERYHIGH_MSG("no segments found!"); + } + + //first segment is set currentPlaylist with the first entry. + return foundSegment; + } + +} + diff --git a/src/input/input_hls.h b/src/input/input_hls.h new file mode 100644 index 00000000..dc25bcea --- /dev/null +++ b/src/input/input_hls.h @@ -0,0 +1,148 @@ +#pragma once +#include "input.h" +#include +#include +#include +#include +#include +#include +#include +#include +//#include + +#define BUFFERTIME 10 + +namespace Mist { + + enum PlaylistType { VOD, LIVE, EVENT }; + + + struct playListEntries + { + std::string filename; + uint64_t bytePos; + float duration; + unsigned int timestamp; + unsigned int wait; + }; + + class Playlist { + public: + Playlist(); + std::string codecs; + std::string video; + std::string audio; + std::string uri; + std::string uri_root; + + std::string source; + const char *packetPtr; + + int id; + bool playlistEnd; + int noChangeCount; + int version; + int targetDuration; + uint64_t media_sequence; + int lastFileIndex; + int waitTime; + bool vodLive; + PlaylistType playlistType; + std::deque entries; + int entryCount; + int programId; + int bandwidth; + unsigned int lastTimestamp; + }; + + + struct entryBuffer + { + int timestamp; + playListEntries entry; + int playlistIndex; + }; + + class inputHLS : public Input { + public: + inputHLS(Util::Config * cfg); + ~inputHLS(); + bool needsLock(); + bool openStreamSource(); + protected: + //Private Functions + + unsigned int startTime; + PlaylistType playlistType; + int version; + int targetDuration; + int media_sequence; + bool endPlaylist; + int currentPlaylist; + + bool initDone; + std::string init_source; + + //std::vector entries; + std::vector playlists; + //std::vector pidMapping; + std::map pidMapping; + std::map pidMappingR; + + std::string playlistFile; + std::string playlistRootPath; + std::vector reloadNext; + + + bool liveStream; + int currentIndex; + std::string currentFile; + std::ifstream in; + bool isUrl; + + TS::Stream tsStream;///getString("input"); @@ -205,7 +204,17 @@ namespace Mist { } myMeta.update(headerPack); } - + } + + DTSC::Packet headerPack; + tsStream.getEarliestPacket(headerPack); + + while (headerPack) { + if (!myMeta.tracks.count(headerPack.getTrackId()) || !myMeta.tracks[headerPack.getTrackId()].codec.size()) { + tsStream.initializeMetadata(myMeta, headerPack.getTrackId()); + } + myMeta.update(headerPack); + tsStream.getEarliestPacket(headerPack); } fseek(inFile, 0, SEEK_SET); @@ -230,9 +239,6 @@ namespace Mist { hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack()); } if (!hasPacket) { - if (!feof(inFile)) { - getNext(); - } return; } if (selectedTracks.size() == 1) { @@ -264,14 +270,12 @@ namespace Mist { } //Clear leaves the PMT in place - tsStream.clear(); - + tsStream.partialClear(); //Restore original file position if (fseek(inFile, bpos, SEEK_SET)) { return; } - } ///Seeks to a specific time