diff --git a/lib/ts_stream.cpp b/lib/ts_stream.cpp index 8341c0d1..a25257ad 100644 --- a/lib/ts_stream.cpp +++ b/lib/ts_stream.cpp @@ -5,6 +5,7 @@ #include "mp4_generic.h" #include "nal.h" #include +#include namespace TS{ @@ -222,6 +223,8 @@ namespace TS{ return; } + + if (threaded){globalSem.wait();} bool parsePes = false; @@ -229,18 +232,21 @@ namespace TS{ int packNum = 1; // Usually we append a packet at a time, so the start code is expected to show up at the end. std::deque &inStream = pesStreams[tid]; - if (inStream.rbegin()->getUnitStart()){ - parsePes = true; - }else{ - // But, sometimes (e.g. live) we do multiples, and need to check all of it... - std::deque::iterator lastPack = inStream.end(); - std::deque::iterator curPack = inStream.begin(); - curPack++; - while (curPack != lastPack && !curPack->getUnitStart()){ + + if(inStream.size() > 1) { + if (inStream.rbegin()->getUnitStart()){ + parsePes = true; + }else{ + // But, sometimes (e.g. live) we do multiples, and need to check all of it... + std::deque::iterator lastPack = inStream.end(); + std::deque::iterator curPack = inStream.begin(); curPack++; - packNum++; + while (curPack != lastPack && !curPack->getUnitStart()){ + curPack++; + packNum++; + } + if (curPack != lastPack){parsePes = true;} } - if (curPack != lastPack){parsePes = true;} } if (threaded){globalSem.post();} @@ -249,8 +255,9 @@ namespace TS{ void Stream::parse(Packet &newPack, unsigned long long bytePos){ add(newPack, bytePos); - int tid = newPack.getPID(); - parse(tid); + if (newPack.getUnitStart()){ + parse(newPack.getPID()); + } } bool Stream::hasPacketOnEachTrack() const{ @@ -287,7 +294,8 @@ namespace TS{ bool Stream::hasPacket(unsigned long tid) const{ if (threaded){globalSem.wait();} - if (!pesStreams.count(tid)){ + std::map >::const_iterator pesIt = pesStreams.find(tid); + if (pesIt == pesStreams.end()){ if (threaded){globalSem.post();} return false; } @@ -295,13 +303,15 @@ namespace TS{ if (threaded){globalSem.post();} return true; } - std::deque::const_iterator curPack = pesStreams.at(tid).begin(); + const std::deque & thisStream = pesIt->second; + std::deque::const_iterator curPack = thisStream.begin(); + std::deque::const_iterator endPack = thisStream.end(); - if (curPack != pesStreams.at(tid).end()){curPack++;} + if (curPack != endPack){curPack++;} - while (curPack != pesStreams.at(tid).end() && !curPack->getUnitStart()){curPack++;} + while (curPack != endPack && !curPack->getUnitStart()){curPack++;} - if (curPack != pesStreams.at(tid).end()){ + if (curPack != endPack){ if (threaded){globalSem.post();} return true; } @@ -371,10 +381,17 @@ namespace TS{ int packNum = 1; std::deque::iterator curPack = inStream.begin(); - curPack++; - while (curPack != inStream.end() && !curPack->getUnitStart()){ + + if (inStream.rbegin()->getUnitStart()){ + packNum = inStream.size() - 1; + curPack = inStream.end(); + curPack --; + }else{ curPack++; - packNum++; + while (curPack != inStream.end() && !curPack->getUnitStart()){ + curPack++; + packNum++; + } } if (!finished && curPack == inStream.end()){ if (threaded){globalSem.post();} @@ -396,6 +413,11 @@ namespace TS{ // allocate a buffer, do it all again, but this time also copy the data bytes over to char* // payload char *payload = (char *)malloc(paySize); + if(!payload){ + FAIL_MSG("cannot allocate PES packet!"); + return; + } + paySize = 0; curPack = inStream.begin(); int lastCtr = curPack->getContinuityCounter() - 1; @@ -452,9 +474,11 @@ namespace TS{ // We substract PES_header_data_length, plus the 9 bytes of mandatory header bytes realPayloadSize -= (9 + pesHeader[8]); + // Read the metadata for this PES Packet ///\todo Determine keyframe-ness - unsigned int timeStamp = 0; + uint64_t timeStamp = 0; + int64_t timeOffset = 0; unsigned int pesOffset = 9; // mandatory headers if ((pesHeader[7] >> 6) & 0x02){// Check for PTS presence @@ -469,6 +493,7 @@ namespace TS{ } } + if (pesHeader[7] & 0x20){// ESCR - ignored pesOffset += 6; } @@ -491,7 +516,7 @@ namespace TS{ } const char *pesPayload = pesHeader + pesOffset; - parseBitstream(tid, pesPayload, realPayloadSize, timeStamp, timeOffset, bPos); + parseBitstream(tid, pesPayload, realPayloadSize, timeStamp, timeOffset, bPos, pesHeader[6] & 0x04 ); // Shift the offset by the payload size, the mandatory headers and the optional // headers/padding @@ -501,10 +526,13 @@ namespace TS{ } void Stream::parseBitstream(uint32_t tid, const char *pesPayload, uint32_t realPayloadSize, - uint64_t timeStamp, int64_t timeOffset, uint64_t bPos){ + uint64_t timeStamp, int64_t timeOffset, uint64_t bPos, bool alignment){ +//INFO_MSG("timestamp: %llu offset: %lld", timeStamp, timeOffset); // Create a new (empty) DTSC Packet at the end of the buffer - if (pidToCodec[tid] == AAC){ + unsigned long thisCodec = pidToCodec[tid]; + std::deque & out = outPackets[tid]; + if (thisCodec == AAC){ // Parse all the ADTS packets unsigned long offsetInPes = 0; uint64_t msRead = 0; @@ -522,8 +550,8 @@ namespace TS{ MEDIUM_MSG("Setting new ADTS header: %s", adtsPack.toPrettyString().c_str()); adtsInfo[tid] = adtsPack; } - outPackets[tid].push_back(DTSC::Packet()); - outPackets[tid].back().genericFill( + out.push_back(DTSC::Packet()); + out.back().genericFill( timeStamp - ((adtsPack.getSampleCount() * 1000) / adtsPack.getFrequency()), timeOffset, tid, adtsPack.getPayload(), adtsPack.getPayloadSize(), remainders[tid].getBpos(), 0); @@ -538,8 +566,8 @@ namespace TS{ MEDIUM_MSG("Setting new ADTS header: %s", adtsPack.toPrettyString().c_str()); adtsInfo[tid] = adtsPack; } - outPackets[tid].push_back(DTSC::Packet()); - outPackets[tid].back().genericFill(timeStamp + msRead, timeOffset, tid, + out.push_back(DTSC::Packet()); + out.back().genericFill(timeStamp + msRead, timeOffset, tid, adtsPack.getPayload(), adtsPack.getPayloadSize(), bPos, 0); msRead += (adtsPack.getSampleCount() * 1000) / adtsPack.getFrequency(); @@ -558,25 +586,71 @@ namespace TS{ } if (threaded){globalSem.post();} } - if (pidToCodec[tid] == ID3 || pidToCodec[tid] == AC3){ + if (thisCodec == ID3 || thisCodec == AC3){ if (threaded){globalSem.wait();} - outPackets[tid].push_back(DTSC::Packet()); - outPackets[tid].back().genericFill(timeStamp, timeOffset, tid, pesPayload, realPayloadSize, + out.push_back(DTSC::Packet()); + out.back().genericFill(timeStamp, timeOffset, tid, pesPayload, realPayloadSize, bPos, 0); if (threaded){globalSem.post();} } - if (pidToCodec[tid] == H264 || pidToCodec[tid] == H265){ + if (thisCodec == H264 || thisCodec == H265){ const char *nextPtr; const char *pesEnd = pesPayload+realPayloadSize; bool isKeyFrame = false; uint32_t nalSize = 0; + + nextPtr = nalu::scanAnnexB(pesPayload, realPayloadSize); if (!nextPtr){ - WARN_MSG("No H264 start code found in entire PES packet!"); - return; + nextPtr = pesEnd; + nalSize = realPayloadSize; + if(!alignment && timeStamp && buildPacket.count(tid) && timeStamp != buildPacket[tid].getTime()){ + FAIL_MSG("No startcode in packet @ %llu ms, and time is not equal to %llu ms so can't merge", timeStamp, buildPacket[tid].getTime()); + return; + } + if (alignment){ + // If the timestamp differs from current PES timestamp, send the previous packet out and + // fill a new one. + if (buildPacket[tid].getTime() != timeStamp){ + // Add the finished DTSC packet to our output buffer + if (threaded){globalSem.wait();} + out.push_back(buildPacket[tid]); + if (threaded){globalSem.post();} + + uint32_t size; + char * tmp ; + buildPacket[tid].getString("data", tmp, size); + + INFO_MSG("buildpacket: size: %d, timestamp: %llu", size, buildPacket[tid].getTime()) + + if (threaded){globalSem.post();} + // Create a new empty packet with the key frame bit set to true + buildPacket[tid].null(); + buildPacket[tid].genericFill(timeStamp, timeOffset, tid, 0, 0, bPos, true); + buildPacket[tid].setKeyFrame(false); + } + + if (!buildPacket.count(tid)){ + buildPacket[tid].genericFill(timeStamp, timeOffset, tid, 0, 0, bPos, true); + buildPacket[tid].setKeyFrame(false); + } + + // Check if this is a keyframe + parseNal(tid, pesPayload, nextPtr, isKeyFrame); + // If yes, set the keyframe flag + if (isKeyFrame){ + buildPacket[tid].setKeyFrame(true); + } + + // No matter what, now append the current NAL unit to the current packet + buildPacket[tid].appendNal(pesPayload, nalSize); + }else{ + buildPacket[tid].upgradeNal(pesPayload, nalSize); + return; + } } while (nextPtr < pesEnd){ @@ -603,7 +677,13 @@ namespace TS{ if (buildPacket[tid].getTime() != timeStamp){ // Add the finished DTSC packet to our output buffer if (threaded){globalSem.wait();} - outPackets[tid].push_back(buildPacket[tid]); + out.push_back(buildPacket[tid]); + + uint32_t size; + char * tmp ; + buildPacket[tid].getString("data", tmp, size); + + // INFO_MSG("buildpacket: size: %d, timestamp: %llu", size, buildPacket[tid].getTime()) if (threaded){globalSem.post();} // Create a new empty packet with the key frame bit set to true buildPacket[tid].null(); @@ -611,12 +691,14 @@ namespace TS{ buildPacket[tid].setKeyFrame(false); } // No matter what, now append the current NAL unit to the current packet - buildPacket[tid].appendNal(pesPayload, nalSize, nalSize); + buildPacket[tid].appendNal(pesPayload, nalSize); } if (((nextPtr - pesPayload) + 3) >= realPayloadSize){return;}//end of the line + realPayloadSize -= ((nextPtr - pesPayload) + 3); // decrease the total size pesPayload = nextPtr + 3; + nextPtr = nalu::scanAnnexB(pesPayload, realPayloadSize); } } diff --git a/lib/ts_stream.h b/lib/ts_stream.h index b7734d3e..21237069 100644 --- a/lib/ts_stream.h +++ b/lib/ts_stream.h @@ -56,7 +56,7 @@ namespace TS{ void eraseTrack(unsigned long tid); bool isDataTrack(unsigned long tid); void parseBitstream(uint32_t tid, const char *pesPayload, uint32_t realPayloadSize, - uint64_t timeStamp, int64_t timeOffset, uint64_t bPos); + uint64_t timeStamp, int64_t timeOffset, uint64_t bPos, bool alignment); std::set getActiveTracks(); private: